Skip to content

Commit 18e8e4e

Browse files
authored
feat: integrate checkpoint processor for web svc
* feat: integrate checkpoint processor in execution response * inline new execution token
1 parent da497a0 commit 18e8e4e

File tree

4 files changed

+71
-25
lines changed

4 files changed

+71
-25
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from aws_durable_execution_sdk_python_testing.execution import Execution
2424
from aws_durable_execution_sdk_python_testing.model import (
2525
CheckpointDurableExecutionResponse,
26+
CheckpointUpdatedExecutionState,
2627
GetDurableExecutionHistoryResponse,
2728
GetDurableExecutionResponse,
2829
GetDurableExecutionStateResponse,
@@ -47,6 +48,9 @@
4748
if TYPE_CHECKING:
4849
from collections.abc import Awaitable, Callable
4950

51+
from aws_durable_execution_sdk_python_testing.checkpoint.processor import (
52+
CheckpointProcessor,
53+
)
5054
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5155
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
5256
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore
@@ -58,10 +62,17 @@ class Executor(ExecutionObserver):
5862
MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
5963
RETRY_BACKOFF_SECONDS = 5
6064

61-
def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker):
65+
def __init__(
66+
self,
67+
store: ExecutionStore,
68+
scheduler: Scheduler,
69+
invoker: Invoker,
70+
checkpoint_processor: CheckpointProcessor,
71+
):
6272
self._store = store
6373
self._scheduler = scheduler
6474
self._invoker = invoker
75+
self._checkpoint_processor = checkpoint_processor
6576
self._completion_events: dict[str, Event] = {}
6677

