Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions conformance/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ async def delayed_abort() -> None:

resp = await getattr(client, msg.method)(
UnaryRequest(
message=req,
content=req,
headers=headers,
timeout=msg.timeout_ms / 1000,
abort_event=abort_event,
Expand Down Expand Up @@ -286,7 +286,7 @@ async def delayed_abort() -> None:

async with getattr(client, msg.method)(
StreamRequest(
messages=_reqs(), headers=headers, timeout=msg.timeout_ms / 1000, abort_event=abort_event
content=_reqs(), headers=headers, timeout=msg.timeout_ms / 1000, abort_event=abort_event
),
) as resp:
async for message in resp.messages:
Expand All @@ -310,7 +310,7 @@ async def delayed_abort() -> None:

async with getattr(client, msg.method)(
StreamRequest(
messages=reqs, headers=headers, timeout=msg.timeout_ms / 1000, abort_event=abort_event
content=reqs, headers=headers, timeout=msg.timeout_ms / 1000, abort_event=abort_event
),
) as resp:
if msg.cancel.HasField("after_close_send_ms"):
Expand Down Expand Up @@ -352,7 +352,7 @@ async def _reqs() -> AsyncGenerator[service_pb2.ClientStreamRequest]:

async with getattr(client, msg.method)(
StreamRequest(
messages=_reqs(), headers=headers, timeout=msg.timeout_ms / 1000, abort_event=abort_event
content=_reqs(), headers=headers, timeout=msg.timeout_ms / 1000, abort_event=abort_event
),
) as resp:
if msg.cancel.HasField("before_close_send"):
Expand Down
10 changes: 5 additions & 5 deletions conformance/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async def Unary(self, request: UnaryRequest[service_pb2.UnaryRequest]) -> UnaryR
except Exception:
raise

return UnaryResponse(message=service_pb2.UnaryResponse(payload=payload), headers=headers, trailers=trailers)
return UnaryResponse(content=service_pb2.UnaryResponse(payload=payload), headers=headers, trailers=trailers)

async def IdempotentUnary(
self, request: UnaryRequest[service_pb2.IdempotentUnaryRequest]
Expand Down Expand Up @@ -262,7 +262,7 @@ async def IdempotentUnary(
raise

return UnaryResponse(
message=service_pb2.IdempotentUnaryResponse(payload=payload), headers=headers, trailers=trailers
content=service_pb2.IdempotentUnaryResponse(payload=payload), headers=headers, trailers=trailers
)

async def ClientStream(
Expand Down Expand Up @@ -355,7 +355,7 @@ async def ClientStream(
raise

return StreamResponse(
messages=service_pb2.ClientStreamResponse(payload=payload),
content=service_pb2.ClientStreamResponse(payload=payload),
headers=headers,
trailers=trailers,
)
Expand Down Expand Up @@ -465,7 +465,7 @@ async def iterator() -> typing.AsyncIterator[service_pb2.ServerStreamResponse]:
raise

return StreamResponse(
messages=iterator(),
content=iterator(),
headers=headers,
trailers=trailers,
)
Expand Down Expand Up @@ -578,7 +578,7 @@ async def iterator() -> typing.AsyncIterator[service_pb2.BidiStreamResponse]:
raise

return StreamResponse(
messages=iterator(),
content=iterator(),
headers=headers,
trailers=trailers,
)
Expand Down
26 changes: 13 additions & 13 deletions src/connect/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class StreamRequest[T](RequestCommon):

def __init__(
self,
messages: AsyncIterable[T] | T,
content: AsyncIterable[T] | T,
spec: Spec | None = None,
peer: Peer | None = None,
headers: Headers | None = None,
Expand All @@ -163,7 +163,7 @@ def __init__(
"""Initialize a new Request instance.

Args:
messages (AsyncIterable[T] | T): The request messages.
content (AsyncIterable[T] | T): The request content, which can be an async iterable or a single message.
spec (Spec): The specification for the request.
peer (Peer): The peer information.
headers (Mapping[str, str]): The request headers.
Expand All @@ -176,7 +176,7 @@ def __init__(

"""
super().__init__(spec, peer, headers, method)
self._messages = messages if isinstance(messages, AsyncIterable) else aiterate([messages])
self._messages = content if isinstance(content, AsyncIterable) else aiterate([content])
self.timeout = timeout
self.abort_event = abort_event

Expand Down Expand Up @@ -204,7 +204,7 @@ class UnaryRequest[T](RequestCommon):

def __init__(
self,
message: T,
content: T,
spec: Spec | None = None,
peer: Peer | None = None,
headers: Headers | None = None,
Expand All @@ -215,7 +215,7 @@ def __init__(
"""Initialize a new Request instance.

Args:
message (Req): The request message.
content (T): The request message.
spec (Spec): The specification for the request.
peer (Peer): The peer information.
headers (Mapping[str, str]): The request headers.
Expand All @@ -228,7 +228,7 @@ def __init__(

"""
super().__init__(spec, peer, headers, method)
self._message = message
self._message = content
self.timeout = timeout
self.abort_event = abort_event

Expand Down Expand Up @@ -277,13 +277,13 @@ class UnaryResponse[T](ResponseCommon):

def __init__(
self,
message: T,
content: T,
headers: Headers | None = None,
trailers: Headers | None = None,
) -> None:
"""Initialize the response with a message."""
super().__init__(headers, trailers)
self._message = message
self._message = content

@property
def message(self) -> T:
Expand All @@ -298,13 +298,13 @@ class StreamResponse[T](ResponseCommon):

def __init__(
self,
messages: AsyncIterable[T] | T,
content: AsyncIterable[T] | T,
headers: Headers | None = None,
trailers: Headers | None = None,
) -> None:
"""Initialize the response with a message."""
super().__init__(headers, trailers)
self._messages = messages if isinstance(messages, AsyncIterable) else aiterate([messages])
self._messages = content if isinstance(content, AsyncIterable) else aiterate([content])

@property
def messages(self) -> AsyncIterable[T]:
Expand Down Expand Up @@ -604,7 +604,7 @@ async def receive_unary_request[T](conn: StreamingHandlerConn, t: type[T]) -> Un
method = cast(HTTPMethod, get_http_method())

return UnaryRequest(
message=message,
content=message,
spec=conn.spec,
peer=conn.peer,
headers=conn.request_headers,
Expand All @@ -628,15 +628,15 @@ async def receive_stream_request[T](conn: StreamingHandlerConn, t: type[T]) -> S
message = await ensure_single(conn.receive(t))

return StreamRequest(
messages=aiterate([message]),
content=aiterate([message]),
spec=conn.spec,
peer=conn.peer,
headers=conn.request_headers,
method=HTTPMethod.POST.value,
)
else:
return StreamRequest(
messages=conn.receive(t),
content=conn.receive(t),
spec=conn.spec,
peer=conn.peer,
headers=conn.request_headers,
Expand Down
1 change: 0 additions & 1 deletion src/connect/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ async def handle(self, request: Request) -> Response:

writer = ServerResponseWriter()

# Create tasks for handling the request and receiving responses
main_task = asyncio.create_task(self._handle(request, response_headers, response_trailers, writer))
writer_task = asyncio.create_task(writer.receive())

Expand Down
22 changes: 11 additions & 11 deletions tests/test_streaming_connect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def test_server_streaming(hypercorn_server: ServerConfig) -> None:

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -118,7 +118,7 @@ async def test_server_streaming_end_stream_error(hypercorn_server: ServerConfig)

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -181,7 +181,7 @@ async def test_server_streaming_received_message_after_end_stream(hypercorn_serv

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -246,7 +246,7 @@ async def test_server_streaming_received_extra_end_stream(hypercorn_server: Serv

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -297,7 +297,7 @@ async def test_server_streaming_not_received_end_stream(hypercorn_server: Server

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -354,7 +354,7 @@ async def test_server_streaming_response_envelope_message_compression(hypercorn_

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -419,7 +419,7 @@ async def test_server_streaming_request_envelope_message_compression(hypercorn_s
output=PingResponse,
options=ClientOptions(request_compression_name="gzip"),
)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

async with client.call_server_stream(ping_request) as response:
want = ["Hi Bob.", "I'm Eliza."]
Expand Down Expand Up @@ -479,7 +479,7 @@ async def _wrapped(request: StreamRequest[Any]) -> StreamResponse[Any]:
options=ClientOptions(interceptors=[FileInterceptor1(), FileInterceptor2()]),
)

ping_request = StreamRequest(messages=PingRequest(name="test"))
ping_request = StreamRequest(content=PingRequest(name="test"))

async with client.call_server_stream(ping_request):
assert len(ephemeral_files) == 2
Expand Down Expand Up @@ -516,7 +516,7 @@ async def test_server_streaming_not_httpstatus_200(hypercorn_server: ServerConfi

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=PingRequest(name="Bob"))
ping_request = StreamRequest(content=PingRequest(name="Bob"))

with pytest.raises(ConnectError) as excinfo:
async with client.call_server_stream(ping_request):
Expand Down Expand Up @@ -578,7 +578,7 @@ async def iterator() -> AsyncIterator[PingRequest]:

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = StreamRequest(messages=iterator())
ping_request = StreamRequest(content=iterator())

async with client.call_client_stream(ping_request) as response:
want = ["I'm fine."]
Expand Down Expand Up @@ -641,7 +641,7 @@ async def iterator() -> AsyncIterator[PingRequest]:
options=ClientOptions(interceptors=[FileInterceptor1(), FileInterceptor2()]),
)

ping_request = StreamRequest(messages=iterator())
ping_request = StreamRequest(content=iterator())

async with client.call_client_stream(ping_request):
assert len(ephemeral_files) == 2
Expand Down
18 changes: 9 additions & 9 deletions tests/test_unary_connect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def test_post_application_proto(hypercorn_server: ServerConfig) -> None:

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

response = await client.call_unary(ping_request)

Expand Down Expand Up @@ -58,7 +58,7 @@ async def test_post_response_gzip(hypercorn_server: ServerConfig) -> None:

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

await client.call_unary(ping_request)

Expand Down Expand Up @@ -98,7 +98,7 @@ async def test_post_request_gzip(hypercorn_server: ServerConfig) -> None:
output=PingResponse,
options=ClientOptions(request_compression_name="gzip"),
)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

await client.call_unary(ping_request)

Expand Down Expand Up @@ -162,7 +162,7 @@ async def test_get_application_proto(hypercorn_server: ServerConfig) -> None:
output=PingResponse,
options=ClientOptions(idempotency_level=IdempotencyLevel.NO_SIDE_EFFECTS, enable_get=True),
)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

await client.call_unary(ping_request)

Expand All @@ -188,7 +188,7 @@ async def test_post_not_found(hypercorn_server: ServerConfig) -> None:

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

with pytest.raises(ConnectError) as excinfo:
await client.call_unary(ping_request)
Expand Down Expand Up @@ -220,7 +220,7 @@ async def test_post_invalid_content_type_prefix(hypercorn_server: ServerConfig)

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

with pytest.raises(ConnectError) as excinfo:
await client.call_unary(ping_request)
Expand Down Expand Up @@ -270,7 +270,7 @@ async def test_post_error_details(hypercorn_server: ServerConfig) -> None:

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

with pytest.raises(ConnectError) as excinfo:
await client.call_unary(ping_request)
Expand Down Expand Up @@ -330,7 +330,7 @@ async def test_post_compressed_error_details(hypercorn_server: ServerConfig) ->

async with AsyncClientSession() as session:
client = Client(session=session, url=url, input=PingRequest, output=PingResponse)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

with pytest.raises(ConnectError) as excinfo:
await client.call_unary(ping_request)
Expand Down Expand Up @@ -401,7 +401,7 @@ async def _wrapped(request: UnaryRequest[Any]) -> UnaryResponse[Any]:
output=PingResponse,
options=ClientOptions(interceptors=[FileInterceptor1(), FileInterceptor2()]),
)
ping_request = UnaryRequest(message=PingRequest(name="test"))
ping_request = UnaryRequest(content=PingRequest(name="test"))

await client.call_unary(ping_request)

Expand Down
Loading