diff --git a/examples/client.py b/examples/client.py index f1c5ca9..0a96219 100644 --- a/examples/client.py +++ b/examples/client.py @@ -4,7 +4,7 @@ import asyncio from collections.abc import AsyncGenerator -from connect.connect import StreamRequest, UnaryRequest, ensure_single +from connect.connect import StreamRequest, UnaryRequest from connect.connection_pool import AsyncConnectionPool from proto.connectrpc.eliza.v1.eliza_pb2 import IntroduceRequest, ReflectRequest, SayRequest @@ -21,11 +21,7 @@ async def run_unary(client: ElizaServiceClient) -> None: async def run_server_streaming(client: ElizaServiceClient) -> None: """Run server streaming RPC (Introduce).""" - - async def request_generator() -> AsyncGenerator[IntroduceRequest]: - yield IntroduceRequest(name="Alice") - - request = StreamRequest(request_generator()) + request = StreamRequest(IntroduceRequest(name="Alice")) message_count = 1 async with client.Introduce(request) as response: @@ -43,7 +39,7 @@ async def request_generator() -> AsyncGenerator[ReflectRequest]: request = StreamRequest(request_generator()) async with client.Reflect(request) as response: - message = await ensure_single(response.messages) + message = await response.single() print(f"Final response: {message.sentence}") diff --git a/examples/server.py b/examples/server.py index 5812d0c..3bba0bc 100644 --- a/examples/server.py +++ b/examples/server.py @@ -5,7 +5,7 @@ import hypercorn import hypercorn.asyncio -from connect.connect import StreamRequest, StreamResponse, UnaryRequest, UnaryResponse, ensure_single +from connect.connect import StreamRequest, StreamResponse, UnaryRequest, UnaryResponse from connect.handler_context import HandlerContext from connect.middleware import ConnectMiddleware from starlette.applications import Starlette @@ -36,7 +36,7 @@ async def Introduce( self, request: StreamRequest[IntroduceRequest], _context: HandlerContext ) -> StreamResponse[IntroduceResponse]: """Introduce the Eliza service.""" - message = await ensure_single(request.messages) + message = await request.single() name = message.name intros = eliza.get_intro_responses(name) diff --git a/src/connect/client.py b/src/connect/client.py index 19145ed..b11c2b2 100644 --- a/src/connect/client.py +++ b/src/connect/client.py @@ -22,8 +22,8 @@ StreamType, UnaryRequest, UnaryResponse, - recieve_stream_response, - recieve_unary_response, + receive_stream_response, + receive_unary_response, ) from connect.connection_pool import AsyncConnectionPool from connect.error import ConnectError @@ -244,7 +244,7 @@ def on_request_send(r: httpcore.Request) -> None: await conn.send(aiterate([request.message]), call_options.timeout, abort_event=call_options.abort_event) - response = await recieve_unary_response(conn=conn, t=output, abort_event=call_options.abort_event) + response = await receive_unary_response(conn=conn, t=output, abort_event=call_options.abort_event) return response unary_func = apply_interceptors(_unary_func, options.interceptors) @@ -290,7 +290,7 @@ def on_request_send(r: httpcore.Request) -> None: await conn.send(request.messages, call_options.timeout, call_options.abort_event) - response = await recieve_stream_response(conn, output, request.spec, call_options.abort_event) + response = await receive_stream_response(conn, output, request.spec, call_options.abort_event) return response stream_func = apply_interceptors(_stream_func, options.interceptors) diff --git a/src/connect/connect.py b/src/connect/connect.py index 3232a06..bfb35b2 100644 --- a/src/connect/connect.py +++ b/src/connect/connect.py @@ -51,14 +51,22 @@ class Peer(BaseModel): class RequestCommon: - """RequestCommon is a class that encapsulates common attributes and methods for handling HTTP requests. + """A common base class for handling request-related functionality. - Attributes: - _spec (Spec): The specification for the request. - _peer (Peer): The peer information. - _headers (Headers): The request headers. - _method (str): The HTTP method used for the request. + This class encapsulates the common properties and behaviors shared across + different types of requests, including specification details, peer information, + headers, and HTTP method configuration. + Attributes: + _spec (Spec): The specification for the request containing procedure details, + descriptor, stream type, and idempotency level. + _peer (Peer): The peer information including address, protocol, and query parameters. + _headers (Headers): The request headers as a collection of key-value pairs. + _method (str): The HTTP method used for the request (defaults to POST). + + The class provides property accessors for all attributes with appropriate getters + and setters where modification is allowed. Default values are provided for all + parameters during initialization to ensure the object is always in a valid state. """ _spec: Spec @@ -73,17 +81,19 @@ def __init__( headers: Headers | None = None, method: str | None = None, ) -> None: - """Initialize a new Request instance. + """Initialize a Connect request/response context. Args: - spec (Spec): The specification for the request. - peer (Peer): The peer information. - headers (Mapping[str, str]): The request headers. - method (str): The HTTP method used for the request. + spec: The RPC specification containing procedure name, descriptor, stream type, + and idempotency level. If None, creates a default Spec with empty procedure, + no descriptor, unary stream type, and idempotent level. + peer: The peer information including address, protocol, and query parameters. + If None, creates a default Peer with no address, empty protocol, and empty query. + headers: HTTP headers for the request/response. If None, creates an empty Headers object. + method: HTTP method to use for the request. If None, defaults to POST. Returns: None - """ self._spec = ( spec @@ -138,18 +148,12 @@ def method(self, value: str) -> None: class StreamRequest[T](RequestCommon): """StreamRequest class represents a request that can handle streaming messages. - Attributes: - messages (AsyncIterable[T]): An asynchronous iterable of messages. - _spec (Spec): The specification for the request. - _peer (Peer): The peer information. - _headers (Headers): The request headers. - _method (str): The HTTP method used for the request. - + This class provides a unified interface for handling both single and multiple + messages in streaming requests. It automatically determines the appropriate + method based on the stream type and usage context. """ _messages: AsyncIterable[T] - # timeout: float | None - # abort_event: asyncio.Event | None = None def __init__( self, @@ -158,50 +162,53 @@ def __init__( peer: Peer | None = None, headers: Headers | None = None, method: str | None = None, - # timeout: float | None = None, - # abort_event: asyncio.Event | None = None, ) -> None: - """Initialize a new Request instance. + """Initialize a new instance. Args: - content (AsyncIterable[T] | T): The request content, which can be an async iterable or a single message. - spec (Spec): The specification for the request. - peer (Peer): The peer information. - headers (Mapping[str, str]): The request headers. - method (str): The HTTP method used for the request. - timeout (float): The timeout for the request. - abort_event (asyncio.Event): An event to signal request abortion. + content: The content to be processed, either a single item of type T or an async iterable of items. + spec: Optional specification object defining the behavior or configuration. + peer: Optional peer object representing the connection endpoint. + headers: Optional headers dictionary for metadata or configuration. + method: Optional string specifying the method or operation type. Returns: None - """ super().__init__(spec, peer, headers, method) self._messages = content if isinstance(content, AsyncIterable) else aiterate([content]) - # self.timeout = timeout - # self.abort_event = abort_event @property def messages(self) -> AsyncIterable[T]: - """Return the request message.""" + """Return the request messages as an async iterable. + + Use this when you expect multiple messages (client streaming, bidi streaming). + + Example: + async for message in request.messages: + process(message) + """ return self._messages + async def single(self) -> T: + """Return a single message from the request. -class UnaryRequest[T](RequestCommon): - """UnaryRequest is a class that encapsulates a request with a message, specification, peer, headers, and method. + Use this when you expect exactly one message (server-side handlers for client streaming). + Raises ConnectError if there are zero or multiple messages. - Attributes: - message (Req): The request message. - _spec (Spec): The specification of the request. - _peer (Peer): The peer associated with the request. - _headers (Mapping[str, str]): The headers of the request. - _method (str): The method of the request. + Example: + message = await request.single() + process(message) + """ + return await ensure_single(self._messages) - """ - _message: T - # timeout: float | None - # abort_event: asyncio.Event | None = None +class UnaryRequest[T](RequestCommon): + """A unary request wrapper that extends RequestCommon functionality. + + This class encapsulates a single message/content of type T along with common request + metadata such as specifications, peer information, headers, and HTTP method. + """ def __init__( self, @@ -210,28 +217,21 @@ def __init__( peer: Peer | None = None, headers: Headers | None = None, method: str | None = None, - # timeout: float | None = None, - # abort_event: asyncio.Event | None = None, ) -> None: - """Initialize a new Request instance. + """Initialize a new instance with content and optional parameters. Args: - content (T): The request message. - spec (Spec): The specification for the request. - peer (Peer): The peer information. - headers (Mapping[str, str]): The request headers. - method (str): The HTTP method used for the request. - timeout (float): The timeout for the request. - abort_event (asyncio.Event): An event to signal request abortion. + content (T): The main content/message to be stored in this instance. + spec (Spec | None, optional): Specification object defining behavior or configuration. Defaults to None. + peer (Peer | None, optional): Peer object representing the remote endpoint or connection. Defaults to None. + headers (Headers | None, optional): HTTP headers or metadata associated with the request/response. Defaults to None. + method (str | None, optional): HTTP method or operation type (e.g., 'GET', 'POST'). Defaults to None. Returns: None - """ super().__init__(spec, peer, headers, method) self._message = content - # self.timeout = timeout - # self.abort_event = abort_event @property def message(self) -> T: @@ -293,7 +293,12 @@ def message(self) -> T: class StreamResponse[T](ResponseCommon): - """Response class for handling responses.""" + """Response class for handling streaming responses. + + This class provides a unified interface for handling both single and multiple + messages from streaming responses. It automatically determines the appropriate + method based on the stream type and usage context. + """ _messages: AsyncIterable[T] @@ -303,15 +308,40 @@ def __init__( headers: Headers | None = None, trailers: Headers | None = None, ) -> None: - """Initialize the response with a message.""" + """Initialize the response with content. + + Args: + content: Either a single message or an async iterable of messages + headers: Optional response headers + trailers: Optional response trailers + """ super().__init__(headers, trailers) self._messages = content if isinstance(content, AsyncIterable) else aiterate([content]) @property def messages(self) -> AsyncIterable[T]: - """Return the response message.""" + """Return the response messages as an async iterable. + + Use this when you expect multiple messages (server streaming, bidi streaming). + + Example: + async for message in response.messages: + print(message) + """ return self._messages + async def single(self) -> T: + """Return a single message from the response. + + Use this when you expect exactly one message (client streaming results). + Raises ConnectError if there are zero or multiple messages. + + Example: + message = await response.single() + print(message) + """ + return await ensure_single(self._messages) + async def aclose(self) -> None: """Asynchronously close the response stream.""" aclose = get_acallable_attribute(self._messages, "aclose") @@ -475,8 +505,8 @@ async def send_error(self, error: ConnectError) -> None: raise NotImplementedError() -class UnaryClientConn: - """Abstract base class for a streaming client connection.""" +class UnaryClientConn(abc.ABC): + """Abstract base class for a unary client connection.""" @property @abc.abstractmethod @@ -529,7 +559,7 @@ async def aclose(self) -> None: raise NotImplementedError() -class StreamingClientConn: +class StreamingClientConn(abc.ABC): """Abstract base class for a streaming client connection.""" @property @@ -645,7 +675,7 @@ async def receive_stream_request[T](conn: StreamingHandlerConn, t: type[T]) -> S ) -async def recieve_unary_response[T]( +async def receive_unary_response[T]( conn: StreamingClientConn, t: type[T], abort_event: asyncio.Event | None ) -> UnaryResponse[T]: """Receives a unary response message from a streaming client connection. @@ -672,7 +702,7 @@ async def recieve_unary_response[T]( return UnaryResponse(message, conn.response_headers, conn.response_trailers) -async def recieve_stream_response[T]( +async def receive_stream_response[T]( conn: StreamingClientConn, t: type[T], spec: Spec, abort_event: asyncio.Event | None ) -> StreamResponse[T]: """Handle receiving a stream response from a streaming client connection. @@ -697,10 +727,10 @@ async def recieve_stream_response[T]( """ if spec.stream_type == StreamType.ClientStream: - single_message = await ensure_single(conn.receive(t, abort_event), conn.aclose) + single_message = await ensure_single(conn.receive(t, abort_event)) return StreamResponse( - AsyncDataStream[T](aiterate([single_message])), conn.response_headers, conn.response_trailers + AsyncDataStream[T](aiterate([single_message]), conn.aclose), conn.response_headers, conn.response_trailers ) else: return StreamResponse(