6778
def start_execution(
@@ -464,8 +475,8 @@ def checkpoint_execution(
464475
self,
465476
execution_arn: str,
466477
checkpoint_token: str,
467-
updates: list[OperationUpdate] | None = None, # noqa: ARG002
468-
client_token: str | None = None, # noqa: ARG002
478+
updates: list[OperationUpdate] | None = None,
479+
client_token: str | None = None,
469480
) -> CheckpointDurableExecutionResponse:
470481
"""Process checkpoint for an execution.
471482
@@ -489,19 +500,28 @@ def checkpoint_execution(
489500
msg: str = f"Invalid checkpoint token: {checkpoint_token}"
490501
raise InvalidParameterValueException(msg)
491502

492-
# TODO: Process operation updates using the checkpoint processor
493-
# This would integrate with the existing checkpoint processing pipeline
503+
if updates:
504+
checkpoint_output = self._checkpoint_processor.process_checkpoint(
505+
checkpoint_token=checkpoint_token,
506+
updates=updates,
507+
client_token=client_token,
508+
)
494509

495-
# Generate new checkpoint token
496-
new_checkpoint_token = execution.get_new_checkpoint_token()
510+
new_execution_state = None
511+
if checkpoint_output.new_execution_state:
512+
new_execution_state = CheckpointUpdatedExecutionState(
513+
operations=checkpoint_output.new_execution_state.operations,
514+
next_marker=checkpoint_output.new_execution_state.next_marker,
515+
)
497516

498-
# Get current execution state - for now return None (simplified implementation)
499-
# In a full implementation, this would return CheckpointUpdatedExecutionState with operations
500-
new_execution_state = None
517+
return CheckpointDurableExecutionResponse(
518+
checkpoint_token=checkpoint_output.checkpoint_token,
519+
new_execution_state=new_execution_state,
520+
)
501521

502522
return CheckpointDurableExecutionResponse(
503-
checkpoint_token=new_checkpoint_token,
504-
new_execution_state=new_execution_state,
523+
checkpoint_token=execution.get_new_checkpoint_token(),
524+
new_execution_state=None,
505525
)
506526

507527
def send_callback_success(

src/aws_durable_execution_sdk_python_testing/runner.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,10 @@ def __init__(self, handler: Callable):
500500
self._service_client = InMemoryServiceClient(self._checkpoint_processor)
501501
self._invoker = InProcessInvoker(handler, self._service_client)
502502
self._executor = Executor(
503-
store=self._store, scheduler=self._scheduler, invoker=self._invoker
503+
store=self._store,
504+
scheduler=self._scheduler,
505+
invoker=self._invoker,
506+
checkpoint_processor=self._checkpoint_processor,
504507
)
505508

506509
# Wire up observer pattern - CheckpointProcessor uses this to notify executor of state changes
@@ -631,11 +634,20 @@ def start(self) -> None:
631634
self._scheduler = Scheduler()
632635
self._invoker = LambdaInvoker(self._create_boto3_client())
633636

634-
# Create executor with all dependencies
637+
# Create shared CheckpointProcessor
638+
checkpoint_processor = CheckpointProcessor(self._store, self._scheduler)
639+
640+
# Create executor with all dependencies including checkpoint processor
635641
self._executor = Executor(
636-
store=self._store, scheduler=self._scheduler, invoker=self._invoker
642+
store=self._store,
643+
scheduler=self._scheduler,
644+
invoker=self._invoker,
645+
checkpoint_processor=checkpoint_processor,
637646
)
638647

648+
# Add executor as observer to the checkpoint processor
649+
checkpoint_processor.add_execution_observer(self._executor)
650+
639651
# Start the scheduler
640652
self._scheduler.start()
641653

tests/executor_test.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,13 @@ def mock_invoker():
9090

9191

9292
@pytest.fixture
93-
def executor(mock_store, mock_scheduler, mock_invoker):
94-
return Executor(mock_store, mock_scheduler, mock_invoker)
93+
def mock_checkpoint_processor():
94+
return Mock()
95+
96+
97+
@pytest.fixture
98+
def executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor):
99+
return Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor)
95100

96101

97102
@pytest.fixture
@@ -117,10 +122,12 @@ def mock_execution():
117122
return execution
118123

119124

120-
def test_init(mock_store, mock_scheduler, mock_invoker):
125+
def test_init(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor):
121126
# Test that Executor can be constructed with dependencies
122127
# Dependency injection is implementation detail - test behavior instead
123-
executor = Executor(mock_store, mock_scheduler, mock_invoker)
128+
executor = Executor(
129+
mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor
130+
)
124131

125132
# Verify executor is properly initialized by testing it can perform basic operations
126133
assert executor is not None

tests/runner_web_test.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -687,9 +687,13 @@ def test_should_create_all_required_dependencies_during_start():
687687
mock_store_class.assert_called_once()
688688
mock_scheduler_class.assert_called_once()
689689
mock_invoker_class.assert_called_once_with(mock_client)
690-
mock_executor_class.assert_called_once_with(
691-
store=mock_store, scheduler=mock_scheduler, invoker=mock_invoker
692-
)
690+
# Verify Executor was called with the expected parameters including checkpoint_processor
691+
assert mock_executor_class.call_count == 1
692+
call_args = mock_executor_class.call_args
693+
assert call_args.kwargs["store"] == mock_store
694+
assert call_args.kwargs["scheduler"] == mock_scheduler
695+
assert call_args.kwargs["invoker"] == mock_invoker
696+
assert "checkpoint_processor" in call_args.kwargs
693697
mock_web_server_class.assert_called_once_with(
694698
config=web_config, executor=mock_executor
695699
)
@@ -825,9 +829,12 @@ def test_should_wire_dependencies_correctly_in_executor():
825829
runner.start()
826830

827831
# Assert - Verify Executor was created with correct dependencies
828-
mock_executor_class.assert_called_once_with(
829-
store=mock_store, scheduler=mock_scheduler, invoker=mock_invoker
830-
)
832+
assert mock_executor_class.call_count == 1
833+
call_args = mock_executor_class.call_args
834+
assert call_args.kwargs["store"] == mock_store
835+
assert call_args.kwargs["scheduler"] == mock_scheduler
836+
assert call_args.kwargs["invoker"] == mock_invoker
837+
assert "checkpoint_processor" in call_args.kwargs
831838

832839
# Cleanup
833840
runner.stop()

0 commit comments

Comments
 (0)