Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions src/aws_durable_execution_sdk_python_testing/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from aws_durable_execution_sdk_python_testing.execution import Execution
from aws_durable_execution_sdk_python_testing.model import (
CheckpointDurableExecutionResponse,
CheckpointUpdatedExecutionState,
GetDurableExecutionHistoryResponse,
GetDurableExecutionResponse,
GetDurableExecutionStateResponse,
Expand All @@ -47,6 +48,9 @@
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable

from aws_durable_execution_sdk_python_testing.checkpoint.processor import (
CheckpointProcessor,
)
from aws_durable_execution_sdk_python_testing.invoker import Invoker
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore
Expand All @@ -58,10 +62,17 @@ class Executor(ExecutionObserver):
MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
RETRY_BACKOFF_SECONDS = 5

def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker):
def __init__(
self,
store: ExecutionStore,
scheduler: Scheduler,
invoker: Invoker,
checkpoint_processor: CheckpointProcessor,
):
self._store = store
self._scheduler = scheduler
self._invoker = invoker
self._checkpoint_processor = checkpoint_processor
self._completion_events: dict[str, Event] = {}

def start_execution(
Expand Down Expand Up @@ -464,8 +475,8 @@ def checkpoint_execution(
self,
execution_arn: str,
checkpoint_token: str,
updates: list[OperationUpdate] | None = None, # noqa: ARG002
client_token: str | None = None, # noqa: ARG002
updates: list[OperationUpdate] | None = None,
client_token: str | None = None,
) -> CheckpointDurableExecutionResponse:
"""Process checkpoint for an execution.

Expand All @@ -489,19 +500,28 @@ def checkpoint_execution(
msg: str = f"Invalid checkpoint token: {checkpoint_token}"
raise InvalidParameterValueException(msg)

# TODO: Process operation updates using the checkpoint processor
# This would integrate with the existing checkpoint processing pipeline
if updates:
checkpoint_output = self._checkpoint_processor.process_checkpoint(
checkpoint_token=checkpoint_token,
updates=updates,
client_token=client_token,
)

# Generate new checkpoint token
new_checkpoint_token = execution.get_new_checkpoint_token()
new_execution_state = None
if checkpoint_output.new_execution_state:
new_execution_state = CheckpointUpdatedExecutionState(
operations=checkpoint_output.new_execution_state.operations,
next_marker=checkpoint_output.new_execution_state.next_marker,
)

# Get current execution state - for now return None (simplified implementation)
# In a full implementation, this would return CheckpointUpdatedExecutionState with operations
new_execution_state = None
return CheckpointDurableExecutionResponse(
checkpoint_token=checkpoint_output.checkpoint_token,
new_execution_state=new_execution_state,
)

return CheckpointDurableExecutionResponse(
checkpoint_token=new_checkpoint_token,
new_execution_state=new_execution_state,
checkpoint_token=execution.get_new_checkpoint_token(),
new_execution_state=None,
)

def send_callback_success(
Expand Down
18 changes: 15 additions & 3 deletions src/aws_durable_execution_sdk_python_testing/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,10 @@ def __init__(self, handler: Callable):
self._service_client = InMemoryServiceClient(self._checkpoint_processor)
self._invoker = InProcessInvoker(handler, self._service_client)
self._executor = Executor(
store=self._store, scheduler=self._scheduler, invoker=self._invoker
store=self._store,
scheduler=self._scheduler,
invoker=self._invoker,
checkpoint_processor=self._checkpoint_processor,
)

# Wire up observer pattern - CheckpointProcessor uses this to notify executor of state changes
Expand Down Expand Up @@ -631,11 +634,20 @@ def start(self) -> None:
self._scheduler = Scheduler()
self._invoker = LambdaInvoker(self._create_boto3_client())

# Create executor with all dependencies
# Create shared CheckpointProcessor
checkpoint_processor = CheckpointProcessor(self._store, self._scheduler)

# Create executor with all dependencies including checkpoint processor
self._executor = Executor(
store=self._store, scheduler=self._scheduler, invoker=self._invoker
store=self._store,
scheduler=self._scheduler,
invoker=self._invoker,
checkpoint_processor=checkpoint_processor,
)

# Add executor as observer to the checkpoint processor
checkpoint_processor.add_execution_observer(self._executor)

# Start the scheduler
self._scheduler.start()

Expand Down
15 changes: 11 additions & 4 deletions tests/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ def mock_invoker():


@pytest.fixture
def executor(mock_store, mock_scheduler, mock_invoker):
return Executor(mock_store, mock_scheduler, mock_invoker)
def mock_checkpoint_processor():
return Mock()


@pytest.fixture
def executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor):
return Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor)


@pytest.fixture
Expand All @@ -117,10 +122,12 @@ def mock_execution():
return execution


def test_init(mock_store, mock_scheduler, mock_invoker):
def test_init(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor):
# Test that Executor can be constructed with dependencies
# Dependency injection is implementation detail - test behavior instead
executor = Executor(mock_store, mock_scheduler, mock_invoker)
executor = Executor(
mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor
)

# Verify executor is properly initialized by testing it can perform basic operations
assert executor is not None
Expand Down
19 changes: 13 additions & 6 deletions tests/runner_web_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,13 @@ def test_should_create_all_required_dependencies_during_start():
mock_store_class.assert_called_once()
mock_scheduler_class.assert_called_once()
mock_invoker_class.assert_called_once_with(mock_client)
mock_executor_class.assert_called_once_with(
store=mock_store, scheduler=mock_scheduler, invoker=mock_invoker
)
# Verify Executor was called with the expected parameters including checkpoint_processor
assert mock_executor_class.call_count == 1
call_args = mock_executor_class.call_args
assert call_args.kwargs["store"] == mock_store
assert call_args.kwargs["scheduler"] == mock_scheduler
assert call_args.kwargs["invoker"] == mock_invoker
assert "checkpoint_processor" in call_args.kwargs
mock_web_server_class.assert_called_once_with(
config=web_config, executor=mock_executor
)
Expand Down Expand Up @@ -825,9 +829,12 @@ def test_should_wire_dependencies_correctly_in_executor():
runner.start()

# Assert - Verify Executor was created with correct dependencies
mock_executor_class.assert_called_once_with(
store=mock_store, scheduler=mock_scheduler, invoker=mock_invoker
)
assert mock_executor_class.call_count == 1
call_args = mock_executor_class.call_args
assert call_args.kwargs["store"] == mock_store
assert call_args.kwargs["scheduler"] == mock_scheduler
assert call_args.kwargs["invoker"] == mock_invoker
assert "checkpoint_processor" in call_args.kwargs

# Cleanup
runner.stop()
Expand Down
Loading