From c3927b707378a4ae8449228b3f97fa096e0d1c14 Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Fri, 7 Nov 2025 20:40:05 -0500 Subject: [PATCH 1/2] feat: integrate checkpoint processor in execution response --- .../executor.py | 47 ++++++++++++++----- .../runner.py | 18 +++++-- tests/executor_test.py | 15 ++++-- tests/runner_web_test.py | 19 +++++--- 4 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index ed015b1..f489fec 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -23,6 +23,7 @@ from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.model import ( CheckpointDurableExecutionResponse, + CheckpointUpdatedExecutionState, GetDurableExecutionHistoryResponse, GetDurableExecutionResponse, GetDurableExecutionStateResponse, @@ -47,6 +48,9 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable + from aws_durable_execution_sdk_python_testing.checkpoint.processor import ( + CheckpointProcessor, + ) from aws_durable_execution_sdk_python_testing.invoker import Invoker from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore @@ -58,10 +62,17 @@ class Executor(ExecutionObserver): MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5 RETRY_BACKOFF_SECONDS = 5 - def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker): + def __init__( + self, + store: ExecutionStore, + scheduler: Scheduler, + invoker: Invoker, + checkpoint_processor: CheckpointProcessor, + ): self._store = store self._scheduler = scheduler self._invoker = invoker + self._checkpoint_processor = checkpoint_processor self._completion_events: dict[str, Event] = {} def start_execution( @@ -464,8 +475,8 @@ def checkpoint_execution( self, execution_arn: str, checkpoint_token: str, - updates: list[OperationUpdate] | None = None, # noqa: ARG002 - client_token: str | None = None, # noqa: ARG002 + updates: list[OperationUpdate] | None = None, + client_token: str | None = None, ) -> CheckpointDurableExecutionResponse: """Process checkpoint for an execution. @@ -489,19 +500,33 @@ def checkpoint_execution( msg: str = f"Invalid checkpoint token: {checkpoint_token}" raise InvalidParameterValueException(msg) - # TODO: Process operation updates using the checkpoint processor - # This would integrate with the existing checkpoint processing pipeline + # Process operation updates using the checkpoint processor + if updates: + checkpoint_output = self._checkpoint_processor.process_checkpoint( + checkpoint_token=checkpoint_token, + updates=updates, + client_token=client_token, + ) - # Generate new checkpoint token - new_checkpoint_token = execution.get_new_checkpoint_token() + # Convert SDK CheckpointUpdatedExecutionState to testing library version + new_execution_state = None + if checkpoint_output.new_execution_state: + new_execution_state = CheckpointUpdatedExecutionState( + operations=checkpoint_output.new_execution_state.operations, + next_marker=checkpoint_output.new_execution_state.next_marker, + ) + + return CheckpointDurableExecutionResponse( + checkpoint_token=checkpoint_output.checkpoint_token, + new_execution_state=new_execution_state, + ) - # Get current execution state - for now return None (simplified implementation) - # In a full implementation, this would return CheckpointUpdatedExecutionState with operations - new_execution_state = None + # Generate new checkpoint token for case with no updates + new_checkpoint_token = execution.get_new_checkpoint_token() return CheckpointDurableExecutionResponse( checkpoint_token=new_checkpoint_token, - new_execution_state=new_execution_state, + new_execution_state=None, ) def send_callback_success( diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index b93badd..74f6186 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -500,7 +500,10 @@ def __init__(self, handler: Callable): self._service_client = InMemoryServiceClient(self._checkpoint_processor) self._invoker = InProcessInvoker(handler, self._service_client) self._executor = Executor( - store=self._store, scheduler=self._scheduler, invoker=self._invoker + store=self._store, + scheduler=self._scheduler, + invoker=self._invoker, + checkpoint_processor=self._checkpoint_processor, ) # Wire up observer pattern - CheckpointProcessor uses this to notify executor of state changes @@ -631,11 +634,20 @@ def start(self) -> None: self._scheduler = Scheduler() self._invoker = LambdaInvoker(self._create_boto3_client()) - # Create executor with all dependencies + # Create shared CheckpointProcessor + checkpoint_processor = CheckpointProcessor(self._store, self._scheduler) + + # Create executor with all dependencies including checkpoint processor self._executor = Executor( - store=self._store, scheduler=self._scheduler, invoker=self._invoker + store=self._store, + scheduler=self._scheduler, + invoker=self._invoker, + checkpoint_processor=checkpoint_processor, ) + # Add executor as observer to the checkpoint processor + checkpoint_processor.add_execution_observer(self._executor) + # Start the scheduler self._scheduler.start() diff --git a/tests/executor_test.py b/tests/executor_test.py index 8fdde25..75d6de1 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -90,8 +90,13 @@ def mock_invoker(): @pytest.fixture -def executor(mock_store, mock_scheduler, mock_invoker): - return Executor(mock_store, mock_scheduler, mock_invoker) +def mock_checkpoint_processor(): + return Mock() + + +@pytest.fixture +def executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor): + return Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor) @pytest.fixture @@ -117,10 +122,12 @@ def mock_execution(): return execution -def test_init(mock_store, mock_scheduler, mock_invoker): +def test_init(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor): # Test that Executor can be constructed with dependencies # Dependency injection is implementation detail - test behavior instead - executor = Executor(mock_store, mock_scheduler, mock_invoker) + executor = Executor( + mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor + ) # Verify executor is properly initialized by testing it can perform basic operations assert executor is not None diff --git a/tests/runner_web_test.py b/tests/runner_web_test.py index 6174091..711fc47 100644 --- a/tests/runner_web_test.py +++ b/tests/runner_web_test.py @@ -687,9 +687,13 @@ def test_should_create_all_required_dependencies_during_start(): mock_store_class.assert_called_once() mock_scheduler_class.assert_called_once() mock_invoker_class.assert_called_once_with(mock_client) - mock_executor_class.assert_called_once_with( - store=mock_store, scheduler=mock_scheduler, invoker=mock_invoker - ) + # Verify Executor was called with the expected parameters including checkpoint_processor + assert mock_executor_class.call_count == 1 + call_args = mock_executor_class.call_args + assert call_args.kwargs["store"] == mock_store + assert call_args.kwargs["scheduler"] == mock_scheduler + assert call_args.kwargs["invoker"] == mock_invoker + assert "checkpoint_processor" in call_args.kwargs mock_web_server_class.assert_called_once_with( config=web_config, executor=mock_executor ) @@ -825,9 +829,12 @@ def test_should_wire_dependencies_correctly_in_executor(): runner.start() # Assert - Verify Executor was created with correct dependencies - mock_executor_class.assert_called_once_with( - store=mock_store, scheduler=mock_scheduler, invoker=mock_invoker - ) + assert mock_executor_class.call_count == 1 + call_args = mock_executor_class.call_args + assert call_args.kwargs["store"] == mock_store + assert call_args.kwargs["scheduler"] == mock_scheduler + assert call_args.kwargs["invoker"] == mock_invoker + assert "checkpoint_processor" in call_args.kwargs # Cleanup runner.stop() From 8e4ef3993b5d22e239fe2cb03a46a382a9954ac1 Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Sun, 9 Nov 2025 17:28:04 -0500 Subject: [PATCH 2/2] inline new execution token --- src/aws_durable_execution_sdk_python_testing/executor.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index f489fec..76d614d 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -500,7 +500,6 @@ def checkpoint_execution( msg: str = f"Invalid checkpoint token: {checkpoint_token}" raise InvalidParameterValueException(msg) - # Process operation updates using the checkpoint processor if updates: checkpoint_output = self._checkpoint_processor.process_checkpoint( checkpoint_token=checkpoint_token, @@ -508,7 +507,6 @@ def checkpoint_execution( client_token=client_token, ) - # Convert SDK CheckpointUpdatedExecutionState to testing library version new_execution_state = None if checkpoint_output.new_execution_state: new_execution_state = CheckpointUpdatedExecutionState( @@ -521,11 +519,8 @@ def checkpoint_execution( new_execution_state=new_execution_state, ) - # Generate new checkpoint token for case with no updates - new_checkpoint_token = execution.get_new_checkpoint_token() - return CheckpointDurableExecutionResponse( - checkpoint_token=new_checkpoint_token, + checkpoint_token=execution.get_new_checkpoint_token(), new_execution_state=None, )