diff --git a/src/isolate/server/definitions/server.proto b/src/isolate/server/definitions/server.proto index 1df105a9..e004f23e 100644 --- a/src/isolate/server/definitions/server.proto +++ b/src/isolate/server/definitions/server.proto @@ -10,6 +10,9 @@ service Isolate { // Submit a function to be run without waiting for results. rpc Submit (SubmitRequest) returns (SubmitResponse) {} + + // Subscribe to a submitted function. + rpc Subscribe (SubscribeRequest) returns (stream PartialRunResult) {} // Set the metadata for a task. rpc SetMetadata (SetMetadataRequest) returns (SetMetadataResponse) {} @@ -44,6 +47,10 @@ message SubmitRequest { TaskMetadata metadata = 2; } +message SubscribeRequest { + string task_id = 1; +} + message TaskMetadata { // Labels to attach to the logs. map logger_labels = 1; diff --git a/src/isolate/server/definitions/server_pb2.py b/src/isolate/server/definitions/server_pb2.py index dbab29df..8afd1f4f 100644 --- a/src/isolate/server/definitions/server_pb2.py +++ b/src/isolate/server/definitions/server_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: server.proto -# Protobuf Python Version: 4.25.1 +# Protobuf Python Version: 5.26.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -16,14 +16,14 @@ from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\xb2\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x12\x13\n\x0bstream_logs\x18\x04 \x01(\x08\x42\r\n\x0b_setup_func\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"{\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xf4\x01\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\xb2\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x12\x13\n\x0bstream_logs\x18\x04 \x01(\x08\x42\r\n\x0b_setup_func\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"#\n\x10SubscribeRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"{\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xab\x02\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12\x35\n\tSubscribe\x12\x11.SubscribeRequest\x1a\x11.PartialRunResult\"\x00\x30\x01\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'server_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_TASKMETADATA_LOGGERLABELSENTRY']._options = None +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._loaded_options = None _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_options = b'8\001' _globals['_BOUNDFUNCTION']._serialized_start=61 _globals['_BOUNDFUNCTION']._serialized_end=239 @@ -31,26 +31,28 @@ _globals['_ENVIRONMENTDEFINITION']._serialized_end=341 _globals['_SUBMITREQUEST']._serialized_start=343 _globals['_SUBMITREQUEST']._serialized_end=425 - _globals['_TASKMETADATA']._serialized_start=427 - _globals['_TASKMETADATA']._serialized_end=550 - _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_start=499 - _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_end=550 - _globals['_SUBMITRESPONSE']._serialized_start=552 - _globals['_SUBMITRESPONSE']._serialized_end=585 - _globals['_SETMETADATAREQUEST']._serialized_start=587 - _globals['_SETMETADATAREQUEST']._serialized_end=657 - _globals['_SETMETADATARESPONSE']._serialized_start=659 - _globals['_SETMETADATARESPONSE']._serialized_end=680 - _globals['_LISTREQUEST']._serialized_start=682 - _globals['_LISTREQUEST']._serialized_end=695 - _globals['_TASKINFO']._serialized_start=697 - _globals['_TASKINFO']._serialized_end=724 - _globals['_LISTRESPONSE']._serialized_start=726 - _globals['_LISTRESPONSE']._serialized_end=766 - _globals['_CANCELREQUEST']._serialized_start=768 - _globals['_CANCELREQUEST']._serialized_end=800 - _globals['_CANCELRESPONSE']._serialized_start=802 - _globals['_CANCELRESPONSE']._serialized_end=818 - _globals['_ISOLATE']._serialized_start=821 - _globals['_ISOLATE']._serialized_end=1065 + _globals['_SUBSCRIBEREQUEST']._serialized_start=427 + _globals['_SUBSCRIBEREQUEST']._serialized_end=462 + _globals['_TASKMETADATA']._serialized_start=464 + _globals['_TASKMETADATA']._serialized_end=587 + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_start=536 + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_end=587 + _globals['_SUBMITRESPONSE']._serialized_start=589 + _globals['_SUBMITRESPONSE']._serialized_end=622 + _globals['_SETMETADATAREQUEST']._serialized_start=624 + _globals['_SETMETADATAREQUEST']._serialized_end=694 + _globals['_SETMETADATARESPONSE']._serialized_start=696 + _globals['_SETMETADATARESPONSE']._serialized_end=717 + _globals['_LISTREQUEST']._serialized_start=719 + _globals['_LISTREQUEST']._serialized_end=732 + _globals['_TASKINFO']._serialized_start=734 + _globals['_TASKINFO']._serialized_end=761 + _globals['_LISTRESPONSE']._serialized_start=763 + _globals['_LISTRESPONSE']._serialized_end=803 + _globals['_CANCELREQUEST']._serialized_start=805 + _globals['_CANCELREQUEST']._serialized_end=837 + _globals['_CANCELRESPONSE']._serialized_start=839 + _globals['_CANCELRESPONSE']._serialized_end=855 + _globals['_ISOLATE']._serialized_start=858 + _globals['_ISOLATE']._serialized_end=1157 # @@protoc_insertion_point(module_scope) diff --git a/src/isolate/server/definitions/server_pb2.pyi b/src/isolate/server/definitions/server_pb2.pyi index 30504e71..316c65e6 100644 --- a/src/isolate/server/definitions/server_pb2.pyi +++ b/src/isolate/server/definitions/server_pb2.pyi @@ -2,6 +2,7 @@ @generated by mypy-protobuf. Do not edit manually! isort:skip_file """ + import builtins import collections.abc from isolate.connections.grpc.definitions import common_pb2 @@ -9,16 +10,11 @@ import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.message import google.protobuf.struct_pb2 -import sys - -if sys.version_info >= (3, 8): - import typing as typing_extensions -else: - import typing_extensions +import typing DESCRIPTOR: google.protobuf.descriptor.FileDescriptor -@typing_extensions.final +@typing.final class BoundFunction(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -26,13 +22,13 @@ class BoundFunction(google.protobuf.message.Message): FUNCTION_FIELD_NUMBER: builtins.int SETUP_FUNC_FIELD_NUMBER: builtins.int STREAM_LOGS_FIELD_NUMBER: builtins.int + stream_logs: builtins.bool @property def environments(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EnvironmentDefinition]: ... @property def function(self) -> common_pb2.SerializedObject: ... @property def setup_func(self) -> common_pb2.SerializedObject: ... - stream_logs: builtins.bool def __init__( self, *, @@ -41,13 +37,13 @@ class BoundFunction(google.protobuf.message.Message): setup_func: common_pb2.SerializedObject | None = ..., stream_logs: builtins.bool = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "environments", b"environments", "function", b"function", "setup_func", b"setup_func", "stream_logs", b"stream_logs"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["_setup_func", b"_setup_func"]) -> typing_extensions.Literal["setup_func"] | None: ... + def HasField(self, field_name: typing.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["_setup_func", b"_setup_func", "environments", b"environments", "function", b"function", "setup_func", b"setup_func", "stream_logs", b"stream_logs"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["_setup_func", b"_setup_func"]) -> typing.Literal["setup_func"] | None: ... global___BoundFunction = BoundFunction -@typing_extensions.final +@typing.final class EnvironmentDefinition(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -56,11 +52,12 @@ class EnvironmentDefinition(google.protobuf.message.Message): FORCE_FIELD_NUMBER: builtins.int kind: builtins.str """Kind of the isolate environment.""" + force: builtins.bool + """Whether to force-create this environment or not.""" @property def configuration(self) -> google.protobuf.struct_pb2.Struct: """A free-form definition of environment properties.""" - force: builtins.bool - """Whether to force-create this environment or not.""" + def __init__( self, *, @@ -68,12 +65,12 @@ class EnvironmentDefinition(google.protobuf.message.Message): configuration: google.protobuf.struct_pb2.Struct | None = ..., force: builtins.bool = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["configuration", b"configuration"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["configuration", b"configuration", "force", b"force", "kind", b"kind"]) -> None: ... + def HasField(self, field_name: typing.Literal["configuration", b"configuration"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["configuration", b"configuration", "force", b"force", "kind", b"kind"]) -> None: ... global___EnvironmentDefinition = EnvironmentDefinition -@typing_extensions.final +@typing.final class SubmitRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -82,25 +79,42 @@ class SubmitRequest(google.protobuf.message.Message): @property def function(self) -> global___BoundFunction: """The function to run.""" + @property def metadata(self) -> global___TaskMetadata: """Task metadata.""" + def __init__( self, *, function: global___BoundFunction | None = ..., metadata: global___TaskMetadata | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["function", b"function", "metadata", b"metadata"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["function", b"function", "metadata", b"metadata"]) -> None: ... + def HasField(self, field_name: typing.Literal["function", b"function", "metadata", b"metadata"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["function", b"function", "metadata", b"metadata"]) -> None: ... global___SubmitRequest = SubmitRequest -@typing_extensions.final +@typing.final +class SubscribeRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TASK_ID_FIELD_NUMBER: builtins.int + task_id: builtins.str + def __init__( + self, + *, + task_id: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... + +global___SubscribeRequest = SubscribeRequest + +@typing.final class TaskMetadata(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - @typing_extensions.final + @typing.final class LoggerLabelsEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -114,22 +128,23 @@ class TaskMetadata(google.protobuf.message.Message): key: builtins.str = ..., value: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ... LOGGER_LABELS_FIELD_NUMBER: builtins.int @property def logger_labels(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: """Labels to attach to the logs.""" + def __init__( self, *, logger_labels: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["logger_labels", b"logger_labels"]) -> None: ... + def ClearField(self, field_name: typing.Literal["logger_labels", b"logger_labels"]) -> None: ... global___TaskMetadata = TaskMetadata -@typing_extensions.final +@typing.final class SubmitResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -140,11 +155,11 @@ class SubmitResponse(google.protobuf.message.Message): *, task_id: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["task_id", b"task_id"]) -> None: ... + def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... global___SubmitResponse = SubmitResponse -@typing_extensions.final +@typing.final class SetMetadataRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -159,12 +174,12 @@ class SetMetadataRequest(google.protobuf.message.Message): task_id: builtins.str = ..., metadata: global___TaskMetadata | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["metadata", b"metadata"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["metadata", b"metadata", "task_id", b"task_id"]) -> None: ... + def HasField(self, field_name: typing.Literal["metadata", b"metadata"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["metadata", b"metadata", "task_id", b"task_id"]) -> None: ... global___SetMetadataRequest = SetMetadataRequest -@typing_extensions.final +@typing.final class SetMetadataResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -174,7 +189,7 @@ class SetMetadataResponse(google.protobuf.message.Message): global___SetMetadataResponse = SetMetadataResponse -@typing_extensions.final +@typing.final class ListRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -184,7 +199,7 @@ class ListRequest(google.protobuf.message.Message): global___ListRequest = ListRequest -@typing_extensions.final +@typing.final class TaskInfo(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -195,11 +210,11 @@ class TaskInfo(google.protobuf.message.Message): *, task_id: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["task_id", b"task_id"]) -> None: ... + def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... global___TaskInfo = TaskInfo -@typing_extensions.final +@typing.final class ListResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -211,11 +226,11 @@ class ListResponse(google.protobuf.message.Message): *, tasks: collections.abc.Iterable[global___TaskInfo] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["tasks", b"tasks"]) -> None: ... + def ClearField(self, field_name: typing.Literal["tasks", b"tasks"]) -> None: ... global___ListResponse = ListResponse -@typing_extensions.final +@typing.final class CancelRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -226,11 +241,11 @@ class CancelRequest(google.protobuf.message.Message): *, task_id: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["task_id", b"task_id"]) -> None: ... + def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... global___CancelRequest = CancelRequest -@typing_extensions.final +@typing.final class CancelResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/src/isolate/server/definitions/server_pb2_grpc.py b/src/isolate/server/definitions/server_pb2_grpc.py index 429034ee..7ef2fc86 100644 --- a/src/isolate/server/definitions/server_pb2_grpc.py +++ b/src/isolate/server/definitions/server_pb2_grpc.py @@ -1,10 +1,35 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc +import warnings from isolate.connections.grpc.definitions import common_pb2 as common__pb2 from isolate.server.definitions import server_pb2 as server__pb2 +GRPC_GENERATED_VERSION = '1.64.0' +GRPC_VERSION = grpc.__version__ +EXPECTED_ERROR_RELEASE = '1.65.0' +SCHEDULED_RELEASE_DATE = 'June 25, 2024' +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + warnings.warn( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in server_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + + f' This warning will become an error in {EXPECTED_ERROR_RELEASE},' + + f' scheduled for release on {SCHEDULED_RELEASE_DATE}.', + RuntimeWarning + ) + class IsolateStub(object): """Missing associated documentation comment in .proto file.""" @@ -19,27 +44,32 @@ def __init__(self, channel): '/Isolate/Run', request_serializer=server__pb2.BoundFunction.SerializeToString, response_deserializer=common__pb2.PartialRunResult.FromString, - ) + _registered_method=True) self.Submit = channel.unary_unary( '/Isolate/Submit', request_serializer=server__pb2.SubmitRequest.SerializeToString, response_deserializer=server__pb2.SubmitResponse.FromString, - ) + _registered_method=True) + self.Subscribe = channel.unary_stream( + '/Isolate/Subscribe', + request_serializer=server__pb2.SubscribeRequest.SerializeToString, + response_deserializer=common__pb2.PartialRunResult.FromString, + _registered_method=True) self.SetMetadata = channel.unary_unary( '/Isolate/SetMetadata', request_serializer=server__pb2.SetMetadataRequest.SerializeToString, response_deserializer=server__pb2.SetMetadataResponse.FromString, - ) + _registered_method=True) self.List = channel.unary_unary( '/Isolate/List', request_serializer=server__pb2.ListRequest.SerializeToString, response_deserializer=server__pb2.ListResponse.FromString, - ) + _registered_method=True) self.Cancel = channel.unary_unary( '/Isolate/Cancel', request_serializer=server__pb2.CancelRequest.SerializeToString, response_deserializer=server__pb2.CancelResponse.FromString, - ) + _registered_method=True) class IsolateServicer(object): @@ -60,6 +90,13 @@ def Submit(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def Subscribe(self, request, context): + """Subscribe to a submitted function. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def SetMetadata(self, request, context): """Set the metadata for a task. """ @@ -94,6 +131,11 @@ def add_IsolateServicer_to_server(servicer, server): request_deserializer=server__pb2.SubmitRequest.FromString, response_serializer=server__pb2.SubmitResponse.SerializeToString, ), + 'Subscribe': grpc.unary_stream_rpc_method_handler( + servicer.Subscribe, + request_deserializer=server__pb2.SubscribeRequest.FromString, + response_serializer=common__pb2.PartialRunResult.SerializeToString, + ), 'SetMetadata': grpc.unary_unary_rpc_method_handler( servicer.SetMetadata, request_deserializer=server__pb2.SetMetadataRequest.FromString, @@ -113,6 +155,7 @@ def add_IsolateServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'Isolate', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('Isolate', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. @@ -130,11 +173,21 @@ def Run(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream(request, target, '/Isolate/Run', + return grpc.experimental.unary_stream( + request, + target, + '/Isolate/Run', server__pb2.BoundFunction.SerializeToString, common__pb2.PartialRunResult.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) @staticmethod def Submit(request, @@ -147,11 +200,48 @@ def Submit(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/Isolate/Submit', + return grpc.experimental.unary_unary( + request, + target, + '/Isolate/Submit', server__pb2.SubmitRequest.SerializeToString, server__pb2.SubmitResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Subscribe(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/Isolate/Subscribe', + server__pb2.SubscribeRequest.SerializeToString, + common__pb2.PartialRunResult.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) @staticmethod def SetMetadata(request, @@ -164,11 +254,21 @@ def SetMetadata(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/Isolate/SetMetadata', + return grpc.experimental.unary_unary( + request, + target, + '/Isolate/SetMetadata', server__pb2.SetMetadataRequest.SerializeToString, server__pb2.SetMetadataResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) @staticmethod def List(request, @@ -181,11 +281,21 @@ def List(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/Isolate/List', + return grpc.experimental.unary_unary( + request, + target, + '/Isolate/List', server__pb2.ListRequest.SerializeToString, server__pb2.ListResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) @staticmethod def Cancel(request, @@ -198,8 +308,18 @@ def Cancel(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/Isolate/Cancel', + return grpc.experimental.unary_unary( + request, + target, + '/Isolate/Cancel', server__pb2.CancelRequest.SerializeToString, server__pb2.CancelResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/src/isolate/server/server.py b/src/isolate/server/server.py index e2a81c85..ba68cc20 100644 --- a/src/isolate/server/server.py +++ b/src/isolate/server/server.py @@ -225,6 +225,14 @@ class IsolateServicer(definitions.IsolateServicer): bridge_manager: BridgeManager default_settings: IsolateSettings = field(default_factory=IsolateSettings) background_tasks: dict[str, RunTask] = field(default_factory=dict) + + task_message_queues: dict[str, Queue[definitions.PartialRunResult]] = field( + default_factory=dict + ) + + _active_subscriptions: dict[str, threading.Event] = field(default_factory=dict) + _subscription_lock: threading.Lock = field(default_factory=threading.Lock) + _shutting_down: bool = field(default=False) _thread_pool: futures.ThreadPoolExecutor = field( @@ -360,9 +368,17 @@ def _run_task(self, task: RunTask) -> Iterator[definitions.PartialRunResult]: StatusCode.UNKNOWN, ) - def _run_task_in_background(self, task: RunTask) -> None: - for _ in self._run_task(task): - pass + def _run_task_in_background(self, task: RunTask, task_id: str) -> None: + self.task_message_queues[task_id] = Queue() + try: + for message in self._run_task(task): + self.task_message_queues[task_id].put_nowait(message) + print(f"Task {task_id} finished with result: {message}") + except Exception as e: + print(f"Task {task_id} finished with error: {e}") + raise e + finally: + self.background_tasks.pop(task_id, None) def Submit( self, @@ -372,25 +388,75 @@ def Submit( task = RunTask(request=request.function) self.set_metadata(task, request.metadata) - task.future = self._thread_pool.submit(self._run_task_in_background, task) task_id = str(uuid.uuid4()) + task.future = self._thread_pool.submit( + self._run_task_in_background, task, task_id + ) print(f"Submitted a task {task_id}") self.background_tasks[task_id] = task - def _callback(future: futures.Future) -> None: - msg = f"Task {task_id} finished with" - if exc := future.exception(): - msg += f" error: {exc!r}" - else: - msg += f" result: {future.result()!r}" - print(msg) - self.background_tasks.pop(task_id, None) + return definitions.SubmitResponse(task_id=task_id) - task.future.add_done_callback(_callback) + def Subscribe( + self, + request: definitions.SubscribeRequest, + context: ServicerContext, + ) -> Iterator[definitions.PartialRunResult]: + task_id = request.task_id + if ( + task_id not in self.background_tasks + or task_id not in self.task_message_queues + ): + raise GRPCException( + f"Task {task_id} not found.", + StatusCode.NOT_FOUND, + ) - return definitions.SubmitResponse(task_id=task_id) + future = self.background_tasks[task_id].future + + start_time = time.monotonic() + while future is None and time.monotonic() - start_time < 10: + time.sleep(0.1) + future = self.background_tasks[task_id].future + + if future is None: + raise GRPCException( + f"Timeout waiting for task {task_id} to start.", + StatusCode.DEADLINE_EXCEEDED, + ) + + # Cancel any existing subscriber for this task and register ourselves. + cancelled = threading.Event() + with self._subscription_lock: + old_event = self._active_subscriptions.get(task_id) + if old_event is not None: + old_event.set() # Signal the previous subscriber to stop + self._active_subscriptions[task_id] = cancelled + + queue = self.task_message_queues[task_id] + + try: + yield from self.watch_queue_until_completed( + queue, future.done, cancelled=cancelled + ) + + if cancelled.is_set(): + raise GRPCException( + "Subscription to task " + f"{task_id} was superseded by a new subscriber.", + StatusCode.ABORTED, + ) + + exception = future.exception(timeout=0.1) + if exception is not None: + raise exception + finally: + with self._subscription_lock: + # Only clean up if we are still the active subscriber + if self._active_subscriptions.get(task_id) is cancelled: + del self._active_subscriptions[task_id] def SetMetadata( self, @@ -469,15 +535,24 @@ def shutdown(self) -> None: print("All tasks canceled.") def watch_queue_until_completed( - self, queue: Queue, is_completed: Callable[[], bool] + self, + queue: Queue, + is_completed: Callable[[], bool], + cancelled: threading.Event | None = None, ) -> Iterator[definitions.PartialRunResult]: """Watch the given queue until the is_completed function returns True. Note that even if the function is completed, this function might not finish until the queue is empty. + + If a ``cancelled`` event is provided and becomes set, the watcher + will stop yielding immediately so the caller can handle the + cancellation (e.g. a subscription superseded by a new subscriber). """ timer = time.monotonic() while not is_completed(): + if cancelled is not None and cancelled.is_set(): + return try: yield queue.get(timeout=_Q_WAIT_DELAY) except QueueEmpty: @@ -492,7 +567,7 @@ def watch_queue_until_completed( ) # Clear the final messages - while not queue.empty(): + while not queue.empty() and not (cancelled is not None and cancelled.is_set()): try: yield queue.get_nowait() except QueueEmpty: