diff --git a/conformance/run-testcase.txt b/conformance/run-testcase.txt deleted file mode 100644 index 718089b..0000000 --- a/conformance/run-testcase.txt +++ /dev/null @@ -1 +0,0 @@ -Duplicate Metadata/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_IDENTITY/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-responses diff --git a/conformance/server.py b/conformance/server.py index ee046c9..b0f2c4c 100644 --- a/conformance/server.py +++ b/conformance/server.py @@ -123,61 +123,54 @@ async def Unary(self, request: UnaryRequest[service_pb2.UnaryRequest]) -> UnaryR - Returns the constructed response or raises the error if defined. """ - try: - response_definition = request.message.response_definition - - request_any = any_pb2.Any() - request_any.Pack(request.message) - - request_info = service_pb2.ConformancePayload.RequestInfo( - request_headers=pb_headers_from_headers(request.headers), - requests=[request_any], - timeout_ms=int(request.timeout) if request.timeout else None, - connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( - query_params=pb_query_params_from_peer_query(request.peer.query), - ), - ) + response_definition = request.message.response_definition + + request_any = any_pb2.Any() + request_any.Pack(request.message) + + request_info = service_pb2.ConformancePayload.RequestInfo( + request_headers=pb_headers_from_headers(request.headers), + requests=[request_any], + timeout_ms=int(request.timeout) if request.timeout else None, + connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( + query_params=pb_query_params_from_peer_query(request.peer.query), + ), + ) - error = None - if response_definition.HasField("error"): - detail = any_pb2.Any() - detail.Pack(request_info) - response_definition.error.details.append(detail) + error = None + if response_definition.HasField("error"): + detail = any_pb2.Any() + detail.Pack(request_info) + response_definition.error.details.append(detail) - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) - - metadata = Headers() - metadata.update(headers) - metadata.update(trailers) + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) - error = ConnectError( - message=response_definition.error.message, - code=code_from_pb_code(response_definition.error.code), - details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], - metadata=metadata, - ) - else: - payload = service_pb2.ConformancePayload( - data=response_definition.response_data, - request_info=request_info, - ) + metadata = Headers() + metadata.update(headers) + metadata.update(trailers) - if response_definition: - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) - - if response_definition.response_delay_ms: - await asyncio.sleep(response_definition.response_delay_ms / 1000) + error = ConnectError( + message=response_definition.error.message, + code=code_from_pb_code(response_definition.error.code), + details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], + metadata=metadata, + ) + else: + payload = service_pb2.ConformancePayload( + data=response_definition.response_data, + request_info=request_info, + ) - if error: - raise error + if response_definition: + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) - except ConnectError: - raise + if response_definition.response_delay_ms: + await asyncio.sleep(response_definition.response_delay_ms / 1000) - except Exception: - raise + if error: + raise error return UnaryResponse(content=service_pb2.UnaryResponse(payload=payload), headers=headers, trailers=trailers) @@ -204,61 +197,54 @@ async def IdempotentUnary( Exception: For any other unexpected errors. """ - try: - response_definition = request.message.response_definition - - request_any = any_pb2.Any() - request_any.Pack(request.message) - - request_info = service_pb2.ConformancePayload.RequestInfo( - request_headers=pb_headers_from_headers(request.headers), - requests=[request_any], - timeout_ms=int(request.timeout) if request.timeout else None, - connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( - query_params=pb_query_params_from_peer_query(request.peer.query), - ), - ) - - error = None - if response_definition.HasField("error"): - detail = any_pb2.Any() - detail.Pack(request_info) - response_definition.error.details.append(detail) - - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) + response_definition = request.message.response_definition + + request_any = any_pb2.Any() + request_any.Pack(request.message) + + request_info = service_pb2.ConformancePayload.RequestInfo( + request_headers=pb_headers_from_headers(request.headers), + requests=[request_any], + timeout_ms=int(request.timeout) if request.timeout else None, + connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( + query_params=pb_query_params_from_peer_query(request.peer.query), + ), + ) - metadata = Headers() - metadata.update(headers) - metadata.update(trailers) + error = None + if response_definition.HasField("error"): + detail = any_pb2.Any() + detail.Pack(request_info) + response_definition.error.details.append(detail) - error = ConnectError( - message=response_definition.error.message, - code=code_from_pb_code(response_definition.error.code), - details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], - metadata=metadata, - ) - else: - payload = service_pb2.ConformancePayload( - data=response_definition.response_data, - request_info=request_info, - ) + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) - if response_definition: - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) + metadata = Headers() + metadata.update(headers) + metadata.update(trailers) - if response_definition.response_delay_ms: - await asyncio.sleep(response_definition.response_delay_ms / 1000) + error = ConnectError( + message=response_definition.error.message, + code=code_from_pb_code(response_definition.error.code), + details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], + metadata=metadata, + ) + else: + payload = service_pb2.ConformancePayload( + data=response_definition.response_data, + request_info=request_info, + ) - if error: - raise error + if response_definition: + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) - except ConnectError: - raise + if response_definition.response_delay_ms: + await asyncio.sleep(response_definition.response_delay_ms / 1000) - except Exception: - raise + if error: + raise error return UnaryResponse( content=service_pb2.IdempotentUnaryResponse(payload=payload), headers=headers, trailers=trailers @@ -293,65 +279,58 @@ async def ClientStream( response_definition = None messages = [] - try: - async for message in request.messages: - if response_definition is None: - response_definition = message.response_definition - - message_any = any_pb2.Any() - message_any.Pack(message) - messages.append(message_any) - - request_info = service_pb2.ConformancePayload.RequestInfo( - request_headers=pb_headers_from_headers(request.headers), - requests=messages, - timeout_ms=int(request.timeout) if request.timeout else None, - connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( - query_params=pb_query_params_from_peer_query(request.peer.query), - ), - ) + async for message in request.messages: + if response_definition is None: + response_definition = message.response_definition + + message_any = any_pb2.Any() + message_any.Pack(message) + messages.append(message_any) + + request_info = service_pb2.ConformancePayload.RequestInfo( + request_headers=pb_headers_from_headers(request.headers), + requests=messages, + timeout_ms=int(request.timeout) if request.timeout else None, + connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( + query_params=pb_query_params_from_peer_query(request.peer.query), + ), + ) - error = None - payload = None - if response_definition and response_definition.HasField("error"): - detail = any_pb2.Any() - detail.Pack(request_info) - response_definition.error.details.append(detail) + error = None + payload = None + if response_definition and response_definition.HasField("error"): + detail = any_pb2.Any() + detail.Pack(request_info) + response_definition.error.details.append(detail) + + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) + + metadata = Headers() + metadata.update(headers) + metadata.update(trailers) + + error = ConnectError( + message=response_definition.error.message, + code=code_from_pb_code(response_definition.error.code), + details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], + metadata=metadata, + ) + else: + payload = service_pb2.ConformancePayload( + request_info=request_info, + ) + if response_definition: + payload.data = response_definition.response_data headers = headers_from_pb_headers(response_definition.response_headers) trailers = headers_from_pb_headers(response_definition.response_trailers) - metadata = Headers() - metadata.update(headers) - metadata.update(trailers) - - error = ConnectError( - message=response_definition.error.message, - code=code_from_pb_code(response_definition.error.code), - details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], - metadata=metadata, - ) - else: - payload = service_pb2.ConformancePayload( - request_info=request_info, - ) - - if response_definition: - payload.data = response_definition.response_data - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) - - if response_definition and response_definition.response_delay_ms: - await asyncio.sleep(response_definition.response_delay_ms / 1000) - - if error: - raise error - - except ConnectError: - raise + if response_definition and response_definition.response_delay_ms: + await asyncio.sleep(response_definition.response_delay_ms / 1000) - except Exception: - raise + if error: + raise error return StreamResponse( content=service_pb2.ClientStreamResponse(payload=payload), @@ -386,82 +365,75 @@ async def ServerStream( response_definition = None messages = [] - try: - async for message in request.messages: - if response_definition is None: - response_definition = message.response_definition + async for message in request.messages: + if response_definition is None: + response_definition = message.response_definition + + message_any = any_pb2.Any() + message_any.Pack(message) + messages.append(message_any) + + headers = None + trailers = None + if response_definition: + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) + + request_info = service_pb2.ConformancePayload.RequestInfo( + request_headers=pb_headers_from_headers(request.headers), + requests=messages, + timeout_ms=int(request.timeout) if request.timeout else None, + connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( + query_params=pb_query_params_from_peer_query(request.peer.query), + ), + ) - message_any = any_pb2.Any() - message_any.Pack(message) - messages.append(message_any) + async def iterator() -> typing.AsyncIterator[service_pb2.ServerStreamResponse]: + first_response = True - headers = None - trailers = None - if response_definition: - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) + if response_definition is None: + return - request_info = service_pb2.ConformancePayload.RequestInfo( - request_headers=pb_headers_from_headers(request.headers), - requests=messages, - timeout_ms=int(request.timeout) if request.timeout else None, - connect_get_info=service_pb2.ConformancePayload.ConnectGetInfo( - query_params=pb_query_params_from_peer_query(request.peer.query), - ), - ) - - async def iterator() -> typing.AsyncIterator[service_pb2.ServerStreamResponse]: - first_response = True - - if response_definition is None: - return - - for response_data in response_definition.response_data: - if first_response: - payload = service_pb2.ConformancePayload( - data=response_data, - request_info=request_info, - ) - first_response = False - else: - payload = service_pb2.ConformancePayload( - data=response_data, - ) - - if response_definition.response_delay_ms: - await asyncio.sleep(response_definition.response_delay_ms / 1000) - - yield service_pb2.ServerStreamResponse( - payload=payload, + for response_data in response_definition.response_data: + if first_response: + payload = service_pb2.ConformancePayload( + data=response_data, + request_info=request_info, + ) + first_response = False + else: + payload = service_pb2.ConformancePayload( + data=response_data, ) - if response_definition.HasField("error"): - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) + if response_definition.response_delay_ms: + await asyncio.sleep(response_definition.response_delay_ms / 1000) - metadata = Headers() - metadata.update(headers) - metadata.update(trailers) + yield service_pb2.ServerStreamResponse( + payload=payload, + ) - if first_response: - detail = any_pb2.Any() - detail.Pack(request_info) - response_definition.error.details.append(detail) + if response_definition.HasField("error"): + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) - error = ConnectError( - message=response_definition.error.message, - code=code_from_pb_code(response_definition.error.code), - details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], - metadata=metadata, - ) + metadata = Headers() + metadata.update(headers) + metadata.update(trailers) - raise error + if first_response: + detail = any_pb2.Any() + detail.Pack(request_info) + response_definition.error.details.append(detail) - except ConnectError: - raise + error = ConnectError( + message=response_definition.error.message, + code=code_from_pb_code(response_definition.error.code), + details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], + metadata=metadata, + ) - except Exception: - raise + raise error return StreamResponse( content=iterator(), @@ -503,78 +475,71 @@ async def BidiStream( first_response = True response_index = 0 - try: - async for message in request.messages: - message_any = any_pb2.Any() - message_any.Pack(message) - messages.append(message_any) + async for message in request.messages: + message_any = any_pb2.Any() + message_any.Pack(message) + messages.append(message_any) - if first_response: - response_definition = message.response_definition - first_response = False + if first_response: + response_definition = message.response_definition + first_response = False - if response_definition: - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) - - async def iterator() -> typing.AsyncIterator[service_pb2.BidiStreamResponse]: - nonlocal response_index - - while response_definition and response_index < len(response_definition.response_data): - if response_index == 0: - request_info = service_pb2.ConformancePayload.RequestInfo( - request_headers=pb_headers_from_headers(request.headers), - requests=messages, - timeout_ms=int(request.timeout) if request.timeout else None, - ) - else: - request_info = None - - response = service_pb2.BidiStreamResponse( - payload=service_pb2.ConformancePayload( - request_info=request_info, - data=response_definition.response_data[response_index], - ) + if response_definition: + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) + + async def iterator() -> typing.AsyncIterator[service_pb2.BidiStreamResponse]: + nonlocal response_index + + while response_definition and response_index < len(response_definition.response_data): + if response_index == 0: + request_info = service_pb2.ConformancePayload.RequestInfo( + request_headers=pb_headers_from_headers(request.headers), + requests=messages, + timeout_ms=int(request.timeout) if request.timeout else None, + ) + else: + request_info = None + + response = service_pb2.BidiStreamResponse( + payload=service_pb2.ConformancePayload( + request_info=request_info, + data=response_definition.response_data[response_index], ) - if response_definition.response_delay_ms: - await asyncio.sleep(response_definition.response_delay_ms / 1000) + ) + if response_definition.response_delay_ms: + await asyncio.sleep(response_definition.response_delay_ms / 1000) - response_index += 1 - yield response + response_index += 1 + yield response - if response_definition and response_definition.HasField("error"): - headers = headers_from_pb_headers(response_definition.response_headers) - trailers = headers_from_pb_headers(response_definition.response_trailers) + if response_definition and response_definition.HasField("error"): + headers = headers_from_pb_headers(response_definition.response_headers) + trailers = headers_from_pb_headers(response_definition.response_trailers) - metadata = Headers() - metadata.update(headers) - metadata.update(trailers) - - if response_index == 0: - request_info = service_pb2.ConformancePayload.RequestInfo( - request_headers=pb_headers_from_headers(request.headers), - requests=messages, - timeout_ms=int(request.timeout) if request.timeout else None, - ) - - detail = any_pb2.Any() - detail.Pack(request_info) - response_definition.error.details.append(detail) - - error = ConnectError( - message=response_definition.error.message, - code=code_from_pb_code(response_definition.error.code), - details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], - metadata=metadata, + metadata = Headers() + metadata.update(headers) + metadata.update(trailers) + + if response_index == 0: + request_info = service_pb2.ConformancePayload.RequestInfo( + request_headers=pb_headers_from_headers(request.headers), + requests=messages, + timeout_ms=int(request.timeout) if request.timeout else None, ) - raise error + detail = any_pb2.Any() + detail.Pack(request_info) + response_definition.error.details.append(detail) - except ConnectError: - raise + error = ConnectError( + message=response_definition.error.message, + code=code_from_pb_code(response_definition.error.code), + details=[ErrorDetail(pb_any=error) for error in response_definition.error.details], + metadata=metadata, + ) - except Exception: - raise + raise error return StreamResponse( content=iterator(), diff --git a/conformance/server_known_failing.yaml b/conformance/server_known_failing.yaml deleted file mode 100644 index f91f5db..0000000 --- a/conformance/server_known_failing.yaml +++ /dev/null @@ -1 +0,0 @@ -Deadline Propagation/** diff --git a/conformance/server_runner.py b/conformance/server_runner.py index c2558f5..ab91da9 100644 --- a/conformance/server_runner.py +++ b/conformance/server_runner.py @@ -21,15 +21,13 @@ from gen.connectrpc.conformance.v1.server_compat_pb2 import ServerCompatRequest, ServerCompatResponse from server import app -logger = logging.getLogger("conformance.runner") - -# Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("conformance_server.log"), logging.StreamHandler()], ) -logger = logging.getLogger(__name__) + +logger = logging.getLogger("conformance.runner") def find_free_port() -> int: