From d586a570404dc5629e965ed7369ac1f965cef2e1 Mon Sep 17 00:00:00 2001 From: tsubakiky Date: Mon, 12 May 2025 20:45:09 +0900 Subject: [PATCH] handler: update doc --- conformance/server.py | 1 - src/connect/handler.py | 455 +++++++++++++++++++---------------------- src/connect/writer.py | 63 +++--- 3 files changed, 240 insertions(+), 279 deletions(-) diff --git a/conformance/server.py b/conformance/server.py index 6ff727f..ee046c9 100644 --- a/conformance/server.py +++ b/conformance/server.py @@ -19,7 +19,6 @@ create_ConformanceService_handlers, ) -logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("conformance.server") diff --git a/src/connect/handler.py b/src/connect/handler.py index 338dc03..ee1f9c0 100644 --- a/src/connect/handler.py +++ b/src/connect/handler.py @@ -1,7 +1,6 @@ """Module provides handler configurations and implementations for unary procedures and stream types.""" import asyncio -import logging from collections.abc import Awaitable, Callable from http import HTTPMethod, HTTPStatus from typing import Any @@ -47,17 +46,22 @@ from connect.utils import aiterate from connect.writer import ServerResponseWriter -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +type UnaryFunc[T_Request, T_Response] = Callable[[UnaryRequest[T_Request]], Awaitable[UnaryResponse[T_Response]]] +type StreamFunc[T_Request, T_Response] = Callable[[StreamRequest[T_Request]], Awaitable[StreamResponse[T_Response]]] class HandlerConfig: - """Configuration class for handling procedures and stream types. + """HandlerConfig encapsulates the configuration for a handler in the Connect framework. Attributes: - procedure (str): The name of the procedure to handle. - stream_type (StreamType): The type of stream to use. - codecs (dict[str, Codec]): A dictionary of codecs used for encoding and decoding. + codecs (dict[str, Codec]): A mapping of codec names to codec instances supported by the handler. + compressions (list[Compression]): A list of compression algorithms supported by the handler. + descriptor (Any): The descriptor providing metadata about the procedure. + compress_min_bytes (int): The minimum message size (in bytes) before compression is applied. + read_max_bytes (int): The maximum number of bytes allowed to be read in a single message. + send_max_bytes (int): The maximum number of bytes allowed to be sent in a single message. + require_connect_protocol_header (bool): Whether the Connect protocol header is required. + idempotency_level (IdempotencyLevel): The idempotency level of the procedure. """ @@ -73,12 +77,12 @@ class HandlerConfig: idempotency_level: IdempotencyLevel def __init__(self, procedure: str, stream_type: StreamType, options: ConnectOptions): - """Initialize the handler with the given procedure, stream type, and optional settings. + """Initialize a new handler instance with the specified procedure, stream type, and options. Args: procedure (str): The name of the procedure to handle. - stream_type (StreamType): The type of stream to use. - options (Any, optional): Additional options for the handler. Defaults to None. + stream_type (StreamType): The type of stream (e.g., unary, server streaming, etc.). + options (ConnectOptions): Configuration options for the handler, including descriptor, compression, and protocol settings. """ self.procedure = procedure @@ -98,10 +102,10 @@ def __init__(self, procedure: str, stream_type: StreamType, options: ConnectOpti self.idempotency_level = options.idempotency_level def spec(self) -> Spec: - """Return a Spec object initialized with the current stream type. + """Create and returns a Spec object initialized with the current handler's procedure, descriptor, stream type, and idempotency level. Returns: - Spec: An instance of the Spec class with the stream type set to the current stream type. + Spec: An instance of the Spec class containing the handler's configuration. """ return Spec( @@ -113,14 +117,14 @@ def spec(self) -> Spec: def create_protocol_handlers(config: HandlerConfig) -> list[ProtocolHandler]: - """Create a list of protocol handlers based on the given configuration. + """Create and returns a list of protocol handlers based on the provided configuration. Args: - config (HandlerConfig): The configuration object containing the necessary parameters - such as codecs and protocol specifications. + config (HandlerConfig): The configuration object containing settings for codecs, compressions, + byte limits, protocol requirements, and idempotency level. Returns: - list[ProtocolHandler]: A list of initialized protocol handlers. + list[ProtocolHandler]: A list of initialized protocol handler instances for each supported protocol. """ protocols = [ProtocolConnect(), ProtocolGRPC(web=False), ProtocolGRPC(web=True)] @@ -148,14 +152,13 @@ def create_protocol_handlers(config: HandlerConfig) -> list[ProtocolHandler]: class Handler: - """A class to handle incoming HTTP requests and generate appropriate HTTP responses. + """Handler is an abstract base class for handling HTTP requests in a protocol-agnostic way, supporting both unary and streaming RPCs. Attributes: - procedure (str): The procedure name. - protocol_handlers (dict[HTTPMethod, list[ProtocolHandler]]): A dictionary mapping HTTP methods to protocol handlers. - allow_methods (str): Allowed HTTP methods. - accept_post (str): Accepted content types for POST requests. - protocol_handler (ProtocolHandler): The protocol handler for the current request. + protocol_handlers (dict[HTTPMethod, list[ProtocolHandler]]): Mapping of HTTP methods to their protocol handlers. + allow_methods (str): String specifying allowed HTTP methods. + accept_post (str): String specifying accepted content types for POST requests. + protocol_handler (ProtocolHandler): The protocol handler selected for the current request. """ @@ -172,13 +175,13 @@ def __init__( allow_methods: str, accept_post: str, ) -> None: - """Initialize a new handler instance. + """Initialize the handler with the specified procedure, protocol handlers, and HTTP method configurations. Args: - procedure (str): The name of the procedure. - protocol_handlers (dict[HTTPMethod, list[ProtocolHandler]]): A dictionary mapping HTTP methods to protocol handlers. - allow_methods (str): A string specifying allowed HTTP methods. - accept_post (str): A string specifying if POST method is accepted. + procedure (str): The name of the procedure to be handled. + protocol_handlers (dict[HTTPMethod, list[ProtocolHandler]]): A mapping of HTTP methods to their corresponding protocol handlers. + allow_methods (str): A string specifying which HTTP methods are allowed. + accept_post (str): A string specifying the accepted content types for POST requests. """ self.procedure = procedure @@ -187,31 +190,41 @@ def __init__( self.accept_post = accept_post async def implementation(self, conn: StreamingHandlerConn, timeout: float | None) -> None: - """Handle the implementation of the request processing. - - This method should be overridden by subclasses. + """Abstract method to be implemented by subclasses to handle streaming connections. Args: - conn: The connection handler - timeout: Optional timeout in milliseconds + conn (StreamingHandlerConn): The streaming connection handler instance. + timeout (float | None): Optional timeout value in seconds for the operation. Raises: - NotImplementedError: If not implemented by a subclass + NotImplementedError: If the method is not implemented by a subclass. """ - raise NotImplementedError("Implementation must be provided by subclass") + raise NotImplementedError() async def handle(self, request: Request) -> Response: - """Handle an incoming HTTP request and return an HTTP response. + """Handle an incoming HTTP request and returns an appropriate response. + + This method determines the correct protocol handler based on the HTTP method and content type, + validates the request (including checking for unsupported methods or media types), and manages + asynchronous processing of the request and response writing. Args: - request (Request): The incoming HTTP request. + request (Request): The incoming HTTP request to be handled. Returns: - Response: The HTTP response generated after processing the request. + Response: The HTTP response generated by the handler. Raises: - NotImplementedError: If the HTTP method or content type is not implemented. + Exception: Propagates any exception raised during request handling. + asyncio.CancelledError: If the handling task is cancelled. + + Behavior: + - Returns 405 Method Not Allowed if the HTTP method is not supported. + - Returns 415 Unsupported Media Type if no protocol handler can handle the request's content type. + - For GET requests, returns 415 if a request body is present. + - Handles the request asynchronously, ensuring proper cleanup of tasks. + - Returns a 500 Internal Server Error if no response is generated. """ response_headers = Headers(encoding="latin-1") @@ -246,13 +259,10 @@ async def handle(self, request: Request) -> Response: if content_length and int(content_length) > 0: has_body = True else: - try: - async for chunk in request.stream(): - if chunk: - has_body = True - break - except Exception: - pass + async for chunk in request.stream(): + if chunk: + has_body = True + break if has_body: status = HTTPStatus.UNSUPPORTED_MEDIA_TYPE @@ -260,18 +270,18 @@ async def handle(self, request: Request) -> Response: writer = ServerResponseWriter() - main_task = asyncio.create_task(self._handle(request, response_headers, response_trailers, writer)) + handle_task = asyncio.create_task(self._handle(request, response_headers, response_trailers, writer)) writer_task = asyncio.create_task(writer.receive()) response: Response | None = None try: done, _ = await asyncio.wait( - [main_task, writer_task], + [handle_task, writer_task], return_when=asyncio.FIRST_COMPLETED, ) - if main_task in done: - exc = main_task.exception() + if handle_task in done: + exc = handle_task.exception() if exc: raise exc @@ -282,11 +292,12 @@ async def handle(self, request: Request) -> Response: raise finally: - for t in (main_task, writer_task): + tasks = [handle_task, writer_task] + for t in tasks: if not t.done(): t.cancel() - await asyncio.gather(main_task, writer_task, return_exceptions=True) + await asyncio.gather(*tasks, return_exceptions=True) if not response: response = PlainTextResponse(content="Internal Server Error", status_code=500) @@ -296,28 +307,19 @@ async def handle(self, request: Request) -> Response: async def _handle( self, request: Request, response_headers: Headers, response_trailers: Headers, writer: ServerResponseWriter ) -> None: - if getattr(self, "stream_type", StreamType.Unary) == StreamType.Unary: - await self.unary_handle(request, response_headers, response_trailers, writer) - else: - await self.stream_handle(request, response_headers, response_trailers, writer) - - async def stream_handle( - self, request: Request, response_headers: Headers, response_trailers: Headers, writer: ServerResponseWriter - ) -> None: - """Handle streaming requests. + """Handle an incoming request by establishing a connection, parsing timeout values, and invoking the implementation logic. Args: request (Request): The incoming request object. - response_headers (Headers): The headers to be sent in the response. - response_trailers (Headers): The trailers to be sent in the response. - writer (ServerResponseWriter): The writer used to send the response. + response_headers (Headers): Headers to be sent in the response. + response_trailers (Headers): Trailers to be sent in the response. + writer (ServerResponseWriter): The writer used to send responses to the client. Returns: None Raises: - ValueError: If the implementation method is invalid. - ConnectError: If an internal error occurs during the handling of the stream. + Sends an appropriate ConnectError to the client if an exception occurs during processing, including timeout, unimplemented, or internal errors. """ conn = await self.protocol_handler.conn(request, response_headers, response_trailers, writer) @@ -334,82 +336,31 @@ async def stream_handle( await self.implementation(conn, None) except Exception as e: - error = e if isinstance(e, ConnectError) else ConnectError("internal error", Code.INTERNAL) - if isinstance(e, TimeoutError): error = ConnectError("the operation timed out", Code.DEADLINE_EXCEEDED) - if isinstance(e, NotImplementedError): + elif isinstance(e, NotImplementedError): error = ConnectError("not implemented", Code.UNIMPLEMENTED) - await conn.send_error(error) - - async def unary_handle( - self, request: Request, response_headers: Headers, response_trailers: Headers, writer: ServerResponseWriter - ) -> None: - """Handle a unary request. - - Args: - request (Request): The incoming request object. - response_headers (Headers): The headers for the response. - response_trailers (Headers): The trailers for the response. - writer (ServerResponseWriter): The writer to send the response. - - Raises: - ValueError: If the implementation method is invalid. - ConnectError: If there is an error parsing the timeout or an internal error occurs. - - Returns: - None - - """ - conn = await self.protocol_handler.conn(request, response_headers, response_trailers, writer) - if conn is None: - return - - try: - timeout = conn.parse_timeout() - if timeout: - timeout_ms = int(timeout * 1000) - with anyio.fail_after(delay=timeout): - await self.implementation(conn, timeout_ms) else: - await self.implementation(conn, None) - - except Exception as e: - error = e if isinstance(e, ConnectError) else ConnectError("internal error", Code.INTERNAL) - - if isinstance(e, TimeoutError): - error = ConnectError("the operation timed out", Code.DEADLINE_EXCEEDED) - - if isinstance(e, NotImplementedError): - error = ConnectError("not implemented", Code.UNIMPLEMENTED) + error = e if isinstance(e, ConnectError) else ConnectError("internal error", Code.INTERNAL) await conn.send_error(error) -type UnaryFunc[T_Request, T_Response] = Callable[[UnaryRequest[T_Request]], Awaitable[UnaryResponse[T_Response]]] -type StreamFunc[T_Request, T_Response] = Callable[[StreamRequest[T_Request]], Awaitable[StreamResponse[T_Response]]] - - class UnaryHandler[T_Request, T_Response](Handler): - """A handler for unary RPC (Remote Procedure Call) operations. + """UnaryHandler is a generic handler class for unary RPC procedures. - Attributes: - protocol_handlers (dict[HTTPMethod, list[ProtocolHandler]]): A dictionary mapping HTTP methods to lists of protocol handlers. - procedure (str): The name of the procedure being handled. - unary (UnaryFunc[Req, Res]): The unary function to be executed. - input (type[Req]): The type of the request input. - output (type[Res]): The type of the response output. - options (ConnectOptions | None): Optional configuration options for the handler. + Type Parameters: + T_Request: The type of the request message. + T_Response: The type of the response message. """ - procedure: str - protocol_handlers: dict[HTTPMethod, list[ProtocolHandler]] - allow_methods: str - accept_post: str stream_type: StreamType = StreamType.Unary + input: type[T_Request] + output: type[T_Response] + call: UnaryFunc[T_Request, T_Response] def __init__( self, @@ -418,23 +369,34 @@ def __init__( input: type[T_Request], output: type[T_Response], options: ConnectOptions | None = None, - ): - """Initialize the unary handler.""" + ) -> None: + """Initialize a handler for a unary RPC procedure. + + Args: + procedure (str): The name of the RPC procedure. + unary (UnaryFunc[T_Request, T_Response]): The asynchronous function implementing the unary RPC logic. + input (type[T_Request]): The expected input type for the request. + output (type[T_Response]): The expected output type for the response. + options (ConnectOptions | None, optional): Optional configuration for the handler, such as interceptors. Defaults to None. + + Calls the superclass initializer with the configured protocol handlers and method options. + + """ options = options if options is not None else ConnectOptions() config = HandlerConfig(procedure=procedure, stream_type=StreamType.Unary, options=options) protocol_handlers = create_protocol_handlers(config) - async def _untyped(request: UnaryRequest[T_Request]) -> UnaryResponse[T_Response]: + async def _call(request: UnaryRequest[T_Request]) -> UnaryResponse[T_Response]: response = await unary(request) return response - untyped = apply_interceptors(_untyped, options.interceptors) + call = apply_interceptors(_call, options.interceptors) self.input = input self.output = output - self.untyped = untyped + self.call = call super().__init__( procedure=procedure, @@ -444,51 +406,51 @@ async def _untyped(request: UnaryRequest[T_Request]) -> UnaryResponse[T_Response ) async def implementation(self, conn: StreamingHandlerConn, timeout: float | None) -> None: - """Handle a unary request and send a unary response. + """Handle the implementation of a streaming handler connection. + + This asynchronous method receives a unary request from the given connection, + optionally sets a timeout on the request, invokes the handler's call method, + and sends the response message back through the connection. It also updates + the connection's response headers and trailers, excluding protocol-specific headers. Args: - conn: The connection handler - timeout: Optional timeout in milliseconds + conn (StreamingHandlerConn): The streaming handler connection to process. + timeout (float | None): Optional timeout value to set on the request. - Raises: - ConnectError: If the response message type is incorrect + Returns: + None """ request = await receive_unary_request(conn, self.input) if timeout: request.timeout = timeout - response = await self.untyped(request) - - if not isinstance(response.message, self.output): - raise ConnectError( - f"expected response of type: {self.output.__name__}", - Code.INTERNAL, - ) + response = await self.call(request) conn.response_headers.update(exclude_protocol_headers(response.headers)) conn.response_trailers.update(exclude_protocol_headers(response.trailers)) + await conn.send(aiterate([response.message])) class ServerStreamHandler[T_Request, T_Response](Handler): - """A handler for server-side streaming RPCs. + """ServerStreamHandler is a handler class for server-streaming RPC procedures. - This handler manages the server-side streaming procedure, including receiving - requests, processing them, and sending responses. + This generic class manages the lifecycle and protocol handling for server-streaming RPCs, + where a single request from the client results in a stream of responses from the server. + It sets up protocol handlers, applies interceptors, and provides an asynchronous implementation + method to process incoming streaming requests and send responses. - Args: - procedure (str): The name of the procedure being handled. - stream (StreamFunc[T_Request, T_Response]): The streaming function that processes - the request and generates the response. - input (type[T_Request]): The type of the request message. - output (type[T_Response]): The type of the response message. - options (ConnectOptions | None, optional): Configuration options for the handler. - Defaults to None. + Type Parameters: + T_Request: The type of the request message. + T_Response: The type of the response message. """ stream_type: StreamType = StreamType.ServerStream + input: type[T_Request] + output: type[T_Response] + call: StreamFunc[T_Request, T_Response] def __init__( self, @@ -498,29 +460,32 @@ def __init__( output: type[T_Response], options: ConnectOptions | None = None, ) -> None: - """Initialize a new handler instance. + """Initialize a server-streaming handler for a given procedure. Args: - procedure (str): The name of the procedure to handle. - stream (StreamFunc[T_Request, T_Response]): The stream function to handle requests and responses. - input (type[T_Request]): The type of the request message. - output (type[T_Response]): The type of the response message. - options (ConnectOptions | None, optional): Additional options for the handler. Defaults to None. + procedure (str): The name of the RPC procedure. + stream (StreamFunc[T_Request, T_Response]): The asynchronous stream function handling the server-streaming logic. + input (type[T_Request]): The expected request message type. + output (type[T_Response]): The expected response message type. + options (ConnectOptions | None, optional): Additional configuration options for the handler. Defaults to None. + + Raises: + Any exceptions raised by the parent class initializer. """ options = options if options is not None else ConnectOptions() config = HandlerConfig(procedure=procedure, stream_type=StreamType.ServerStream, options=options) protocol_handlers = create_protocol_handlers(config) - async def _untyped(request: StreamRequest[T_Request]) -> StreamResponse[T_Response]: + async def _call(request: StreamRequest[T_Request]) -> StreamResponse[T_Response]: response = await stream(request) return response - untyped = apply_interceptors(_untyped, options.interceptors) + call = apply_interceptors(_call, options.interceptors) self.input = input self.output = output - self.untyped = untyped + self.call = call super().__init__( procedure=procedure, @@ -530,18 +495,28 @@ async def _untyped(request: StreamRequest[T_Request]) -> StreamResponse[T_Respon ) async def implementation(self, conn: StreamingHandlerConn, timeout: float | None) -> None: - """Handle a server stream request and response. + """Handle the implementation of a streaming handler. + + This asynchronous method receives a stream request, optionally sets a timeout, + invokes the handler's call method, updates the connection's response headers and trailers, + and sends the response messages through the connection. Args: - conn: The connection handler - timeout: Optional timeout in milliseconds + conn (StreamingHandlerConn): The streaming connection handler. + timeout (float | None): Optional timeout value for the request in seconds. + + Returns: + None + + Raises: + Any exceptions raised by `receive_stream_request`, `self.call`, or `conn.send` will propagate. """ request = await receive_stream_request(conn, self.input) if timeout: request.timeout = timeout - response = await self.untyped(request) + response = await self.call(request) conn.response_headers.update(response.headers) conn.response_trailers.update(response.trailers) @@ -550,28 +525,25 @@ async def implementation(self, conn: StreamingHandlerConn, timeout: float | None class ClientStreamHandler[T_Request, T_Response](Handler): - """A handler for client-side streaming RPCs. + """ClientStreamHandler is a handler class for client-streaming RPC procedures. - Args: - procedure (str): The name of the RPC procedure. - stream (StreamFunc[T_Request, T_Response]): The function that handles the streaming request. - input (type[T_Request]): The type of the request message. - output (type[T_Response]): The type of the response message. - options (ConnectOptions | None, optional): Configuration options for the handler. Defaults to None. + This generic class manages the lifecycle of a client-streaming RPC, including request handling, + stream invocation, interceptor application, and response transmission. It is parameterized by + the request and response message types. - Methods: - implementation(conn: StreamingHandlerConn): Handles the streaming connection, receiving requests and sending responses. + Type Parameters: + T_Request: The type of the input message for the stream. + T_Response: The type of the output message for the stream. - Attributes: - procedure (str): The name of the RPC procedure. - implementation (Callable): The function that implements the streaming logic. - protocol_handlers (Dict): Handlers for different protocols. - allow_methods (List[str]): Allowed HTTP methods. - accept_post (List[str]): Accepted POST values. + stream_type (StreamType): The type of stream handled (ClientStream). + call (StreamFunc[T_Request, T_Response]): The wrapped stream call function with applied interceptors. """ stream_type: StreamType = StreamType.ClientStream + input: type[T_Request] + output: type[T_Response] + call: StreamFunc[T_Request, T_Response] def __init__( self, @@ -581,32 +553,32 @@ def __init__( output: type[T_Response], options: ConnectOptions | None = None, ) -> None: - """Initialize a new instance of the handler. + """Initialize a handler for a client-streaming RPC procedure. Args: - procedure (str): The name of the procedure to handle. - stream (StreamFunc[T_Request, T_Response]): The stream function to handle requests and responses. - input (type[T_Request]): The type of the request message. - output (type[T_Response]): The type of the response message. - options (ConnectOptions | None, optional): Additional options for the handler. Defaults to None. + procedure (str): The name of the RPC procedure. + stream (StreamFunc[T_Request, T_Response]): The asynchronous stream function handling the client-streaming logic. + input (type[T_Request]): The expected input message type. + output (type[T_Response]): The expected output message type. + options (ConnectOptions | None, optional): Additional configuration options for the handler. Defaults to None. - Returns: - None + Raises: + Any exceptions raised by the parent class initializer or protocol handler creation. """ options = options if options is not None else ConnectOptions() config = HandlerConfig(procedure=procedure, stream_type=StreamType.ClientStream, options=options) protocol_handlers = create_protocol_handlers(config) - async def _untyped(request: StreamRequest[T_Request]) -> StreamResponse[T_Response]: + async def _call(request: StreamRequest[T_Request]) -> StreamResponse[T_Response]: response = await stream(request) return response - untyped = apply_interceptors(_untyped, options.interceptors) + call = apply_interceptors(_call, options.interceptors) self.input = input self.output = output - self.untyped = untyped + self.call = call super().__init__( procedure=procedure, @@ -616,18 +588,28 @@ async def _untyped(request: StreamRequest[T_Request]) -> StreamResponse[T_Respon ) async def implementation(self, conn: StreamingHandlerConn, timeout: float | None) -> None: - """Handle a client stream request and response. + """Handle the implementation of a streaming handler. + + This asynchronous method receives a streaming request, optionally sets a timeout, + calls the handler logic, updates the connection's response headers and trailers, + and sends the response messages back through the connection. Args: - conn: The connection handler - timeout: Optional timeout in milliseconds + conn (StreamingHandlerConn): The streaming connection handler. + timeout (float | None): Optional timeout value for the request in seconds. + + Returns: + None + + Raises: + Any exceptions raised by `receive_stream_request` or `self.call` will propagate. """ request = await receive_stream_request(conn, self.input) if timeout: request.timeout = timeout - response = await self.untyped(request) + response = await self.call(request) conn.response_headers.update(response.headers) conn.response_trailers.update(response.trailers) @@ -636,42 +618,26 @@ async def implementation(self, conn: StreamingHandlerConn, timeout: float | None class BidiStreamHandler[T_Request, T_Response](Handler): - """A handler for bidirectional streaming RPCs in a Connect-based framework. + """BidiStreamHandler is a handler class for bidirectional streaming procedures. - This class facilitates the implementation of bidirectional streaming - handlers by managing the lifecycle of the stream, applying interceptors, - and handling protocol-specific details. + This generic class manages the lifecycle of a bidirectional streaming RPC, including request/response type validation, + application of interceptors, and integration with protocol-specific handlers. It wraps the provided stream function, + applies any configured interceptors, and exposes an asynchronous implementation method to process streaming requests + and send responses. Type Parameters: - T_Request: The type of the request messages in the stream. - T_Response: The type of the response messages in the stream. - - Args: - procedure (str): The name of the RPC procedure being handled. - stream (StreamFunc[T_Request, T_Response]): The user-defined function - that processes the bidirectional stream of requests and responses. - input (type[T_Request]): The type of the request messages. - output (type[T_Response]): The type of the response messages. - options (ConnectOptions | None, optional): Configuration options for - the handler. Defaults to `None`. + T_Request: The type of the request messages. + T_Response: The type of the response messages. - Attributes: - procedure (str): The name of the RPC procedure. - implementation (Callable): The internal implementation of the handler - that manages the streaming connection. - protocol_handlers (dict): A mapping of protocol-specific handlers. - allow_methods (list): A sorted list of allowed HTTP methods for the - handler. - accept_post (list): A sorted list of accepted POST methods for the - handler. - - Methods: - __init__: Initializes the handler with the provided procedure, stream - function, input/output types, and options. + stream_type (StreamType): The type of stream handled (always StreamType.BiDiStream). + call (StreamFunc[T_Request, T_Response]): The wrapped stream call function with applied interceptors. """ stream_type: StreamType = StreamType.BiDiStream + input: type[T_Request] + output: type[T_Response] + call: StreamFunc[T_Request, T_Response] def __init__( self, @@ -681,37 +647,32 @@ def __init__( output: type[T_Response], options: ConnectOptions | None = None, ) -> None: - """Initialize a bidirectional streaming handler. + """Initialize a handler for a bidirectional streaming procedure. Args: - procedure (str): The name of the procedure being handled. - stream (StreamFunc[T_Request, T_Response]): The function to handle the bidirectional stream. - input (type[T_Request]): The type of the request messages. - output (type[T_Response]): The type of the response messages. + procedure (str): The name of the procedure to handle. + stream (StreamFunc[T_Request, T_Response]): The asynchronous stream function handling requests and responses. + input (type[T_Request]): The expected input type for requests. + output (type[T_Response]): The expected output type for responses. options (ConnectOptions | None, optional): Configuration options for the handler. Defaults to None. Raises: - Any exceptions raised during the initialization of protocol handlers or interceptors. - - Notes: - - This handler is designed for bidirectional streaming communication. - - Interceptors, if provided, are applied to the untyped stream function. - - The implementation processes incoming requests, applies the stream function, and sends responses. + Any exceptions raised by the parent class initializer. """ options = options if options is not None else ConnectOptions() config = HandlerConfig(procedure=procedure, stream_type=StreamType.BiDiStream, options=options) protocol_handlers = create_protocol_handlers(config) - async def _untyped(request: StreamRequest[T_Request]) -> StreamResponse[T_Response]: + async def _call(request: StreamRequest[T_Request]) -> StreamResponse[T_Response]: response = await stream(request) return response - untyped = apply_interceptors(_untyped, options.interceptors) + call = apply_interceptors(_call, options.interceptors) self.input = input self.output = output - self.untyped = untyped + self.call = call super().__init__( procedure=procedure, @@ -721,18 +682,28 @@ async def _untyped(request: StreamRequest[T_Request]) -> StreamResponse[T_Respon ) async def implementation(self, conn: StreamingHandlerConn, timeout: float | None) -> None: - """Handle a bidirectional stream request and response. + """Handle the implementation of a streaming handler. + + This asynchronous method receives a streaming request, optionally sets a timeout, + calls the main processing function, updates the connection's response headers and trailers, + and sends the response messages back through the connection. Args: - conn: The connection handler - timeout: Optional timeout in milliseconds + conn (StreamingHandlerConn): The streaming connection handler. + timeout (float | None): Optional timeout value for the request in seconds. + + Returns: + None + + Raises: + Any exceptions raised by `receive_stream_request` or `self.call` will propagate. """ request = await receive_stream_request(conn, self.input) if timeout: request.timeout = timeout - response = await self.untyped(request) + response = await self.call(request) conn.response_headers.update(response.headers) conn.response_trailers.update(response.trailers) diff --git a/src/connect/writer.py b/src/connect/writer.py index 9ee8f7a..e942145 100644 --- a/src/connect/writer.py +++ b/src/connect/writer.py @@ -6,63 +6,54 @@ class ServerResponseWriter: - """A class to handle writing and receiving server responses asynchronously. + """A writer class for handling server responses asynchronously using an asyncio.Queue. Attributes: - _future (asyncio.Future[Response]): A future object to hold the server response. + queue (asyncio.Queue[Response]): The queue used to store a single response. + is_closed (bool): Indicates whether the writer has been closed. """ - _future: asyncio.Future[Response] + queue: asyncio.Queue[Response] + is_closed: bool = False - def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None: - """Initialize the writer instance. - - Args: - loop (asyncio.AbstractEventLoop | None, optional): The event loop to use. If None, the default event loop is used. - - Returns: - None - - """ - loop = loop or asyncio.get_event_loop() - self._future = loop.create_future() + def __init__(self) -> None: + """Initialize the instance with an asyncio queue of maximum size 1.""" + self.queue = asyncio.Queue(maxsize=1) async def write(self, response: Response) -> None: - """Asynchronously writes the given response to the future result if it is not already done. + """Asynchronously writes a response to the internal queue. Args: response (Response): The response object to be written. - Returns: - None + Raises: + RuntimeError: If the response writer is already closed. """ - if self._future.cancelled(): - raise RuntimeError("Cannot write response; the future has already been cancelled.") - - if self._future.done(): - raise RuntimeError("Cannot write response; the future is already done.") + if self.is_closed: + raise RuntimeError("Cannot write to a closed response writer.") - self._future.set_result(response) + await self.queue.put(response) async def receive(self) -> Response: - """Asynchronously receives a response. + """Asynchronously retrieves a response from the internal queue. - This method awaits the completion of a future and returns the response. + Raises: + RuntimeError: If the response writer is already closed. Returns: - Response: The response object awaited from the future. + Response: The next response item from the queue. - """ - return await self._future + Side Effects: + Marks the response writer as closed after receiving a response. - async def cancel(self) -> None: - """Cancel the future if it is not already done. + """ + if self.is_closed: + raise RuntimeError("Cannot receive from a closed response writer.") - Returns: - None + response = await self.queue.get() + self.queue.task_done() - """ - if not self._future.done(): - self._future.cancel() + self.is_closed = True + return response