From a24c5fb20e1fc52393be5b439c169149d1e940c8 Mon Sep 17 00:00:00 2001 From: Rares Polenciuc Date: Thu, 20 Nov 2025 14:09:45 +0000 Subject: [PATCH 1/2] fix: callback timeout handling --- .../checkpoint/processors/callback.py | 18 ++- .../execution.py | 26 ++++ .../executor.py | 76 ++++------- .../observer.py | 18 ++- tests/executor_test.py | 126 +++++------------ tests/how-to-run-from-term.txt | 1 + tests/observer_test.py | 10 +- tests/pending_operation_test.py | 129 ++++++++++++++++++ 8 files changed, 255 insertions(+), 149 deletions(-) create mode 100644 tests/how-to-run-from-term.txt create mode 100644 tests/pending_operation_test.py diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py index c1a0ec7..48c2f01 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py @@ -12,6 +12,7 @@ OperationUpdate, CallbackDetails, OperationType, + CallbackOptions, ) from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import ( OperationProcessor, @@ -43,12 +44,6 @@ def process( operation_id=update.operation_id, ) - notifier.notify_callback_created( - execution_arn=execution_arn, - operation_id=update.operation_id, - callback_token=callback_token, - ) - callback_id: str = callback_token.to_str() callback_details: CallbackDetails | None = ( @@ -60,11 +55,15 @@ def process( if update.operation_type == OperationType.CALLBACK else None ) + status: OperationStatus = OperationStatus.STARTED + start_time: datetime.datetime | None = self._get_start_time(current_op) + end_time: datetime.datetime | None = self._get_end_time( current_op, status ) + operation: Operation = Operation( operation_id=update.operation_id, parent_id=update.parent_id, @@ -76,7 +75,14 @@ def process( sub_type=update.sub_type, callback_details=callback_details, ) + callback_options: CallbackOptions | None = update.callback_options + notifier.notify_callback_created( + execution_arn=execution_arn, + operation_id=update.operation_id, + callback_options=callback_options, + callback_token=callback_token, + ) return operation case _: msg: str = "Invalid action for CALLBACK operation." diff --git a/src/aws_durable_execution_sdk_python_testing/execution.py b/src/aws_durable_execution_sdk_python_testing/execution.py index 8583efb..113db5b 100644 --- a/src/aws_durable_execution_sdk_python_testing/execution.py +++ b/src/aws_durable_execution_sdk_python_testing/execution.py @@ -381,6 +381,32 @@ def complete_callback_failure( ) return self.operations[index] + def complete_callback_timeout( + self, callback_id: str, error: ErrorObject + ) -> Operation: + """Complete CALLBACK operation with timeout.""" + index, operation = self.find_callback_operation(callback_id) + + if operation.status != OperationStatus.STARTED: + msg: str = f"Callback operation [{callback_id}] is not in STARTED state" + raise IllegalStateException(msg) + + with self._state_lock: + self._token_sequence += 1 + updated_callback_details = None + if operation.callback_details: + updated_callback_details = replace( + operation.callback_details, error=error + ) + + self.operations[index] = replace( + operation, + status=OperationStatus.TIMED_OUT, + end_timestamp=datetime.now(UTC), + callback_details=updated_callback_details, + ) + return self.operations[index] + def _end_execution(self, status: OperationStatus) -> None: """Set the end_timestamp on the main EXECUTION operation when execution completes.""" execution_op: Operation = self.get_operation_execution_started() diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 3c398f6..e707366 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -19,6 +19,7 @@ OperationUpdate, OperationStatus, OperationType, + CallbackOptions, ) from aws_durable_execution_sdk_python_testing.exceptions import ( @@ -57,6 +58,7 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable + from concurrent.futures import Future from aws_durable_execution_sdk_python_testing.checkpoint.processor import ( CheckpointProcessor, @@ -84,10 +86,8 @@ def __init__( self._invoker = invoker self._checkpoint_processor = checkpoint_processor self._completion_events: dict[str, Event] = {} - self._callback_timeouts: dict[str, Event] = {} # callback_id -> timeout event - self._callback_heartbeats: dict[ - str, Event - ] = {} # callback_id -> heartbeat event + self._callback_timeouts: dict[str, Future] = {} + self._callback_heartbeats: dict[str, Future] = {} def start_execution( self, @@ -1011,7 +1011,11 @@ def retry_handler() -> None: ) def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Handle callback creation. Observer method triggered by notifier.""" callback_id = callback_token.to_str() @@ -1023,34 +1027,19 @@ def on_callback_created( ) # Schedule callback timeouts if configured - self._schedule_callback_timeouts(execution_arn, operation_id, callback_id) + self._schedule_callback_timeouts(execution_arn, callback_options, callback_id) # endregion ExecutionObserver # region Callback Timeouts def _schedule_callback_timeouts( - self, execution_arn: str, operation_id: str, callback_id: str + self, + execution_arn: str, + callback_options: CallbackOptions | None, + callback_id: str, ) -> None: """Schedule callback timeout and heartbeat timeout if configured.""" try: - execution = self.get_execution(execution_arn) - _, operation = execution.find_operation(operation_id) - - if not operation.callback_details: - return - - # Find the callback options from the operation update that created this callback - # We need to look at the checkpoint updates to find the original callback options - callback_options = None - for update in execution.updates: - if ( - update.operation_id == operation_id - and update.callback_options - and update.action.value == "START" - ): - callback_options = update.callback_options - break - if not callback_options: return @@ -1062,13 +1051,12 @@ def _schedule_callback_timeouts( def timeout_handler(): self._on_callback_timeout(execution_arn, callback_id) - timeout_event = self._scheduler.create_event() - self._callback_timeouts[callback_id] = timeout_event - self._scheduler.call_later( + timeout_future = self._scheduler.call_later( timeout_handler, delay=callback_options.timeout_seconds, completion_event=completion_event, ) + self._callback_timeouts[callback_id] = timeout_future # Schedule heartbeat timeout if configured if callback_options.heartbeat_timeout_seconds > 0: @@ -1076,13 +1064,12 @@ def timeout_handler(): def heartbeat_timeout_handler(): self._on_callback_heartbeat_timeout(execution_arn, callback_id) - heartbeat_event = self._scheduler.create_event() - self._callback_heartbeats[callback_id] = heartbeat_event - self._scheduler.call_later( + heartbeat_future = self._scheduler.call_later( heartbeat_timeout_handler, delay=callback_options.heartbeat_timeout_seconds, completion_event=completion_event, ) + self._callback_heartbeats[callback_id] = heartbeat_future except Exception: logger.exception( @@ -1096,16 +1083,14 @@ def _reset_callback_heartbeat_timeout( ) -> None: """Reset the heartbeat timeout for a callback.""" # Cancel existing heartbeat timeout - if heartbeat_event := self._callback_heartbeats.get(callback_id): - heartbeat_event.remove() - del self._callback_heartbeats[callback_id] + if heartbeat_future := self._callback_heartbeats.pop(callback_id, None): + heartbeat_future.cancel() # Find callback options to reschedule heartbeat timeout try: callback_token = CallbackToken.from_str(callback_id) execution = self.get_execution(callback_token.execution_arn) - # Find callback options from updates callback_options = None for update in execution.updates: if ( @@ -1122,13 +1107,14 @@ def heartbeat_timeout_handler(): self._on_callback_heartbeat_timeout(execution_arn, callback_id) completion_event = self._completion_events.get(execution_arn) - heartbeat_event = self._scheduler.create_event() - self._callback_heartbeats[callback_id] = heartbeat_event - self._scheduler.call_later( + + heartbeat_future = self._scheduler.call_later( heartbeat_timeout_handler, delay=callback_options.heartbeat_timeout_seconds, completion_event=completion_event, ) + self._callback_heartbeats[callback_id] = heartbeat_future + except Exception: logger.exception( "[%s] Error resetting callback heartbeat timeout for %s", @@ -1139,14 +1125,12 @@ def heartbeat_timeout_handler(): def _cleanup_callback_timeouts(self, callback_id: str) -> None: """Clean up timeout events for a completed callback.""" # Clean up main timeout - if timeout_event := self._callback_timeouts.get(callback_id): - timeout_event.remove() - del self._callback_timeouts[callback_id] + if timeout_future := self._callback_timeouts.pop(callback_id, None): + timeout_future.cancel() # Clean up heartbeat timeout - if heartbeat_event := self._callback_heartbeats.get(callback_id): - heartbeat_event.remove() - del self._callback_heartbeats[callback_id] + if heartbeat_future := self._callback_heartbeats.pop(callback_id, None): + heartbeat_future.cancel() def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None: """Handle callback timeout.""" @@ -1161,7 +1145,7 @@ def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None: timeout_error = ErrorObject.from_message( f"Callback timed out: {CallbackTimeoutType.TIMEOUT.value}" ) - execution.complete_callback_failure(callback_id, timeout_error) + execution.complete_callback_timeout(callback_id, timeout_error) execution.complete_fail(timeout_error) self._store.update(execution) logger.warning("[%s] Callback %s timed out", execution_arn, callback_id) @@ -1188,7 +1172,7 @@ def _on_callback_heartbeat_timeout( heartbeat_error = ErrorObject.from_message( f"Callback heartbeat timed out: {CallbackTimeoutType.HEARTBEAT.value}" ) - execution.complete_callback_failure(callback_id, heartbeat_error) + execution.complete_callback_timeout(callback_id, heartbeat_error) execution.complete_fail(heartbeat_error) self._store.update(execution) logger.warning( diff --git a/src/aws_durable_execution_sdk_python_testing/observer.py b/src/aws_durable_execution_sdk_python_testing/observer.py index 3a21fd5..1b518ce 100644 --- a/src/aws_durable_execution_sdk_python_testing/observer.py +++ b/src/aws_durable_execution_sdk_python_testing/observer.py @@ -11,7 +11,10 @@ if TYPE_CHECKING: from collections.abc import Callable - from aws_durable_execution_sdk_python.lambda_service import ErrorObject + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + CallbackOptions, + ) class ExecutionObserver(ABC): @@ -47,7 +50,11 @@ def on_step_retry_scheduled( @abstractmethod def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Called when callback is created.""" @@ -119,13 +126,18 @@ def notify_step_retry_scheduled( ) def notify_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Notify observers about callback creation.""" self._notify_observers( ExecutionObserver.on_callback_created, execution_arn=execution_arn, operation_id=operation_id, + callback_options=callback_options, callback_token=callback_token, ) diff --git a/tests/executor_test.py b/tests/executor_test.py index 008a4a0..1d8e0f6 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -84,7 +84,11 @@ def on_step_retry_scheduled( } def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Capture callback creation events.""" self.callback_creations[execution_arn] = { @@ -2491,41 +2495,17 @@ def test_complete_events_no_event(executor): def test_callback_timeout_scheduling(executor, mock_store, mock_scheduler): """Test that callback timeouts are scheduled when callback is created.""" - # Create mock execution with callback operation and updates - mock_execution = Mock() - mock_execution.durable_execution_arn = "test-arn" - - # Create callback operation with details - callback_operation = Operation( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - status=OperationStatus.STARTED, - callback_details=CallbackDetails(callback_id="callback-id"), - ) - mock_execution.find_operation.return_value = (0, callback_operation) - - # Create callback update with timeout options + # Create callback options with both timeouts callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=30) - update = OperationUpdate( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - action=OperationAction.START, - callback_options=callback_options, - ) - mock_execution.updates = [update] - - mock_store.load.return_value = mock_execution - mock_scheduler.create_event.return_value = Mock() # Set up completion event executor._completion_events["test-arn"] = Mock() - # Test the timeout scheduling directly - executor._schedule_callback_timeouts("test-arn", "op-123", "callback-id") + # Test the timeout scheduling directly with correct parameters + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") # Verify scheduler was called for both timeouts assert mock_scheduler.call_later.call_count == 2 # main timeout + heartbeat timeout - assert mock_scheduler.create_event.call_count == 2 # events for both timeouts def test_callback_timeout_cleanup(executor, mock_store): @@ -2540,9 +2520,9 @@ def test_callback_timeout_cleanup(executor, mock_store): # Trigger cleanup executor._cleanup_callback_timeouts("callback-id") - # Verify events were removed and cleaned up - timeout_event.remove.assert_called_once() - heartbeat_event.remove.assert_called_once() + # Verify events were cancelled and removed + timeout_event.cancel.assert_called_once() + heartbeat_event.cancel.assert_called_once() assert "callback-id" not in executor._callback_timeouts assert "callback-id" not in executor._callback_heartbeats @@ -2575,8 +2555,8 @@ def test_callback_heartbeat_timeout_reset(executor, mock_store, mock_scheduler): # Reset heartbeat timeout executor._reset_callback_heartbeat_timeout(callback_id, "test-arn") - # Verify old event was removed and new one scheduled - old_event.remove.assert_called_once() + # Verify old event was cancelled and new one scheduled + old_event.cancel.assert_called_once() mock_scheduler.call_later.assert_called() @@ -2591,22 +2571,26 @@ def test_callback_timeout_handlers(executor, mock_store): mock_execution.is_complete = False mock_store.load.return_value = mock_execution - with patch.object(executor, "_invoke_execution"): - # Test main timeout handler - executor._on_callback_timeout("test-arn", callback_id) + # Test main timeout handler + executor._on_callback_timeout("test-arn", callback_id) + + # Verify callback was failed with timeout error + mock_execution.complete_callback_timeout.assert_called() + timeout_error = mock_execution.complete_callback_timeout.call_args[0][1] + assert "Callback timed out" in str(timeout_error.message) + mock_execution.complete_fail.assert_called() - # Verify callback was failed with timeout error - mock_execution.complete_callback_failure.assert_called() - timeout_error = mock_execution.complete_callback_failure.call_args[0][1] - assert "Callback.Timeout" in str(timeout_error.message) + # Reset mocks for heartbeat test + mock_execution.reset_mock() - # Test heartbeat timeout handler - executor._on_callback_heartbeat_timeout("test-arn", callback_id) + # Test heartbeat timeout handler + executor._on_callback_heartbeat_timeout("test-arn", callback_id) - # Verify callback was failed with heartbeat timeout error - assert mock_execution.complete_callback_failure.call_count == 2 - heartbeat_error = mock_execution.complete_callback_failure.call_args[0][1] - assert "Callback.Heartbeat" in str(heartbeat_error.message) + # Verify callback was failed with heartbeat timeout error + mock_execution.complete_callback_timeout.assert_called() + heartbeat_error = mock_execution.complete_callback_timeout.call_args[0][1] + assert "Callback heartbeat timed out" in str(heartbeat_error.message) + mock_execution.complete_fail.assert_called() def test_callback_timeout_completed_execution(executor, mock_store): @@ -2626,7 +2610,7 @@ def test_callback_timeout_completed_execution(executor, mock_store): executor._on_callback_heartbeat_timeout("test-arn", callback_id) # Verify no callback operations were performed - mock_execution.complete_callback_failure.assert_not_called() + mock_execution.complete_callback_timeout.assert_not_called() mock_store.update.assert_not_called() @@ -2717,32 +2701,12 @@ def test_schedule_callback_timeouts_only_main_timeout( ): """Test _schedule_callback_timeouts with only main timeout configured.""" - # Create operation with callback details - operation = Operation( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - status=OperationStatus.STARTED, - callback_details=CallbackDetails(callback_id="callback-id"), - ) - - mock_execution = Mock() - mock_execution.find_operation.return_value = (0, operation) - - # Create update with only main timeout + # Create callback options with only main timeout callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=0) - update = OperationUpdate( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - action=OperationAction.START, - callback_options=callback_options, - ) - mock_execution.updates = [update] - mock_store.load.return_value = mock_execution - mock_scheduler.create_event.return_value = Mock() executor._completion_events["test-arn"] = Mock() - executor._schedule_callback_timeouts("test-arn", "op-123", "callback-id") + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") # Only main timeout should be scheduled assert mock_scheduler.call_later.call_count == 1 @@ -2754,32 +2718,12 @@ def test_schedule_callback_timeouts_only_heartbeat_timeout( executor, mock_store, mock_scheduler ): """Test _schedule_callback_timeouts with only heartbeat timeout configured.""" - # Create operation with callback details - operation = Operation( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - status=OperationStatus.STARTED, - callback_details=CallbackDetails(callback_id="callback-id"), - ) - - mock_execution = Mock() - mock_execution.find_operation.return_value = (0, operation) - - # Create update with only heartbeat timeout + # Create callback options with only heartbeat timeout callback_options = CallbackOptions(timeout_seconds=0, heartbeat_timeout_seconds=30) - update = OperationUpdate( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - action=OperationAction.START, - callback_options=callback_options, - ) - mock_execution.updates = [update] - mock_store.load.return_value = mock_execution - mock_scheduler.create_event.return_value = Mock() executor._completion_events["test-arn"] = Mock() - executor._schedule_callback_timeouts("test-arn", "op-123", "callback-id") + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") # Only heartbeat timeout should be scheduled assert mock_scheduler.call_later.call_count == 1 diff --git a/tests/how-to-run-from-term.txt b/tests/how-to-run-from-term.txt new file mode 100644 index 0000000..1301cdd --- /dev/null +++ b/tests/how-to-run-from-term.txt @@ -0,0 +1 @@ +source /Users/rarepolz/workspace/aws-durable-execution/venv/bin/activate && pip install -e . --no-deps && pytest tests/event_conversion_test.py -v \ No newline at end of file diff --git a/tests/observer_test.py b/tests/observer_test.py index 4847eee..193f395 100644 --- a/tests/observer_test.py +++ b/tests/observer_test.py @@ -5,7 +5,7 @@ from unittest.mock import Mock import pytest -from aws_durable_execution_sdk_python.lambda_service import ErrorObject +from aws_durable_execution_sdk_python.lambda_service import ErrorObject, CallbackOptions from aws_durable_execution_sdk_python_testing.observer import ( ExecutionNotifier, @@ -49,10 +49,14 @@ def on_step_retry_scheduled( self.on_step_retry_scheduled_calls.append((execution_arn, operation_id, delay)) def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: self.on_callback_created_calls.append( - (execution_arn, operation_id, callback_token) + (execution_arn, operation_id, callback_options, callback_token) ) diff --git a/tests/pending_operation_test.py b/tests/pending_operation_test.py new file mode 100644 index 0000000..d508fbc --- /dev/null +++ b/tests/pending_operation_test.py @@ -0,0 +1,129 @@ +# """Test for pending operation handling in get_execution_history.""" +# +# from datetime import UTC, datetime +# from unittest.mock import Mock +# +# from aws_durable_execution_sdk_python.lambda_service import ( +# OperationStatus, +# OperationType, +# ) +# +# from aws_durable_execution_sdk_python_testing.executor import Executor +# from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput +# +# +# def test_get_execution_history_with_pending_chained_invoke(): +# """Test get_execution_history handles pending CHAINED_INVOKE operations correctly.""" +# # Create mocks +# mock_store = Mock() +# mock_scheduler = Mock() +# mock_invoker = Mock() +# mock_checkpoint_processor = Mock() +# +# executor = Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor) +# +# # Create mock execution +# mock_execution = Mock() +# mock_execution.durable_execution_arn = "test-arn" +# mock_execution.start_input = StartDurableExecutionInput( +# account_id="123", +# function_name="test", +# function_qualifier="$LATEST", +# execution_name="test", +# execution_timeout_seconds=300, +# execution_retention_period_days=7, +# ) +# mock_execution.result = None +# mock_execution.updates = [] +# +# # Create a pending CHAINED_INVOKE operation with start_timestamp +# pending_op = Mock() +# pending_op.operation_id = "invoke-1" +# pending_op.operation_type = OperationType.CHAINED_INVOKE +# pending_op.status = OperationStatus.PENDING +# pending_op.start_timestamp = datetime.now(UTC) +# pending_op.end_timestamp = None +# +# # Create a non-CHAINED_INVOKE pending operation (should be skipped) +# pending_step = Mock() +# pending_step.operation_id = "step-1" +# pending_step.operation_type = OperationType.STEP +# pending_step.status = OperationStatus.PENDING +# pending_step.start_timestamp = datetime.now(UTC) +# pending_step.end_timestamp = None +# +# # Create a CHAINED_INVOKE pending operation without start_timestamp (should be skipped) +# pending_invoke_no_timestamp = Mock() +# pending_invoke_no_timestamp.operation_id = "invoke-2" +# pending_invoke_no_timestamp.operation_type = OperationType.CHAINED_INVOKE +# pending_invoke_no_timestamp.status = OperationStatus.PENDING +# pending_invoke_no_timestamp.start_timestamp = None +# pending_invoke_no_timestamp.end_timestamp = None +# +# mock_execution.operations = [pending_op, pending_step, pending_invoke_no_timestamp] +# mock_store.load.return_value = mock_execution +# +# # Call get_execution_history +# result = executor.get_execution_history("test-arn", include_execution_data=True) +# +# # Should have 2 events: 1 pending event + 1 started event for the valid pending CHAINED_INVOKE +# assert len(result.events) == 2 +# +# # First event should be the pending event +# assert result.events[0].event_type == "ChainedInvokeStarted" +# assert result.events[0].operation_id == "invoke-1" +# assert result.events[0].chained_invoke_pending_details is not None +# +# # Second event should be the started event +# assert result.events[1].event_type == "ChainedInvokeStarted" +# assert result.events[1].operation_id == "invoke-1" +# assert result.events[1].chained_invoke_started_details is not None +# +# +# def test_get_execution_history_skips_invalid_pending_operations(): +# """Test that invalid pending operations are skipped.""" +# # Create mocks +# mock_store = Mock() +# mock_scheduler = Mock() +# mock_invoker = Mock() +# mock_checkpoint_processor = Mock() +# +# executor = Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor) +# +# # Create mock execution +# mock_execution = Mock() +# mock_execution.durable_execution_arn = "test-arn" +# mock_execution.start_input = StartDurableExecutionInput( +# account_id="123", +# function_name="test", +# function_qualifier="$LATEST", +# execution_name="test", +# execution_timeout_seconds=300, +# execution_retention_period_days=7, +# ) +# mock_execution.result = None +# mock_execution.updates = [] +# +# # Create operations that should be skipped +# # 1. Non-CHAINED_INVOKE pending operation +# pending_step = Mock() +# pending_step.operation_id = "step-1" +# pending_step.operation_type = OperationType.STEP +# pending_step.status = OperationStatus.PENDING +# pending_step.start_timestamp = datetime.now(UTC) +# +# # 2. CHAINED_INVOKE pending operation without start_timestamp +# pending_invoke_no_timestamp = Mock() +# pending_invoke_no_timestamp.operation_id = "invoke-1" +# pending_invoke_no_timestamp.operation_type = OperationType.CHAINED_INVOKE +# pending_invoke_no_timestamp.status = OperationStatus.PENDING +# pending_invoke_no_timestamp.start_timestamp = None +# +# mock_execution.operations = [pending_step, pending_invoke_no_timestamp] +# mock_store.load.return_value = mock_execution +# +# # Call get_execution_history +# result = executor.get_execution_history("test-arn") +# +# # Should have no events since all pending operations are invalid +# assert len(result.events) == 0 From 39ae87788d1367711481a82cc518c18c29411040 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 19 Nov 2025 16:09:13 -0800 Subject: [PATCH 2/2] examples: Add wait_for_callback examples - Wait for callback examples - Add a helper method in the runner to get all nested operations - Update sam template --- examples/examples-catalog.json | 99 ++++++++++++++ .../wait_for_callback_anonymous.py | 18 +++ .../wait_for_callback_child.py | 42 ++++++ .../wait_for_callback_heartbeat.py | 31 +++++ .../wait_for_callback_mixed_ops.py | 47 +++++++ .../wait_for_callback_multiple_invocations.py | 53 ++++++++ .../wait_for_callback_nested.py | 66 +++++++++ .../wait_for_callback_serdes.py | 89 +++++++++++++ .../wait_for_callback_submitter_failure.py | 44 ++++++ ...or_callback_submitter_failure_catchable.py | 52 ++++++++ examples/template.yaml | 126 ++++++++++++++++++ .../test_wait_for_callback_anonymous.py | 39 ++++++ .../test_wait_for_callback_child.py | 73 ++++++++++ .../test_wait_for_callback_heartbeat.py | 62 +++++++++ .../test_wait_for_callback_mixed_ops.py | 52 ++++++++ ..._wait_for_callback_multiple_invocations.py | 74 ++++++++++ .../test_wait_for_callback_nested.py | 101 ++++++++++++++ .../test_wait_for_callback_serdes.py | 66 +++++++++ ...est_wait_for_callback_submitter_failure.py | 32 +++++ ...or_callback_submitter_failure_catchable.py | 28 ++++ .../runner.py | 12 ++ 21 files changed, 1206 insertions(+) create mode 100644 examples/src/wait_for_callback/wait_for_callback_anonymous.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_child.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_heartbeat.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_mixed_ops.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_nested.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_serdes.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_submitter_failure.py create mode 100644 examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_anonymous.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_child.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_nested.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_serdes.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index d763497..fca9860 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -100,6 +100,105 @@ }, "path": "./src/wait_for_callback/wait_for_callback.py" }, + { + "name": "Wait For Callback Success Anonymous", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_anonymous.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_anonymous.py" + }, + { + "name": "Wait For Callback Heartbeat Sends", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_heartbeat.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_heartbeat.py" + }, + { + "name": "Wait For Callback With Child Context", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_child.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_child.py" + }, + { + "name": "Wait For Callback Mixed Ops", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_mixed_ops.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_mixed_ops.py" + }, + { + "name": "Wait For Callback Multiple Invocations", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_multiple_invocations.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_multiple_invocations.py" + }, + { + "name": "Wait For Callback Failing Submitter Catchable", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_submitter_failure_catchable.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py" + }, + { + "name": "Wait For Callback Submitter Failure", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_submitter_failure.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_submitter_failure.py" + }, + { + "name": "Wait For Callback Serdes", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_serdes.py" + }, + { + "name": "Wait For Callback Nested", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_nested.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_nested.py" + }, { "name": "Run in Child Context", "description": "Usage of context.run_in_child_context() to execute operations in isolated contexts", diff --git a/examples/src/wait_for_callback/wait_for_callback_anonymous.py b/examples/src/wait_for_callback/wait_for_callback_anonymous.py new file mode 100644 index 0000000..9327ac7 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_anonymous.py @@ -0,0 +1,18 @@ +"""Demonstrates waitForCallback with anonymous (inline) submitter function.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with anonymous submitter.""" + result: str = context.wait_for_callback(lambda _: time.sleep(1)) + + return { + "callbackResult": result, + "completed": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_child.py b/examples/src/wait_for_callback/wait_for_callback_child.py new file mode 100644 index 0000000..2f50c67 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_child.py @@ -0,0 +1,42 @@ +"""Demonstrates waitForCallback operations within child contexts.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_with_child_context +def child_context_with_callback(child_context: DurableContext) -> dict[str, Any]: + """Child context containing wait and callback operations.""" + child_context.wait(Duration.from_seconds(1), name="child-wait") + + child_callback_result: str = child_context.wait_for_callback( + lambda _: None, name="child-callback-op" + ) + + return { + "childResult": child_callback_result, + "childProcessed": True, + } + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback within child contexts.""" + parent_result: str = context.wait_for_callback( + lambda _: None, name="parent-callback-op" + ) + + child_context_result: dict[str, Any] = context.run_in_child_context( + child_context_with_callback(), name="child-context-with-callback" + ) + + return { + "parentResult": parent_result, + "childContextResult": child_context_result, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_heartbeat.py b/examples/src/wait_for_callback/wait_for_callback_heartbeat.py new file mode 100644 index 0000000..ac4c398 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_heartbeat.py @@ -0,0 +1,31 @@ +"""Demonstrates sending heartbeats during long-running callback processing.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration +from aws_durable_execution_sdk_python.config import WaitForCallbackConfig + + +def submitter(_callback_id: str) -> None: + """Simulate long-running submitter function.""" + time.sleep(5) + return None + + +@durable_execution +def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with heartbeat timeout.""" + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(120), heartbeat_timeout=Duration.from_seconds(15) + ) + + result: str = context.wait_for_callback(submitter, config=config) + + return { + "callbackResult": result, + "completed": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_mixed_ops.py b/examples/src/wait_for_callback/wait_for_callback_mixed_ops.py new file mode 100644 index 0000000..107ec19 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_mixed_ops.py @@ -0,0 +1,47 @@ +"""Demonstrates waitForCallback combined with steps, waits, and other operations.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback mixed with other operations.""" + # Mix waitForCallback with other operation types + context.wait(Duration.from_seconds(1), name="initial-wait") + + step_result: dict[str, Any] = context.step( + lambda _: {"userId": 123, "name": "John Doe"}, + name="fetch-user-data", + ) + + def submitter(_) -> None: + """Submitter uses data from previous step.""" + time.sleep(0.1) + return None + + callback_result: str = context.wait_for_callback( + submitter, + name="wait-for-callback", + ) + + context.wait(Duration.from_seconds(2), name="final-wait") + + final_step: dict[str, Any] = context.step( + lambda _: { + "status": "completed", + "timestamp": int(time.time() * 1000), + }, + name="finalize-processing", + ) + + return { + "stepResult": step_result, + "callbackResult": callback_result, + "finalStep": final_step, + "workflowCompleted": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py b/examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py new file mode 100644 index 0000000..3793adc --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py @@ -0,0 +1,53 @@ +"""Demonstrates multiple invocations tracking with waitForCallback operations across different invocations.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating multiple invocations with waitForCallback operations.""" + # First invocation - wait operation + context.wait(Duration.from_seconds(1), name="wait-invocation-1") + + # First callback operation + def first_submitter(callback_id: str) -> None: + """Submitter for first callback.""" + print(f"First callback submitted with ID: {callback_id}") + return None + + callback_result_1: str = context.wait_for_callback( + first_submitter, + name="first-callback", + ) + + # Step operation between callbacks + step_result: dict[str, Any] = context.step( + lambda _: {"processed": True, "step": 1}, + name="process-callback-data", + ) + + # Second invocation - another wait operation + context.wait(Duration.from_seconds(1), name="wait-invocation-2") + + # Second callback operation + def second_submitter(callback_id: str) -> None: + """Submitter for second callback.""" + print(f"Second callback submitted with ID: {callback_id}") + return None + + callback_result_2: str = context.wait_for_callback( + second_submitter, + name="second-callback", + ) + + # Final invocation returns complete result + return { + "firstCallback": callback_result_1, + "secondCallback": callback_result_2, + "stepResult": step_result, + "invocationCount": "multiple", + } diff --git a/examples/src/wait_for_callback/wait_for_callback_nested.py b/examples/src/wait_for_callback/wait_for_callback_nested.py new file mode 100644 index 0000000..f855ac3 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_nested.py @@ -0,0 +1,66 @@ +"""Demonstrates nested waitForCallback operations across multiple child context levels.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_with_child_context +def inner_child_context(inner_child_ctx: DurableContext) -> dict[str, Any]: + """Inner child context with deep nested callback.""" + inner_child_ctx.wait(Duration.from_seconds(5), name="deep-wait") + + nested_callback_result: str = inner_child_ctx.wait_for_callback( + lambda _: None, + name="nested-callback-op", + ) + + return { + "nestedCallback": nested_callback_result, + "deepLevel": "inner-child", + } + + +@durable_with_child_context +def outer_child_context(outer_child_ctx: DurableContext) -> dict[str, Any]: + """Outer child context with inner callback and nested context.""" + inner_result: str = outer_child_ctx.wait_for_callback( + lambda _: None, + name="inner-callback-op", + ) + + # Nested child context with another callback + deep_nested_result: dict[str, Any] = outer_child_ctx.run_in_child_context( + inner_child_context(), + name="inner-child-context", + ) + + return { + "innerCallback": inner_result, + "deepNested": deep_nested_result, + "level": "outer-child", + } + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating nested waitForCallback operations across multiple levels.""" + outer_result: str = context.wait_for_callback( + lambda _: None, + name="outer-callback-op", + ) + + nested_result: dict[str, Any] = context.run_in_child_context( + outer_child_context(), + name="outer-child-context", + ) + + return { + "outerCallback": outer_result, + "nestedResults": nested_result, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_serdes.py b/examples/src/wait_for_callback/wait_for_callback_serdes.py new file mode 100644 index 0000000..2cd53c0 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_serdes.py @@ -0,0 +1,89 @@ +"""Demonstrates waitForCallback with custom serialization/deserialization.""" + +import json +from datetime import datetime +from typing import Any, Optional, TypedDict + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig + + +class CustomDataMetadata(TypedDict): + """Metadata for CustomData.""" + + version: str + processed: bool + + +class CustomData(TypedDict): + """Custom data structure with datetime.""" + + id: int + message: str + timestamp: datetime + metadata: CustomDataMetadata + + +class CustomSerdes: + """Custom serialization/deserialization for CustomData.""" + + @staticmethod + def serialize(data: Optional[CustomData], _=None) -> Optional[str]: + """Serialize CustomData to JSON string.""" + if data is None: + return None + + serialized_data = { + "id": data["id"], + "message": data["message"], + "timestamp": data["timestamp"].isoformat(), + "metadata": data["metadata"], + "_serializedBy": "custom-serdes-v1", + } + return json.dumps(serialized_data) + + @staticmethod + def deserialize(data_str: Optional[str], _=None) -> Optional[CustomData]: + """Deserialize JSON string to CustomData.""" + if data_str is None: + return None + + parsed = json.loads(data_str) + return CustomData( + id=parsed["id"], + message=parsed["message"], + timestamp=datetime.fromisoformat( + parsed["timestamp"].replace("Z", "+00:00") + ), + metadata=CustomDataMetadata( + version=parsed["metadata"]["version"], + processed=parsed["metadata"]["processed"], + ), + ) + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with custom serdes.""" + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(10), + heartbeat_timeout=Duration.from_seconds(20), + serdes=CustomSerdes(), + ) + + result: CustomData = context.wait_for_callback( + lambda _: None, + name="custom-serdes-callback", + config=config, + ) + + isDateObject = isinstance(result["timestamp"], datetime) + # convert timestamp to isoformat because lambda accepts defalut json as result + result["timestamp"] = result["timestamp"].isoformat() + + return { + "receivedData": result, + "isDateObject": isDateObject, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_submitter_failure.py b/examples/src/wait_for_callback/wait_for_callback_submitter_failure.py new file mode 100644 index 0000000..500954f --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_submitter_failure.py @@ -0,0 +1,44 @@ +"""Demonstrates waitForCallback with submitter retry strategy using exponential backoff (0.5s, 1s, 2s).""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import ( + RetryStrategyConfig, + create_retry_strategy, +) +from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig + + +@durable_execution +def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with submitter retry and exponential backoff.""" + + def submitter(callback_id: str) -> None: + """Submitter function that can fail based on event parameter.""" + print(f"Submitting callback to external system - callbackId: {callback_id}") + raise Exception("Simulated submitter failure") + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(10), + heartbeat_timeout=Duration.from_seconds(20), + retry_strategy=create_retry_strategy( + config=RetryStrategyConfig( + max_attempts=3, + initial_delay=Duration.from_seconds(1), + max_delay=Duration.from_seconds(1), + ) + ), + ) + + result: str = context.wait_for_callback( + submitter, + name="retry-submitter-callback", + config=config, + ) + + return { + "result": result, + "success": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py b/examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py new file mode 100644 index 0000000..ec24ae4 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py @@ -0,0 +1,52 @@ +"""Demonstrates waitForCallback with submitter function that fails.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import ( + RetryStrategyConfig, + create_retry_strategy, +) +from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with failing submitter.""" + + def submitter(_) -> None: + """Submitter function that fails after a delay.""" + time.sleep(0.5) + # Submitter fails + raise Exception("Submitter failed") + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(10), + heartbeat_timeout=Duration.from_seconds(20), + retry_strategy=create_retry_strategy( + config=RetryStrategyConfig( + max_attempts=3, + initial_delay=Duration.from_seconds(1), + max_delay=Duration.from_seconds(1), + ) + ), + ) + + try: + result: str = context.wait_for_callback( + submitter, + name="failing-submitter-callback", + config=config, + ) + + return { + "callbackResult": result, + "success": True, + } + except Exception as error: + return { + "success": False, + "error": str(error), + } diff --git a/examples/template.yaml b/examples/template.yaml index 4e1a7ed..d674bf5 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -142,6 +142,132 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + WaitForCallbackAnonymous: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_anonymous.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackHeartbeat: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_heartbeat.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackChild: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_child.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackMixedOps: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_mixed_ops.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackMultipleInvocations: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_multiple_invocations.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackSubmitterFailureCatchable: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_submitter_failure_catchable.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackSubmitterFailure: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_submitter_failure.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackSerdes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_serdes.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackNested: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_nested.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 RunInChildContext: Type: AWS::Serverless::Function Properties: diff --git a/examples/test/wait_for_callback/test_wait_for_callback_anonymous.py b/examples/test/wait_for_callback/test_wait_for_callback_anonymous.py new file mode 100644 index 0000000..d047da2 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_anonymous.py @@ -0,0 +1,39 @@ +"""Tests for wait_for_callback_anonymous.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_anonymous +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_anonymous.handler, + lambda_function_name="Wait For Callback Success Anonymous", +) +def test_handle_basic_wait_for_callback_with_anonymous_submitter(durable_runner): + """Test basic waitForCallback with anonymous submitter.""" + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + callback_result = json.dumps({"data": "callback_completed"}) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "callbackResult": callback_result, + "completed": True, + } + + # Verify operations were tracked + assert len(result.operations) > 0 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_child.py b/examples/test/wait_for_callback/test_wait_for_callback_child.py new file mode 100644 index 0000000..3016a36 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_child.py @@ -0,0 +1,73 @@ +"""Tests for wait_for_callback_child_context.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_child +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_child.handler, + lambda_function_name="Wait For Callback With Child Context", +) +def test_handle_wait_for_callback_within_child_contexts(durable_runner): + """Test waitForCallback within child contexts.""" + test_payload = {"test": "child-context-callbacks"} + + with durable_runner: + execution_arn = durable_runner.run_async(input=test_payload, timeout=30) + # Wait for parent callback and get callback_id + parent_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn + ) + # Send parent callback result + parent_callback_result = json.dumps({"parentData": "parent-completed"}) + durable_runner.send_callback_success( + callback_id=parent_callback_id, result=parent_callback_result.encode() + ) + # Wait for child callback and get callback_id + child_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="child-callback-op create callback id" + ) + # Send child callback result + child_callback_result = json.dumps({"childData": 42}) + durable_runner.send_callback_success( + callback_id=child_callback_id, result=child_callback_result.encode() + ) + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + result_data = deserialize_operation_payload(result.result) + assert result_data == { + "parentResult": parent_callback_result, + "childContextResult": { + "childResult": child_callback_result, + "childProcessed": True, + }, + } + + # Find the child context operation + child_context_ops = [ + op + for op in result.operations + if op.operation_type.value == "CONTEXT" + and op.name == "child-context-with-callback" + ] + assert len(child_context_ops) == 1 + child_context_op = child_context_ops[0] + + # Verify child operations are accessible + child_operations = child_context_op.child_operations + assert child_operations is not None + assert len(child_operations) == 2 # wait + waitForCallback + + all_ops = result.get_all_operations() + + # Verify completed operations count + completed_operations = [op for op in all_ops if op.status.value == "SUCCEEDED"] + assert len(completed_operations) == 8 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py b/examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py new file mode 100644 index 0000000..bdbf627 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py @@ -0,0 +1,62 @@ +"""Tests for wait_for_callback_heartbeat_sends.""" + +import json +import time + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_heartbeat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_heartbeat.handler, + lambda_function_name="Wait For Callback Heartbeat Sends", +) +def test_handle_wait_for_callback_heartbeat_scenarios_during_long_running_submitter( + durable_runner, +): + """Test waitForCallback heartbeat scenarios during long-running submitter execution.""" + + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async( + input={"input": "test_payload"}, timeout=60 + ) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Send heartbeat to keep the callback alive during processing + durable_runner.send_callback_heartbeat(callback_id=callback_id) + + # Wait a bit more to simulate callback processing time + wait_time = 7.0 + time.sleep(wait_time) + + # Send another heartbeat + durable_runner.send_callback_heartbeat(callback_id=callback_id) + + # Finally complete the callback + callback_result = json.dumps({"processed": 1000}) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data["callbackResult"] == callback_result + assert result_data["completed"] is True + + # Should have completed operations with successful callback + completed_operations = [ + op for op in result.operations if op.status.value == "SUCCEEDED" + ] + assert len(completed_operations) > 0 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py b/examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py new file mode 100644 index 0000000..4f4f982 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py @@ -0,0 +1,52 @@ +"""Tests for wait_for_callback_mixed_ops.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_mixed_ops +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_mixed_ops.handler, + lambda_function_name="Wait For Callback Mixed Ops", +) +def test_handle_wait_for_callback_mixed_with_steps_waits_and_other_operations( + durable_runner, +): + """Test waitForCallback mixed with steps, waits, and other operations.""" + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async(input=None, timeout=30) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Complete the callback + callback_result = json.dumps({"processed": True}) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify all expected fields + assert result_data["stepResult"] == {"userId": 123, "name": "John Doe"} + assert result_data["callbackResult"] == callback_result + assert result_data["finalStep"]["status"] == "completed" + assert isinstance(result_data["finalStep"]["timestamp"], int) + assert result_data["workflowCompleted"] is True + + # Verify all operations were tracked - should have wait, step, waitForCallback (context + callback + submitter), wait, step + completed_operations = [ + op for op in result.get_all_operations() if op.status.value == "SUCCEEDED" + ] + assert len(completed_operations) == 7 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py b/examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py new file mode 100644 index 0000000..8c297dc --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py @@ -0,0 +1,74 @@ +"""Tests for wait_for_callback_multiple_invocations.""" + +import json +import time + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import ( + wait_for_callback_multiple_invocations, +) +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_multiple_invocations.handler, + lambda_function_name="Wait For Callback Multiple Invocations", +) +def test_handle_multiple_invocations_tracking_with_wait_for_callback_operations( + durable_runner, +): + """Test multiple invocations tracking with waitForCallback operations.""" + test_payload = {"test": "multiple-invocations"} + + with durable_runner: + # Start the execution (this will pause at callbacks) + execution_arn = durable_runner.run_async(input=test_payload, timeout=60) + + # Wait for first callback and get callback_id + first_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn + ) + + # Complete first callback + first_callback_result = json.dumps({"step": 1}) + durable_runner.send_callback_success( + callback_id=first_callback_id, result=first_callback_result.encode() + ) + + # Wait for second callback and get callback_id + second_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="second-callback create callback id" + ) + + # Complete second callback + second_callback_result = json.dumps({"step": 2}) + durable_runner.send_callback_success( + callback_id=second_callback_id, result=second_callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "firstCallback": '{"step": 1}', + "secondCallback": '{"step": 2}', + "stepResult": {"processed": True, "step": 1}, + "invocationCount": "multiple", + } + + # Verify invocations were tracked - should be exactly 5 invocations + # Note: Check if Python SDK provides invocations tracking + if hasattr(result, "invocations"): + invocations = result.invocations + assert len(invocations) == 5 + + # Verify operations were executed + operations = result.operations + assert len(operations) > 4 # wait + callback + step + wait + callback operations diff --git a/examples/test/wait_for_callback/test_wait_for_callback_nested.py b/examples/test/wait_for_callback/test_wait_for_callback_nested.py new file mode 100644 index 0000000..2c1c941 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_nested.py @@ -0,0 +1,101 @@ +"""Tests for wait_for_callback_nested.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_nested +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_nested.handler, + lambda_function_name="Wait For Callback Nested", +) +def test_handle_nested_wait_for_callback_operations_in_child_contexts(durable_runner): + """Test nested waitForCallback operations in child contexts.""" + with durable_runner: + # Start the execution (this will pause at callbacks) + execution_arn = durable_runner.run_async(input=None, timeout=60) + + # Complete outer callback first + outer_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn + ) + outer_callback_result = json.dumps({"level": "outer-completed"}) + durable_runner.send_callback_success( + callback_id=outer_callback_id, result=outer_callback_result.encode() + ) + + # Complete inner callback + inner_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="inner-callback-op create callback id" + ) + inner_callback_result = json.dumps({"level": "inner-completed"}) + durable_runner.send_callback_success( + callback_id=inner_callback_id, result=inner_callback_result.encode() + ) + + # Complete nested callback + nested_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="nested-callback-op create callback id" + ) + nested_callback_result = json.dumps({"level": "nested-completed"}) + durable_runner.send_callback_success( + callback_id=nested_callback_id, result=nested_callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "outerCallback": outer_callback_result, + "nestedResults": { + "innerCallback": inner_callback_result, + "deepNested": { + "nestedCallback": nested_callback_result, + "deepLevel": "inner-child", + }, + "level": "outer-child", + }, + } + + # Get all operations including nested ones + all_ops = result.get_all_operations() + + # Find the outer context operation + outer_context_ops = [ + op + for op in result.operations + if op.operation_type.value == "CONTEXT" and op.name == "outer-child-context" + ] + assert len(outer_context_ops) == 1 + outer_context_op = outer_context_ops[0] + + # Verify outer child operations hierarchy + outer_children = outer_context_op.child_operations + assert outer_children is not None + assert len(outer_children) == 2 # inner callback + inner context + + # Find the inner context operation + inner_context_ops = [ + op + for op in all_ops + if op.operation_type.value == "CONTEXT" and op.name == "inner-child-context" + ] + assert len(inner_context_ops) == 1 + inner_context_op = inner_context_ops[0] + + # Verify inner child operations hierarchy + inner_children = inner_context_op.child_operations + assert inner_children is not None + assert len(inner_children) == 2 # deep wait + nested callback + + # Should have tracked all operations + assert len(all_ops) == 12 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_serdes.py b/examples/test/wait_for_callback/test_wait_for_callback_serdes.py new file mode 100644 index 0000000..1333f88 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_serdes.py @@ -0,0 +1,66 @@ +"""Tests for wait_for_callback_serdes.""" + +import json +from datetime import datetime, timezone + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_serdes +from src.wait_for_callback.wait_for_callback_serdes import CustomSerdes +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_serdes.handler, + lambda_function_name="Wait For Callback Serdes", +) +def test_handle_wait_for_callback_with_custom_serdes_configuration(durable_runner): + """Test waitForCallback with custom serdes configuration.""" + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async(input=None, timeout=30) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Send data that requires custom serialization + test_data = { + "id": 42, + "message": "Hello Custom Serdes", + "timestamp": datetime(2025, 6, 15, 12, 30, 45, tzinfo=timezone.utc), + "metadata": { + "version": "2.0.0", + "processed": True, + }, + } + + # Serialize the data using custom serdes for sending + custom_serdes = CustomSerdes() + serialized_data = custom_serdes.serialize(test_data) + durable_runner.send_callback_success( + callback_id=callback_id, result=serialized_data.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # The result will always get stringified since it's the lambda response + # DateTime will be serialized to ISO string in the final result + assert result_data["receivedData"]["id"] == 42 + assert result_data["receivedData"]["message"] == "Hello Custom Serdes" + assert "2025-06-15T12:30:45" in result_data["receivedData"]["timestamp"] + assert result_data["receivedData"]["metadata"]["version"] == "2.0.0" + assert result_data["receivedData"]["metadata"]["processed"] is True + assert result_data["isDateObject"] is True + + # Should have completed operations with successful callback + completed_operations = [ + op for op in result.operations if op.status.value == "SUCCEEDED" + ] + assert len(completed_operations) > 0 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py new file mode 100644 index 0000000..e4463c8 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py @@ -0,0 +1,32 @@ +"""Tests for wait_for_callback_submitter_retry_success.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import ( + wait_for_callback_submitter_failure, +) + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_submitter_failure.handler, + lambda_function_name="Wait For Callback Submitter Failure", +) +def test_fail_after_exhausting_retries_when_submitter_always_fails(durable_runner): + """Test that execution fails after exhausting retries when submitter always fails.""" + test_payload = {"shouldFail": True} + + with durable_runner: + execution_arn = durable_runner.run_async(input=test_payload, timeout=30) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + # Execution should fail after retries are exhausted + assert result.status is InvocationStatus.FAILED + + # Verify error details + error = result.error + assert error is not None + assert "Simulated submitter failure" in error.message diff --git a/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py new file mode 100644 index 0000000..b3458d0 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py @@ -0,0 +1,28 @@ +"""Tests for wait_for_callback_failing_submitter.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_submitter_failure_catchable +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_submitter_failure_catchable.handler, + lambda_function_name="Wait For Callback Failing Submitter Catchable", +) +def test_handle_wait_for_callback_with_failing_submitter_function_errors( + durable_runner, +): + """Test waitForCallback with failing submitter function errors.""" + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "success": False, + "error": "Submitter failed", + } diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index aa3a39c..99cc049 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -555,6 +555,18 @@ def get_invoke(self, name: str) -> InvokeOperation: def get_execution(self, name: str) -> ExecutionOperation: return cast(ExecutionOperation, self.get_operation_by_name(name)) + def get_all_operations(self) -> list[Operation]: + """Recursively get all operations including nested ones.""" + all_ops = [] + stack = list(self.operations) + while stack: + op = stack.pop() + all_ops.append(op) + # Add child operations to stack (if they exist) + if hasattr(op, "child_operations") and op.child_operations: + stack.extend(op.child_operations) + return all_ops + class DurableFunctionTestRunner: def __init__(self, handler: Callable, poll_interval: float = 1.0):