Source code for fm_weck.grpc_service.proto.fm_weck_service_pb2_grpc

# This file is part of fm-weck: executing fm-tools in containerized environments.
# https://gitlab.com/sosy-lab/software/fm-weck
#
# SPDX-FileCopyrightText: 2024 Dirk Beyer <https://www.sosy-lab.org>
#
# SPDX-License-Identifier: Apache-2.0

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

from . import fm_weck_service_pb2 as fm__weck__service__pb2

# grpcio version this module was generated against, and the version that is
# actually installed in the running environment.
GRPC_GENERATED_VERSION = '1.71.0'
GRPC_VERSION = grpc.__version__

# Stays False only when the installed grpcio is at least as new as the
# version the code was generated with.
_version_not_supported = False

try:
    from grpc._utilities import first_version_is_lower
    _version_not_supported = first_version_is_lower(
        GRPC_VERSION, GRPC_GENERATED_VERSION
    )
except ImportError:
    # The comparison helper could not be imported from the installed grpc
    # package, so the installation is treated as unsupported.
    _version_not_supported = True

# Fail at import time instead of at the first RPC.
if _version_not_supported:
    raise RuntimeError(
        f'The grpc package installed is at version {GRPC_VERSION},'
        ' but the generated code in fm_weck_service_pb2_grpc.py depends on'
        f' grpcio>={GRPC_GENERATED_VERSION}.'
        f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
        f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
    )


class FmWeckRemoteStub:
    """Client stub for the FmWeckRemote service, which runs fm-weck remotely.

    Every RPC of the service is bound as an instance attribute named after
    the RPC.
    """

    def __init__(self, channel):
        """Bind one channel method per RPC of the service.

        Args:
            channel: A grpc.Channel.
        """
        # Local shorthand for the generated message module.
        pb2 = fm__weck__service__pb2

        self.startRun = channel.unary_unary(
            '/FmWeckRemote/startRun',
            request_serializer=pb2.RunRequest.SerializeToString,
            response_deserializer=pb2.RunID.FromString,
            _registered_method=True,
        )
        self.startExpertRun = channel.unary_unary(
            '/FmWeckRemote/startExpertRun',
            request_serializer=pb2.ExpertRunRequest.SerializeToString,
            response_deserializer=pb2.RunID.FromString,
            _registered_method=True,
        )
        self.cancelRun = channel.unary_unary(
            '/FmWeckRemote/cancelRun',
            request_serializer=pb2.CancelRunRequest.SerializeToString,
            response_deserializer=pb2.CancelRunResult.FromString,
            _registered_method=True,
        )
        self.cleanupRun = channel.unary_unary(
            '/FmWeckRemote/cleanupRun',
            request_serializer=pb2.RunID.SerializeToString,
            response_deserializer=pb2.CleanUpResponse.FromString,
            _registered_method=True,
        )
        self.waitOnRun = channel.unary_unary(
            '/FmWeckRemote/waitOnRun',
            request_serializer=pb2.WaitParameters.SerializeToString,
            response_deserializer=pb2.WaitRunResult.FromString,
            _registered_method=True,
        )
        # queryFiles is the only RPC created with unary_stream rather than
        # unary_unary.
        self.queryFiles = channel.unary_stream(
            '/FmWeckRemote/queryFiles',
            request_serializer=pb2.FileQuery.SerializeToString,
            response_deserializer=pb2.File.FromString,
            _registered_method=True,
        )
class FmWeckRemoteServicer:
    """Server-side base class for the FmWeckRemote service (runs fm-weck remotely).

    Every handler defined here is a placeholder: it sets the status code
    UNIMPLEMENTED and a details message on the context, then raises
    NotImplementedError.
    """

    def startRun(self, request, context):
        """Runs a verification task for a given C program."""
        message = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    def startExpertRun(self, request, context):
        """Runs a tool in expert mode."""
        message = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    def cancelRun(self, request, context):
        """Cancels a previously started run."""
        message = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    def cleanupRun(self, request, context):
        """Cleans up files of a finished run."""
        message = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    def waitOnRun(self, request, context):
        """Gets the result of a previously started run using its unique ID."""
        message = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    def queryFiles(self, request, context):
        """Query for a number of result files."""
        message = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)
