From c442bcd8036f720ae398e3a7b92497f2fb42e329 Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Mon, 1 Dec 2025 18:48:50 -0500 Subject: [PATCH 1/4] fix: add InvocationCompleted event support --- .../execution.py | 16 +++++++ .../executor.py | 26 +++++++++++- .../invoker.py | 23 +++++++--- .../model.py | 6 +++ tests/executor_test.py | 42 ++++++++++++------- tests/invoker_test.py | 16 +++++-- 6 files changed, 103 insertions(+), 26 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/execution.py b/src/aws_durable_execution_sdk_python_testing/execution.py index 113db5b..b12e345 100644 --- a/src/aws_durable_execution_sdk_python_testing/execution.py +++ b/src/aws_durable_execution_sdk_python_testing/execution.py @@ -60,6 +60,9 @@ def __init__( self.start_input: StartDurableExecutionInput = start_input self.operations: list[Operation] = operations self.updates: list[OperationUpdate] = [] + self.invocation_completions: list[ + tuple[float, float, str] + ] = [] # (start_ts, end_ts, request_id) self.used_tokens: set[str] = set() # TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store self._token_sequence: int = 0 @@ -101,6 +104,10 @@ def to_dict(self) -> dict[str, Any]: "StartInput": self.start_input.to_dict(), "Operations": [op.to_dict() for op in self.operations], "Updates": [update.to_dict() for update in self.updates], + "InvocationCompletions": [ + [start, end, req_id] + for start, end, req_id in self.invocation_completions + ], "UsedTokens": list(self.used_tokens), "TokenSequence": self._token_sequence, "IsComplete": self.is_complete, @@ -129,6 +136,9 @@ def from_dict(cls, data: dict[str, Any]) -> Execution: execution.updates = [ OperationUpdate.from_dict(update_data) for update_data in data["Updates"] ] + execution.invocation_completions = [ + tuple(item) for item in data.get("InvocationCompletions", []) + ] execution.used_tokens = set(data["UsedTokens"]) execution._token_sequence = data["TokenSequence"] # noqa: SLF001 execution.is_complete = data["IsComplete"] @@ -215,6 +225,12 @@ def has_pending_operations(self, execution: Execution) -> bool: return True return False + def record_invocation_completion( + self, start_timestamp: float, end_timestamp: float, request_id: str + ) -> None: + """Record an invocation completion event.""" + self.invocation_completions.append((start_timestamp, end_timestamp, request_id)) + def complete_success(self, result: str | None) -> None: """Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION).""" self.result = DurableExecutionInvocationOutput( diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 32abf7d..6e3ae06 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time import uuid from datetime import UTC, datetime from typing import TYPE_CHECKING @@ -413,6 +414,20 @@ def get_execution_history( updates_dict: dict[str, OperationUpdate] = {u.operation_id: u for u in updates} durable_execution_arn: str = execution.durable_execution_arn + # Add InvocationCompleted events + for start_ts, end_ts, request_id in execution.invocation_completions: + invocation_event = HistoryEvent( + event_id=0, # Temporary, will be reassigned + event_type="InvocationCompleted", + event_timestamp=datetime.fromtimestamp(end_ts, tz=UTC), + invocation_completed_details={ + "StartTimestamp": start_ts, + "EndTimestamp": end_ts, + "RequestId": request_id, + }, + ) + all_events.append(invocation_event) + # Generate all events first (without final event IDs) for op in ops: operation_update: OperationUpdate | None = updates_dict.get( @@ -769,14 +784,23 @@ async def invoke() -> None: self._store.save(execution) - response: DurableExecutionInvocationOutput = self._invoker.invoke( + invocation_start = time.time() + response, request_id = self._invoker.invoke( execution.start_input.function_name, invocation_input, execution.start_input.lambda_endpoint, ) + invocation_end = time.time() # Reload execution after invocation in case it was completed via checkpoint execution = self._store.load(execution_arn) + + # Record invocation completion and save immediately + execution.record_invocation_completion( + invocation_start, invocation_end, request_id + ) + self._store.save(execution) + if execution.is_complete: logger.info( "[%s] Execution completed during invocation, ignoring result", diff --git a/src/aws_durable_execution_sdk_python_testing/invoker.py b/src/aws_durable_execution_sdk_python_testing/invoker.py index a363340..af9d448 100644 --- a/src/aws_durable_execution_sdk_python_testing/invoker.py +++ b/src/aws_durable_execution_sdk_python_testing/invoker.py @@ -3,6 +3,7 @@ import json from threading import Lock from typing import TYPE_CHECKING, Any, Protocol +from uuid import uuid4 import boto3 # type: ignore from aws_durable_execution_sdk_python.execution import ( @@ -65,7 +66,7 @@ def invoke( function_name: str, input: DurableExecutionInvocationInput, endpoint_url: str | None = None, - ) -> DurableExecutionInvocationOutput: ... # pragma: no cover + ) -> tuple[DurableExecutionInvocationOutput, str]: ... # pragma: no cover def update_endpoint( self, endpoint_url: str, region_name: str @@ -96,14 +97,15 @@ def invoke( function_name: str, # noqa: ARG002 input: DurableExecutionInvocationInput, endpoint_url: str | None = None, # noqa: ARG002 - ) -> DurableExecutionInvocationOutput: + ) -> tuple[DurableExecutionInvocationOutput, str]: # TODO: reasses if function_name will be used in future input_with_client = DurableExecutionInvocationInputWithClient.from_durable_execution_invocation_input( input, self.service_client ) context = create_test_lambda_context() response_dict = self.handler(input_with_client, context) - return DurableExecutionInvocationOutput.from_dict(response_dict) + output = DurableExecutionInvocationOutput.from_dict(response_dict) + return output, context.aws_request_id def update_endpoint(self, endpoint_url: str, region_name: str) -> None: """No-op for in-process invoker.""" @@ -192,7 +194,7 @@ def invoke( function_name: str, input: DurableExecutionInvocationInput, endpoint_url: str | None = None, - ) -> DurableExecutionInvocationOutput: + ) -> tuple[DurableExecutionInvocationOutput, str]: """Invoke AWS Lambda function and return durable execution result. Args: @@ -201,7 +203,7 @@ def invoke( endpoint_url: Lambda endpoint url Returns: - DurableExecutionInvocationOutput: Result of the function execution + tuple: (DurableExecutionInvocationOutput, request_id) Raises: ResourceNotFoundException: If function does not exist @@ -247,8 +249,17 @@ def invoke( response_payload = response["Payload"].read().decode("utf-8") response_dict = json.loads(response_payload) + # Extract request ID from response headers (x-amzn-RequestId or x-amzn-request-id) + headers = response.get("ResponseMetadata", {}).get("HTTPHeaders", {}) + request_id = ( + headers.get("x-amzn-RequestId") + or headers.get("x-amzn-request-id") + or f"local-{uuid4()}" + ) + # Convert to DurableExecutionInvocationOutput - return DurableExecutionInvocationOutput.from_dict(response_dict) + output = DurableExecutionInvocationOutput.from_dict(response_dict) + return output, request_id except client.exceptions.ResourceNotFoundException as e: msg = f"Function not found: {function_name}" diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index 87f624c..c0b2bad 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -1329,6 +1329,7 @@ class Event: callback_succeeded_details: CallbackSucceededDetails | None = None callback_failed_details: CallbackFailedDetails | None = None callback_timed_out_details: CallbackTimedOutDetails | None = None + invocation_completed_details: dict[str, Any] | None = None @classmethod def from_dict(cls, data: dict) -> Event: @@ -1447,6 +1448,8 @@ def from_dict(cls, data: dict) -> Event: if details_data := data.get("CallbackTimedOutDetails"): callback_timed_out_details = CallbackTimedOutDetails.from_dict(details_data) + invocation_completed_details = data.get("InvocationCompletedDetails") + return cls( event_type=data["EventType"], event_timestamp=data["EventTimestamp"], @@ -1479,6 +1482,7 @@ def from_dict(cls, data: dict) -> Event: callback_succeeded_details=callback_succeeded_details, callback_failed_details=callback_failed_details, callback_timed_out_details=callback_timed_out_details, + invocation_completed_details=invocation_completed_details, ) def to_dict(self) -> dict[str, Any]: @@ -1563,6 +1567,8 @@ def to_dict(self) -> dict[str, Any]: result["CallbackTimedOutDetails"] = ( self.callback_timed_out_details.to_dict() ) + if self.invocation_completed_details is not None: + result["InvocationCompletedDetails"] = self.invocation_completed_details return result # region execution diff --git a/tests/executor_test.py b/tests/executor_test.py index dad04f0..51237b7 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -285,7 +285,7 @@ def test_should_complete_workflow_with_error_when_invocation_fails( failed_response = DurableExecutionInvocationOutput( status=InvocationStatus.FAILED, error=ErrorObject.from_message("Test error") ) - mock_invoker.invoke.return_value = failed_response + mock_invoker.invoke.return_value = (failed_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -329,7 +329,7 @@ def test_should_complete_workflow_with_result_when_invocation_succeeds( success_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="success result" ) - mock_invoker.invoke.return_value = success_response + mock_invoker.invoke.return_value = (success_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -372,7 +372,7 @@ def test_should_handle_pending_status_when_operations_exist( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input pending_response = DurableExecutionInvocationOutput(status=InvocationStatus.PENDING) - mock_invoker.invoke.return_value = pending_response + mock_invoker.invoke.return_value = (pending_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -409,8 +409,9 @@ def test_should_ignore_response_when_execution_already_complete( # Mock invoker - this shouldn't be called since execution is complete mock_invoker.create_invocation_input.return_value = Mock() - mock_invoker.invoke.return_value = DurableExecutionInvocationOutput( - status=InvocationStatus.SUCCEEDED + mock_invoker.invoke.return_value = ( + DurableExecutionInvocationOutput(status=InvocationStatus.SUCCEEDED), + "test-request-id", ) # Mock execution creation and store behavior @@ -452,7 +453,7 @@ def test_should_retry_when_response_has_no_status( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input no_status_response = DurableExecutionInvocationOutput(status=None) - mock_invoker.invoke.return_value = no_status_response + mock_invoker.invoke.return_value = (no_status_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -495,7 +496,7 @@ def test_should_retry_when_failed_response_has_result( invalid_response = DurableExecutionInvocationOutput( status=InvocationStatus.FAILED, result="should not have result" ) - mock_invoker.invoke.return_value = invalid_response + mock_invoker.invoke.return_value = (invalid_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -539,7 +540,7 @@ def test_should_retry_when_success_response_has_error( status=InvocationStatus.SUCCEEDED, error=ErrorObject.from_message("should not have error"), ) - mock_invoker.invoke.return_value = invalid_response + mock_invoker.invoke.return_value = (invalid_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -581,7 +582,7 @@ def test_should_retry_when_pending_response_has_no_operations( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input pending_response = DurableExecutionInvocationOutput(status=InvocationStatus.PENDING) - mock_invoker.invoke.return_value = pending_response + mock_invoker.invoke.return_value = (pending_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -622,7 +623,7 @@ def test_invoke_handler_success( mock_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="test" ) - mock_invoker.invoke.return_value = mock_response + mock_invoker.invoke.return_value = (mock_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -694,7 +695,7 @@ def test_invoke_handler_execution_completed_during_invocation( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input mock_response = Mock() - mock_invoker.invoke.return_value = mock_response + mock_invoker.invoke.return_value = (mock_response, "test-request-id") # Create a completed execution mock completed_execution = Mock() @@ -1037,7 +1038,10 @@ def test_should_retry_invocation_when_under_limit_through_public_api( success_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="final success" ) - mock_invoker.invoke.side_effect = [invalid_response, success_response] + mock_invoker.invoke.side_effect = [ + (invalid_response, "test-request-id-1"), + (success_response, "test-request-id-2"), + ] # Mock execution creation and store behavior with patch( @@ -1435,7 +1439,7 @@ def test_should_retry_when_response_has_unexpected_status( mock_invoker.create_invocation_input.return_value = mock_invocation_input unexpected_response = Mock() unexpected_response.status = "UNKNOWN_STATUS" - mock_invoker.invoke.return_value = unexpected_response + mock_invoker.invoke.return_value = (unexpected_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -1480,7 +1484,7 @@ def test_invoke_handler_execution_completed_during_invocation_async( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input mock_response = Mock() - mock_invoker.invoke.return_value = mock_response + mock_invoker.invoke.return_value = (mock_response, "test-request-id") # Mock execution creation with patch( @@ -1566,7 +1570,7 @@ def test_invoke_handler_general_exception_async( success_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="success" ) - mock_invoker.invoke.return_value = success_response + mock_invoker.invoke.return_value = (success_response, "test-request-id") # Mock execution creation and store behavior with patch( @@ -2094,6 +2098,7 @@ def test_get_execution_history(executor, mock_store): mock_execution = Mock() mock_execution.operations = [] # Empty operations list mock_execution.updates = [] + mock_execution.invocation_completions = [] mock_execution.durable_execution_arn = "" mock_execution.start_input = Mock() mock_execution.result = Mock() @@ -2123,6 +2128,7 @@ def test_get_execution_history_with_events(executor, mock_store): mock_execution = Mock() mock_execution.operations = [op1] mock_execution.updates = [] + mock_execution.invocation_completions = [] mock_execution.durable_execution_arn = "" mock_execution.start_input = Mock() mock_execution.result = Mock() @@ -2148,6 +2154,7 @@ def test_get_execution_history_reverse_order(executor, mock_store): mock_execution = Mock() mock_execution.operations = [op1] mock_execution.updates = [] + mock_execution.invocation_completions = [] mock_execution.durable_execution_arn = "" mock_execution.start_input = Mock() mock_execution.result = Mock() @@ -2178,6 +2185,7 @@ def test_get_execution_history_pagination(executor, mock_store): mock_execution = Mock() mock_execution.operations = operations mock_execution.updates = [] + mock_execution.invocation_completions = [] mock_execution.durable_execution_arn = "" mock_execution.start_input = Mock() mock_execution.result = Mock() @@ -2206,6 +2214,7 @@ def test_get_execution_history_pagination_with_marker(executor, mock_store): mock_execution = Mock() mock_execution.operations = operations mock_execution.updates = [] + mock_execution.invocation_completions = [] mock_execution.durable_execution_arn = "" mock_execution.start_input = Mock() mock_execution.result = Mock() @@ -2223,6 +2232,7 @@ def test_get_execution_history_invalid_marker(executor, mock_store): mock_execution = Mock() mock_execution.operations = [] mock_execution.updates = [] + mock_execution.invocation_completions = [] mock_execution.durable_execution_arn = "" mock_execution.start_input = Mock() mock_execution.result = Mock() @@ -2399,6 +2409,7 @@ def test_send_callback_heartbeat(executor, mock_store): mock_operation.status = OperationStatus.STARTED mock_execution.find_callback_operation.return_value = (0, mock_operation) mock_execution.updates = [] # No callback options to reset timeout + mock_execution.invocation_completions = [] mock_store.load.return_value = mock_execution result = executor.send_callback_heartbeat(callback_id) @@ -2651,6 +2662,7 @@ def test_schedule_callback_timeouts_no_callback_options(executor, mock_store): mock_execution = Mock() mock_execution.find_operation.return_value = (0, operation) mock_execution.updates = [] # No updates with callback options + mock_execution.invocation_completions = [] mock_store.load.return_value = mock_execution # Should return early without scheduling diff --git a/tests/invoker_test.py b/tests/invoker_test.py index e7fe4e1..dd942f4 100644 --- a/tests/invoker_test.py +++ b/tests/invoker_test.py @@ -87,11 +87,12 @@ def test_in_process_invoker_invoke(): initial_execution_state=InitialExecutionState(operations=[], next_marker=""), ) - result = invoker.invoke("test-function", input_data) + result, request_id = invoker.invoke("test-function", input_data) assert isinstance(result, DurableExecutionInvocationOutput) assert result.status == InvocationStatus.SUCCEEDED assert result.result == "test-result" + assert isinstance(request_id, str) # Verify handler was called with correct arguments handler.assert_called_once() @@ -162,6 +163,7 @@ def test_lambda_invoker_invoke_success(): lambda_client.invoke.return_value = { "StatusCode": 200, "Payload": mock_payload, + "ResponseMetadata": {"HTTPHeaders": {"x-amzn-RequestId": "test-request-id"}}, } invoker = LambdaInvoker(lambda_client) @@ -172,11 +174,12 @@ def test_lambda_invoker_invoke_success(): initial_execution_state=InitialExecutionState(operations=[], next_marker=""), ) - result = invoker.invoke("test-function", input_data) + result, request_id = invoker.invoke("test-function", input_data) assert isinstance(result, DurableExecutionInvocationOutput) assert result.status == InvocationStatus.SUCCEEDED assert result.result == "lambda-result" + assert request_id == "test-request-id" # Verify lambda client was called correctly lambda_client.invoke.assert_called_once_with( @@ -237,9 +240,10 @@ def test_in_process_invoker_invoke_with_execution_operations(): execution.start() # This adds operations invocation_input = invoker.create_invocation_input(execution) - result = invoker.invoke("test-function", invocation_input) + result, request_id = invoker.invoke("test-function", invocation_input) assert isinstance(result, DurableExecutionInvocationOutput) + assert isinstance(request_id, str) assert result.status == InvocationStatus.SUCCEEDED assert len(invocation_input.initial_execution_state.operations) > 0 @@ -322,6 +326,9 @@ def test_lambda_invoker_invoke_status_202(): lambda_client.invoke.return_value = { "StatusCode": 202, "Payload": mock_payload, + "ResponseMetadata": { + "HTTPHeaders": {"x-amzn-RequestId": "test-request-id-202"} + }, } invoker = LambdaInvoker(lambda_client) @@ -332,8 +339,9 @@ def test_lambda_invoker_invoke_status_202(): initial_execution_state=InitialExecutionState(operations=[], next_marker=""), ) - result = invoker.invoke("test-function", input_data) + result, request_id = invoker.invoke("test-function", input_data) assert isinstance(result, DurableExecutionInvocationOutput) + assert request_id == "test-request-id-202" def test_lambda_invoker_invoke_function_error(): From 755a35e02f341a2c01276c929fa28f6cbe95affc Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Mon, 1 Dec 2025 21:30:08 -0500 Subject: [PATCH 2/4] chore: code review --- .../execution.py | 19 +++--- .../executor.py | 22 +++---- .../invoker.py | 23 +++++-- .../model.py | 61 ++++++++++++++++++- tests/executor_test.py | 57 ++++++++++++----- tests/invoker_test.py | 34 +++++------ 6 files changed, 158 insertions(+), 58 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/execution.py b/src/aws_durable_execution_sdk_python_testing/execution.py index b12e345..c13a14d 100644 --- a/src/aws_durable_execution_sdk_python_testing/execution.py +++ b/src/aws_durable_execution_sdk_python_testing/execution.py @@ -28,6 +28,7 @@ # Import AWS exceptions from aws_durable_execution_sdk_python_testing.model import ( + InvocationCompletedDetails, StartDurableExecutionInput, ) from aws_durable_execution_sdk_python_testing.token import ( @@ -60,9 +61,7 @@ def __init__( self.start_input: StartDurableExecutionInput = start_input self.operations: list[Operation] = operations self.updates: list[OperationUpdate] = [] - self.invocation_completions: list[ - tuple[float, float, str] - ] = [] # (start_ts, end_ts, request_id) + self.invocation_completions: list[InvocationCompletedDetails] = [] self.used_tokens: set[str] = set() # TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store self._token_sequence: int = 0 @@ -105,8 +104,7 @@ def to_dict(self) -> dict[str, Any]: "Operations": [op.to_dict() for op in self.operations], "Updates": [update.to_dict() for update in self.updates], "InvocationCompletions": [ - [start, end, req_id] - for start, end, req_id in self.invocation_completions + completion.to_dict() for completion in self.invocation_completions ], "UsedTokens": list(self.used_tokens), "TokenSequence": self._token_sequence, @@ -137,7 +135,8 @@ def from_dict(cls, data: dict[str, Any]) -> Execution: OperationUpdate.from_dict(update_data) for update_data in data["Updates"] ] execution.invocation_completions = [ - tuple(item) for item in data.get("InvocationCompletions", []) + InvocationCompletedDetails.from_dict(item) + for item in data.get("InvocationCompletions", []) ] execution.used_tokens = set(data["UsedTokens"]) execution._token_sequence = data["TokenSequence"] # noqa: SLF001 @@ -229,7 +228,13 @@ def record_invocation_completion( self, start_timestamp: float, end_timestamp: float, request_id: str ) -> None: """Record an invocation completion event.""" - self.invocation_completions.append((start_timestamp, end_timestamp, request_id)) + self.invocation_completions.append( + InvocationCompletedDetails( + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + request_id=request_id, + ) + ) def complete_success(self, result: str | None) -> None: """Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION).""" diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 6e3ae06..3d3c4f6 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -33,6 +33,8 @@ from aws_durable_execution_sdk_python_testing.model import ( CheckpointDurableExecutionResponse, CheckpointUpdatedExecutionState, + EventCreationContext, + EventType, GetDurableExecutionHistoryResponse, GetDurableExecutionResponse, GetDurableExecutionStateResponse, @@ -45,7 +47,6 @@ StartDurableExecutionOutput, StopDurableExecutionResponse, TERMINAL_STATUSES, - EventCreationContext, ) from aws_durable_execution_sdk_python_testing.model import ( Event as HistoryEvent, @@ -415,16 +416,14 @@ def get_execution_history( durable_execution_arn: str = execution.durable_execution_arn # Add InvocationCompleted events - for start_ts, end_ts, request_id in execution.invocation_completions: + for completion in execution.invocation_completions: invocation_event = HistoryEvent( event_id=0, # Temporary, will be reassigned - event_type="InvocationCompleted", - event_timestamp=datetime.fromtimestamp(end_ts, tz=UTC), - invocation_completed_details={ - "StartTimestamp": start_ts, - "EndTimestamp": end_ts, - "RequestId": request_id, - }, + event_type=EventType.INVOCATION_COMPLETED.value, + event_timestamp=datetime.fromtimestamp( + completion.end_timestamp, tz=UTC + ), + invocation_completed_details=completion, ) all_events.append(invocation_event) @@ -785,7 +784,7 @@ async def invoke() -> None: self._store.save(execution) invocation_start = time.time() - response, request_id = self._invoker.invoke( + invoke_response = self._invoker.invoke( execution.start_input.function_name, invocation_input, execution.start_input.lambda_endpoint, @@ -797,7 +796,7 @@ async def invoke() -> None: # Record invocation completion and save immediately execution.record_invocation_completion( - invocation_start, invocation_end, request_id + invocation_start, invocation_end, invoke_response.request_id ) self._store.save(execution) @@ -809,6 +808,7 @@ async def invoke() -> None: return # Process successful received response - validate status and handle accordingly + response = invoke_response.invocation_output try: self._validate_invocation_response_and_store( execution_arn, response, execution diff --git a/src/aws_durable_execution_sdk_python_testing/invoker.py b/src/aws_durable_execution_sdk_python_testing/invoker.py index af9d448..0549ad7 100644 --- a/src/aws_durable_execution_sdk_python_testing/invoker.py +++ b/src/aws_durable_execution_sdk_python_testing/invoker.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +from dataclasses import dataclass from threading import Lock from typing import TYPE_CHECKING, Any, Protocol from uuid import uuid4 @@ -27,6 +28,14 @@ from aws_durable_execution_sdk_python_testing.execution import Execution +@dataclass(frozen=True) +class InvokeResponse: + """Response from invoking a durable function.""" + + invocation_output: DurableExecutionInvocationOutput + request_id: str + + def create_test_lambda_context() -> LambdaContext: # Create client context as a dictionary, not as objects # LambdaContext.__init__ expects dictionaries and will create the objects internally @@ -66,7 +75,7 @@ def invoke( function_name: str, input: DurableExecutionInvocationInput, endpoint_url: str | None = None, - ) -> tuple[DurableExecutionInvocationOutput, str]: ... # pragma: no cover + ) -> InvokeResponse: ... # pragma: no cover def update_endpoint( self, endpoint_url: str, region_name: str @@ -97,7 +106,7 @@ def invoke( function_name: str, # noqa: ARG002 input: DurableExecutionInvocationInput, endpoint_url: str | None = None, # noqa: ARG002 - ) -> tuple[DurableExecutionInvocationOutput, str]: + ) -> InvokeResponse: # TODO: reasses if function_name will be used in future input_with_client = DurableExecutionInvocationInputWithClient.from_durable_execution_invocation_input( input, self.service_client @@ -105,7 +114,9 @@ def invoke( context = create_test_lambda_context() response_dict = self.handler(input_with_client, context) output = DurableExecutionInvocationOutput.from_dict(response_dict) - return output, context.aws_request_id + return InvokeResponse( + invocation_output=output, request_id=context.aws_request_id + ) def update_endpoint(self, endpoint_url: str, region_name: str) -> None: """No-op for in-process invoker.""" @@ -194,7 +205,7 @@ def invoke( function_name: str, input: DurableExecutionInvocationInput, endpoint_url: str | None = None, - ) -> tuple[DurableExecutionInvocationOutput, str]: + ) -> InvokeResponse: """Invoke AWS Lambda function and return durable execution result. Args: @@ -203,7 +214,7 @@ def invoke( endpoint_url: Lambda endpoint url Returns: - tuple: (DurableExecutionInvocationOutput, request_id) + InvokeResponse: Response containing invocation output and request ID Raises: ResourceNotFoundException: If function does not exist @@ -259,7 +270,7 @@ def invoke( # Convert to DurableExecutionInvocationOutput output = DurableExecutionInvocationOutput.from_dict(response_dict) - return output, request_id + return InvokeResponse(invocation_output=output, request_id=request_id) except client.exceptions.ResourceNotFoundException as e: msg = f"Function not found: {function_name}" diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index c0b2bad..7d482a3 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -68,6 +68,7 @@ class EventType(Enum): CALLBACK_SUCCEEDED = "CallbackSucceeded" CALLBACK_FAILED = "CallbackFailed" CALLBACK_TIMED_OUT = "CallbackTimedOut" + INVOCATION_COMPLETED = "InvocationCompleted" TERMINAL_STATUSES: set[OperationStatus] = { @@ -1222,6 +1223,30 @@ def to_dict(self) -> dict[str, Any]: return result +@dataclass(frozen=True) +class InvocationCompletedDetails: + """Invocation completed event details.""" + + start_timestamp: float + end_timestamp: float + request_id: str + + @classmethod + def from_dict(cls, data: dict) -> InvocationCompletedDetails: + return cls( + start_timestamp=data["StartTimestamp"], + end_timestamp=data["EndTimestamp"], + request_id=data["RequestId"], + ) + + def to_dict(self) -> dict[str, Any]: + return { + "StartTimestamp": self.start_timestamp, + "EndTimestamp": self.end_timestamp, + "RequestId": self.request_id, + } + + # endregion event_structures @@ -1329,7 +1354,7 @@ class Event: callback_succeeded_details: CallbackSucceededDetails | None = None callback_failed_details: CallbackFailedDetails | None = None callback_timed_out_details: CallbackTimedOutDetails | None = None - invocation_completed_details: dict[str, Any] | None = None + invocation_completed_details: InvocationCompletedDetails | None = None @classmethod def from_dict(cls, data: dict) -> Event: @@ -1448,7 +1473,11 @@ def from_dict(cls, data: dict) -> Event: if details_data := data.get("CallbackTimedOutDetails"): callback_timed_out_details = CallbackTimedOutDetails.from_dict(details_data) - invocation_completed_details = data.get("InvocationCompletedDetails") + invocation_completed_details = None + if details_data := data.get("InvocationCompletedDetails"): + invocation_completed_details = InvocationCompletedDetails.from_dict( + details_data + ) return cls( event_type=data["EventType"], @@ -1568,7 +1597,9 @@ def to_dict(self) -> dict[str, Any]: self.callback_timed_out_details.to_dict() ) if self.invocation_completed_details is not None: - result["InvocationCompletedDetails"] = self.invocation_completed_details + result["InvocationCompletedDetails"] = ( + self.invocation_completed_details.to_dict() + ) return result # region execution @@ -2224,6 +2255,30 @@ def create_callback_event(cls, context: EventCreationContext) -> Event: # endregion callback + # region invocation_completed + @classmethod + def create_invocation_completed( + cls, + event_id: int, + event_timestamp: datetime.datetime, + start_timestamp: float, + end_timestamp: float, + request_id: str, + ) -> Event: + """Create invocation completed event.""" + return cls( + event_type=EventType.INVOCATION_COMPLETED.value, + event_timestamp=event_timestamp, + event_id=event_id, + invocation_completed_details=InvocationCompletedDetails( + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + request_id=request_id, + ), + ) + + # endregion invocation_completed + @classmethod def create_event_started(cls, context: EventCreationContext) -> Event: """Convert operation to started event.""" diff --git a/tests/executor_test.py b/tests/executor_test.py index 51237b7..295248f 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -34,6 +34,7 @@ Execution, ) from aws_durable_execution_sdk_python_testing.executor import Executor +from aws_durable_execution_sdk_python_testing.invoker import InvokeResponse from aws_durable_execution_sdk_python_testing.model import ( ListDurableExecutionsResponse, SendDurableExecutionCallbackFailureResponse, @@ -285,7 +286,9 @@ def test_should_complete_workflow_with_error_when_invocation_fails( failed_response = DurableExecutionInvocationOutput( status=InvocationStatus.FAILED, error=ErrorObject.from_message("Test error") ) - mock_invoker.invoke.return_value = (failed_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=failed_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -329,7 +332,9 @@ def test_should_complete_workflow_with_result_when_invocation_succeeds( success_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="success result" ) - mock_invoker.invoke.return_value = (success_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=success_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -372,7 +377,9 @@ def test_should_handle_pending_status_when_operations_exist( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input pending_response = DurableExecutionInvocationOutput(status=InvocationStatus.PENDING) - mock_invoker.invoke.return_value = (pending_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=pending_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -453,7 +460,9 @@ def test_should_retry_when_response_has_no_status( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input no_status_response = DurableExecutionInvocationOutput(status=None) - mock_invoker.invoke.return_value = (no_status_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=no_status_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -496,7 +505,9 @@ def test_should_retry_when_failed_response_has_result( invalid_response = DurableExecutionInvocationOutput( status=InvocationStatus.FAILED, result="should not have result" ) - mock_invoker.invoke.return_value = (invalid_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=invalid_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -540,7 +551,9 @@ def test_should_retry_when_success_response_has_error( status=InvocationStatus.SUCCEEDED, error=ErrorObject.from_message("should not have error"), ) - mock_invoker.invoke.return_value = (invalid_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=invalid_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -582,7 +595,9 @@ def test_should_retry_when_pending_response_has_no_operations( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input pending_response = DurableExecutionInvocationOutput(status=InvocationStatus.PENDING) - mock_invoker.invoke.return_value = (pending_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=pending_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -623,7 +638,9 @@ def test_invoke_handler_success( mock_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="test" ) - mock_invoker.invoke.return_value = (mock_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=mock_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -695,7 +712,9 @@ def test_invoke_handler_execution_completed_during_invocation( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input mock_response = Mock() - mock_invoker.invoke.return_value = (mock_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=mock_response, request_id="test-request-id" + ) # Create a completed execution mock completed_execution = Mock() @@ -1039,8 +1058,12 @@ def test_should_retry_invocation_when_under_limit_through_public_api( status=InvocationStatus.SUCCEEDED, result="final success" ) mock_invoker.invoke.side_effect = [ - (invalid_response, "test-request-id-1"), - (success_response, "test-request-id-2"), + InvokeResponse( + invocation_output=invalid_response, request_id="test-request-id-1" + ), + InvokeResponse( + invocation_output=success_response, request_id="test-request-id-2" + ), ] # Mock execution creation and store behavior @@ -1439,7 +1462,9 @@ def test_should_retry_when_response_has_unexpected_status( mock_invoker.create_invocation_input.return_value = mock_invocation_input unexpected_response = Mock() unexpected_response.status = "UNKNOWN_STATUS" - mock_invoker.invoke.return_value = (unexpected_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=unexpected_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( @@ -1484,7 +1509,9 @@ def test_invoke_handler_execution_completed_during_invocation_async( mock_invocation_input = Mock() mock_invoker.create_invocation_input.return_value = mock_invocation_input mock_response = Mock() - mock_invoker.invoke.return_value = (mock_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=mock_response, request_id="test-request-id" + ) # Mock execution creation with patch( @@ -1570,7 +1597,9 @@ def test_invoke_handler_general_exception_async( success_response = DurableExecutionInvocationOutput( status=InvocationStatus.SUCCEEDED, result="success" ) - mock_invoker.invoke.return_value = (success_response, "test-request-id") + mock_invoker.invoke.return_value = InvokeResponse( + invocation_output=success_response, request_id="test-request-id" + ) # Mock execution creation and store behavior with patch( diff --git a/tests/invoker_test.py b/tests/invoker_test.py index dd942f4..09c62a6 100644 --- a/tests/invoker_test.py +++ b/tests/invoker_test.py @@ -87,12 +87,12 @@ def test_in_process_invoker_invoke(): initial_execution_state=InitialExecutionState(operations=[], next_marker=""), ) - result, request_id = invoker.invoke("test-function", input_data) + response = invoker.invoke("test-function", input_data) - assert isinstance(result, DurableExecutionInvocationOutput) - assert result.status == InvocationStatus.SUCCEEDED - assert result.result == "test-result" - assert isinstance(request_id, str) + assert isinstance(response.invocation_output, DurableExecutionInvocationOutput) + assert response.invocation_output.status == InvocationStatus.SUCCEEDED + assert response.invocation_output.result == "test-result" + assert isinstance(response.request_id, str) # Verify handler was called with correct arguments handler.assert_called_once() @@ -174,12 +174,12 @@ def test_lambda_invoker_invoke_success(): initial_execution_state=InitialExecutionState(operations=[], next_marker=""), ) - result, request_id = invoker.invoke("test-function", input_data) + response = invoker.invoke("test-function", input_data) - assert isinstance(result, DurableExecutionInvocationOutput) - assert result.status == InvocationStatus.SUCCEEDED - assert result.result == "lambda-result" - assert request_id == "test-request-id" + assert isinstance(response.invocation_output, DurableExecutionInvocationOutput) + assert response.invocation_output.status == InvocationStatus.SUCCEEDED + assert response.invocation_output.result == "lambda-result" + assert response.request_id == "test-request-id" # Verify lambda client was called correctly lambda_client.invoke.assert_called_once_with( @@ -240,11 +240,11 @@ def test_in_process_invoker_invoke_with_execution_operations(): execution.start() # This adds operations invocation_input = invoker.create_invocation_input(execution) - result, request_id = invoker.invoke("test-function", invocation_input) + response = invoker.invoke("test-function", invocation_input) - assert isinstance(result, DurableExecutionInvocationOutput) - assert isinstance(request_id, str) - assert result.status == InvocationStatus.SUCCEEDED + assert isinstance(response.invocation_output, DurableExecutionInvocationOutput) + assert isinstance(response.request_id, str) + assert response.invocation_output.status == InvocationStatus.SUCCEEDED assert len(invocation_input.initial_execution_state.operations) > 0 @@ -339,9 +339,9 @@ def test_lambda_invoker_invoke_status_202(): initial_execution_state=InitialExecutionState(operations=[], next_marker=""), ) - result, request_id = invoker.invoke("test-function", input_data) - assert isinstance(result, DurableExecutionInvocationOutput) - assert request_id == "test-request-id-202" + response = invoker.invoke("test-function", input_data) + assert isinstance(response.invocation_output, DurableExecutionInvocationOutput) + assert response.request_id == "test-request-id-202" def test_lambda_invoker_invoke_function_error(): From de37171aea427726d37e10c062718cd9f356985f Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Mon, 1 Dec 2025 22:32:22 -0500 Subject: [PATCH 3/4] chore: use create_invocation_completed method --- src/aws_durable_execution_sdk_python_testing/executor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 3d3c4f6..928f2e8 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -417,13 +417,14 @@ def get_execution_history( # Add InvocationCompleted events for completion in execution.invocation_completions: - invocation_event = HistoryEvent( + invocation_event = HistoryEvent.create_invocation_completed( event_id=0, # Temporary, will be reassigned - event_type=EventType.INVOCATION_COMPLETED.value, event_timestamp=datetime.fromtimestamp( completion.end_timestamp, tz=UTC ), - invocation_completed_details=completion, + start_timestamp=completion.start_timestamp, + end_timestamp=completion.end_timestamp, + request_id=completion.request_id, ) all_events.append(invocation_event) From 3744185fbcf6c6edadb9f107b18f15ee1a3023f6 Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Tue, 2 Dec 2025 00:22:10 -0500 Subject: [PATCH 4/4] fix: use datetime for InvocationCompletedDetails timestamps --- src/aws_durable_execution_sdk_python_testing/execution.py | 2 +- src/aws_durable_execution_sdk_python_testing/executor.py | 8 +++----- src/aws_durable_execution_sdk_python_testing/model.py | 8 ++++---- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/execution.py b/src/aws_durable_execution_sdk_python_testing/execution.py index c13a14d..c6be55f 100644 --- a/src/aws_durable_execution_sdk_python_testing/execution.py +++ b/src/aws_durable_execution_sdk_python_testing/execution.py @@ -225,7 +225,7 @@ def has_pending_operations(self, execution: Execution) -> bool: return False def record_invocation_completion( - self, start_timestamp: float, end_timestamp: float, request_id: str + self, start_timestamp: datetime, end_timestamp: datetime, request_id: str ) -> None: """Record an invocation completion event.""" self.invocation_completions.append( diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 928f2e8..70bcfa5 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -419,9 +419,7 @@ def get_execution_history( for completion in execution.invocation_completions: invocation_event = HistoryEvent.create_invocation_completed( event_id=0, # Temporary, will be reassigned - event_timestamp=datetime.fromtimestamp( - completion.end_timestamp, tz=UTC - ), + event_timestamp=completion.end_timestamp, start_timestamp=completion.start_timestamp, end_timestamp=completion.end_timestamp, request_id=completion.request_id, @@ -784,13 +782,13 @@ async def invoke() -> None: self._store.save(execution) - invocation_start = time.time() + invocation_start = datetime.now(UTC) invoke_response = self._invoker.invoke( execution.start_input.function_name, invocation_input, execution.start_input.lambda_endpoint, ) - invocation_end = time.time() + invocation_end = datetime.now(UTC) # Reload execution after invocation in case it was completed via checkpoint execution = self._store.load(execution_arn) diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index 7d482a3..0353870 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -1227,8 +1227,8 @@ def to_dict(self) -> dict[str, Any]: class InvocationCompletedDetails: """Invocation completed event details.""" - start_timestamp: float - end_timestamp: float + start_timestamp: datetime.datetime + end_timestamp: datetime.datetime request_id: str @classmethod @@ -2261,8 +2261,8 @@ def create_invocation_completed( cls, event_id: int, event_timestamp: datetime.datetime, - start_timestamp: float, - end_timestamp: float, + start_timestamp: datetime.datetime, + end_timestamp: datetime.datetime, request_id: str, ) -> Event: """Create invocation completed event."""