def add_FmWeckRemoteServicer_to_server(servicer, server):
    """Register the RPC handlers of a FmWeckRemote servicer on a server.

    One method handler is built per RPC from the servicer's method of the
    same name. The handler table is then registered twice under the service
    name 'FmWeckRemote': wrapped in a generic handler, and as registered
    method handlers.

    Args:
        servicer: Object providing startRun, startExpertRun, cancelRun,
            cleanupRun, waitOnRun and queryFiles.
        server: Object providing add_generic_rpc_handlers and
            add_registered_method_handlers.
    """
    # Local shorthand for the generated message module.
    pb2 = fm__weck__service__pb2
    service_name = 'FmWeckRemote'

    handlers = {
        'startRun': grpc.unary_unary_rpc_method_handler(
            servicer.startRun,
            request_deserializer=pb2.RunRequest.FromString,
            response_serializer=pb2.RunID.SerializeToString,
        ),
        'startExpertRun': grpc.unary_unary_rpc_method_handler(
            servicer.startExpertRun,
            request_deserializer=pb2.ExpertRunRequest.FromString,
            response_serializer=pb2.RunID.SerializeToString,
        ),
        'cancelRun': grpc.unary_unary_rpc_method_handler(
            servicer.cancelRun,
            request_deserializer=pb2.CancelRunRequest.FromString,
            response_serializer=pb2.CancelRunResult.SerializeToString,
        ),
        'cleanupRun': grpc.unary_unary_rpc_method_handler(
            servicer.cleanupRun,
            request_deserializer=pb2.RunID.FromString,
            response_serializer=pb2.CleanUpResponse.SerializeToString,
        ),
        'waitOnRun': grpc.unary_unary_rpc_method_handler(
            servicer.waitOnRun,
            request_deserializer=pb2.WaitParameters.FromString,
            response_serializer=pb2.WaitRunResult.SerializeToString,
        ),
        # Only queryFiles uses the unary_stream handler factory.
        'queryFiles': grpc.unary_stream_rpc_method_handler(
            servicer.queryFiles,
            request_deserializer=pb2.FileQuery.FromString,
            response_serializer=pb2.File.SerializeToString,
        ),
    }

    server.add_generic_rpc_handlers(
        (grpc.method_handlers_generic_handler(service_name, handlers),)
    )
    server.add_registered_method_handlers(service_name, handlers)
# This class is part of an EXPERIMENTAL API.
class FmWeckRemote:
    """Static one-shot entry points for the FmWeckRemote service (runs fm-weck remotely).

    Each static method forwards its arguments to grpc.experimental.unary_unary
    (or unary_stream for queryFiles) together with the RPC path and the
    matching request serializer and response deserializer.
    """

    @staticmethod
    def startRun(request,
                 target,
                 options=(),
                 channel_credentials=None,
                 call_credentials=None,
                 insecure=False,
                 compression=None,
                 wait_for_ready=None,
                 timeout=None,
                 metadata=None):
        """Invoke '/FmWeckRemote/startRun' (RunRequest -> RunID) on target."""
        pb2 = fm__weck__service__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/FmWeckRemote/startRun',
            pb2.RunRequest.SerializeToString,
            pb2.RunID.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True,
        )

    @staticmethod
    def startExpertRun(request,
                       target,
                       options=(),
                       channel_credentials=None,
                       call_credentials=None,
                       insecure=False,
                       compression=None,
                       wait_for_ready=None,
                       timeout=None,
                       metadata=None):
        """Invoke '/FmWeckRemote/startExpertRun' (ExpertRunRequest -> RunID) on target."""
        pb2 = fm__weck__service__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/FmWeckRemote/startExpertRun',
            pb2.ExpertRunRequest.SerializeToString,
            pb2.RunID.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True,
        )

    @staticmethod
    def cancelRun(request,
                  target,
                  options=(),
                  channel_credentials=None,
                  call_credentials=None,
                  insecure=False,
                  compression=None,
                  wait_for_ready=None,
                  timeout=None,
                  metadata=None):
        """Invoke '/FmWeckRemote/cancelRun' (CancelRunRequest -> CancelRunResult) on target."""
        pb2 = fm__weck__service__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/FmWeckRemote/cancelRun',
            pb2.CancelRunRequest.SerializeToString,
            pb2.CancelRunResult.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True,
        )

    @staticmethod
    def cleanupRun(request,
                   target,
                   options=(),
                   channel_credentials=None,
                   call_credentials=None,
                   insecure=False,
                   compression=None,
                   wait_for_ready=None,
                   timeout=None,
                   metadata=None):
        """Invoke '/FmWeckRemote/cleanupRun' (RunID -> CleanUpResponse) on target."""
        pb2 = fm__weck__service__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/FmWeckRemote/cleanupRun',
            pb2.RunID.SerializeToString,
            pb2.CleanUpResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True,
        )

    @staticmethod
    def waitOnRun(request,
                  target,
                  options=(),
                  channel_credentials=None,
                  call_credentials=None,
                  insecure=False,
                  compression=None,
                  wait_for_ready=None,
                  timeout=None,
                  metadata=None):
        """Invoke '/FmWeckRemote/waitOnRun' (WaitParameters -> WaitRunResult) on target."""
        pb2 = fm__weck__service__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/FmWeckRemote/waitOnRun',
            pb2.WaitParameters.SerializeToString,
            pb2.WaitRunResult.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True,
        )

    @staticmethod
    def queryFiles(request,
                   target,
                   options=(),
                   channel_credentials=None,
                   call_credentials=None,
                   insecure=False,
                   compression=None,
                   wait_for_ready=None,
                   timeout=None,
                   metadata=None):
        """Invoke '/FmWeckRemote/queryFiles' (FileQuery -> File) on target via unary_stream."""
        pb2 = fm__weck__service__pb2
        return grpc.experimental.unary_stream(
            request,
            target,
            '/FmWeckRemote/queryFiles',
            pb2.FileQuery.SerializeToString,
            pb2.File.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True,
        )