From a24c5fb20e1fc52393be5b439c169149d1e940c8 Mon Sep 17 00:00:00 2001 From: Rares Polenciuc Date: Thu, 20 Nov 2025 14:09:45 +0000 Subject: [PATCH] fix: callback timeout handling --- .../checkpoint/processors/callback.py | 18 ++- .../execution.py | 26 ++++ .../executor.py | 76 ++++------- .../observer.py | 18 ++- tests/executor_test.py | 126 +++++------------ tests/how-to-run-from-term.txt | 1 + tests/observer_test.py | 10 +- tests/pending_operation_test.py | 129 ++++++++++++++++++ 8 files changed, 255 insertions(+), 149 deletions(-) create mode 100644 tests/how-to-run-from-term.txt create mode 100644 tests/pending_operation_test.py diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py index c1a0ec7..48c2f01 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py @@ -12,6 +12,7 @@ OperationUpdate, CallbackDetails, OperationType, + CallbackOptions, ) from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import ( OperationProcessor, @@ -43,12 +44,6 @@ def process( operation_id=update.operation_id, ) - notifier.notify_callback_created( - execution_arn=execution_arn, - operation_id=update.operation_id, - callback_token=callback_token, - ) - callback_id: str = callback_token.to_str() callback_details: CallbackDetails | None = ( @@ -60,11 +55,15 @@ def process( if update.operation_type == OperationType.CALLBACK else None ) + status: OperationStatus = OperationStatus.STARTED + start_time: datetime.datetime | None = self._get_start_time(current_op) + end_time: datetime.datetime | None = self._get_end_time( current_op, status ) + operation: Operation = Operation( operation_id=update.operation_id, parent_id=update.parent_id, @@ -76,7 +75,14 @@ def process( sub_type=update.sub_type, callback_details=callback_details, ) + callback_options: CallbackOptions | None = update.callback_options + notifier.notify_callback_created( + execution_arn=execution_arn, + operation_id=update.operation_id, + callback_options=callback_options, + callback_token=callback_token, + ) return operation case _: msg: str = "Invalid action for CALLBACK operation." diff --git a/src/aws_durable_execution_sdk_python_testing/execution.py b/src/aws_durable_execution_sdk_python_testing/execution.py index 8583efb..113db5b 100644 --- a/src/aws_durable_execution_sdk_python_testing/execution.py +++ b/src/aws_durable_execution_sdk_python_testing/execution.py @@ -381,6 +381,32 @@ def complete_callback_failure( ) return self.operations[index] + def complete_callback_timeout( + self, callback_id: str, error: ErrorObject + ) -> Operation: + """Complete CALLBACK operation with timeout.""" + index, operation = self.find_callback_operation(callback_id) + + if operation.status != OperationStatus.STARTED: + msg: str = f"Callback operation [{callback_id}] is not in STARTED state" + raise IllegalStateException(msg) + + with self._state_lock: + self._token_sequence += 1 + updated_callback_details = None + if operation.callback_details: + updated_callback_details = replace( + operation.callback_details, error=error + ) + + self.operations[index] = replace( + operation, + status=OperationStatus.TIMED_OUT, + end_timestamp=datetime.now(UTC), + callback_details=updated_callback_details, + ) + return self.operations[index] + def _end_execution(self, status: OperationStatus) -> None: """Set the end_timestamp on the main EXECUTION operation when execution completes.""" execution_op: Operation = self.get_operation_execution_started() diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 3c398f6..e707366 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -19,6 +19,7 @@ OperationUpdate, OperationStatus, OperationType, + CallbackOptions, ) from aws_durable_execution_sdk_python_testing.exceptions import ( @@ -57,6 +58,7 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable + from concurrent.futures import Future from aws_durable_execution_sdk_python_testing.checkpoint.processor import ( CheckpointProcessor, @@ -84,10 +86,8 @@ def __init__( self._invoker = invoker self._checkpoint_processor = checkpoint_processor self._completion_events: dict[str, Event] = {} - self._callback_timeouts: dict[str, Event] = {} # callback_id -> timeout event - self._callback_heartbeats: dict[ - str, Event - ] = {} # callback_id -> heartbeat event + self._callback_timeouts: dict[str, Future] = {} + self._callback_heartbeats: dict[str, Future] = {} def start_execution( self, @@ -1011,7 +1011,11 @@ def retry_handler() -> None: ) def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Handle callback creation. Observer method triggered by notifier.""" callback_id = callback_token.to_str() @@ -1023,34 +1027,19 @@ def on_callback_created( ) # Schedule callback timeouts if configured - self._schedule_callback_timeouts(execution_arn, operation_id, callback_id) + self._schedule_callback_timeouts(execution_arn, callback_options, callback_id) # endregion ExecutionObserver # region Callback Timeouts def _schedule_callback_timeouts( - self, execution_arn: str, operation_id: str, callback_id: str + self, + execution_arn: str, + callback_options: CallbackOptions | None, + callback_id: str, ) -> None: """Schedule callback timeout and heartbeat timeout if configured.""" try: - execution = self.get_execution(execution_arn) - _, operation = execution.find_operation(operation_id) - - if not operation.callback_details: - return - - # Find the callback options from the operation update that created this callback - # We need to look at the checkpoint updates to find the original callback options - callback_options = None - for update in execution.updates: - if ( - update.operation_id == operation_id - and update.callback_options - and update.action.value == "START" - ): - callback_options = update.callback_options - break - if not callback_options: return @@ -1062,13 +1051,12 @@ def _schedule_callback_timeouts( def timeout_handler(): self._on_callback_timeout(execution_arn, callback_id) - timeout_event = self._scheduler.create_event() - self._callback_timeouts[callback_id] = timeout_event - self._scheduler.call_later( + timeout_future = self._scheduler.call_later( timeout_handler, delay=callback_options.timeout_seconds, completion_event=completion_event, ) + self._callback_timeouts[callback_id] = timeout_future # Schedule heartbeat timeout if configured if callback_options.heartbeat_timeout_seconds > 0: @@ -1076,13 +1064,12 @@ def timeout_handler(): def heartbeat_timeout_handler(): self._on_callback_heartbeat_timeout(execution_arn, callback_id) - heartbeat_event = self._scheduler.create_event() - self._callback_heartbeats[callback_id] = heartbeat_event - self._scheduler.call_later( + heartbeat_future = self._scheduler.call_later( heartbeat_timeout_handler, delay=callback_options.heartbeat_timeout_seconds, completion_event=completion_event, ) + self._callback_heartbeats[callback_id] = heartbeat_future except Exception: logger.exception( @@ -1096,16 +1083,14 @@ def _reset_callback_heartbeat_timeout( ) -> None: """Reset the heartbeat timeout for a callback.""" # Cancel existing heartbeat timeout - if heartbeat_event := self._callback_heartbeats.get(callback_id): - heartbeat_event.remove() - del self._callback_heartbeats[callback_id] + if heartbeat_future := self._callback_heartbeats.pop(callback_id, None): + heartbeat_future.cancel() # Find callback options to reschedule heartbeat timeout try: callback_token = CallbackToken.from_str(callback_id) execution = self.get_execution(callback_token.execution_arn) - # Find callback options from updates callback_options = None for update in execution.updates: if ( @@ -1122,13 +1107,14 @@ def heartbeat_timeout_handler(): self._on_callback_heartbeat_timeout(execution_arn, callback_id) completion_event = self._completion_events.get(execution_arn) - heartbeat_event = self._scheduler.create_event() - self._callback_heartbeats[callback_id] = heartbeat_event - self._scheduler.call_later( + + heartbeat_future = self._scheduler.call_later( heartbeat_timeout_handler, delay=callback_options.heartbeat_timeout_seconds, completion_event=completion_event, ) + self._callback_heartbeats[callback_id] = heartbeat_future + except Exception: logger.exception( "[%s] Error resetting callback heartbeat timeout for %s", @@ -1139,14 +1125,12 @@ def heartbeat_timeout_handler(): def _cleanup_callback_timeouts(self, callback_id: str) -> None: """Clean up timeout events for a completed callback.""" # Clean up main timeout - if timeout_event := self._callback_timeouts.get(callback_id): - timeout_event.remove() - del self._callback_timeouts[callback_id] + if timeout_future := self._callback_timeouts.pop(callback_id, None): + timeout_future.cancel() # Clean up heartbeat timeout - if heartbeat_event := self._callback_heartbeats.get(callback_id): - heartbeat_event.remove() - del self._callback_heartbeats[callback_id] + if heartbeat_future := self._callback_heartbeats.pop(callback_id, None): + heartbeat_future.cancel() def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None: """Handle callback timeout.""" @@ -1161,7 +1145,7 @@ def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None: timeout_error = ErrorObject.from_message( f"Callback timed out: {CallbackTimeoutType.TIMEOUT.value}" ) - execution.complete_callback_failure(callback_id, timeout_error) + execution.complete_callback_timeout(callback_id, timeout_error) execution.complete_fail(timeout_error) self._store.update(execution) logger.warning("[%s] Callback %s timed out", execution_arn, callback_id) @@ -1188,7 +1172,7 @@ def _on_callback_heartbeat_timeout( heartbeat_error = ErrorObject.from_message( f"Callback heartbeat timed out: {CallbackTimeoutType.HEARTBEAT.value}" ) - execution.complete_callback_failure(callback_id, heartbeat_error) + execution.complete_callback_timeout(callback_id, heartbeat_error) execution.complete_fail(heartbeat_error) self._store.update(execution) logger.warning( diff --git a/src/aws_durable_execution_sdk_python_testing/observer.py b/src/aws_durable_execution_sdk_python_testing/observer.py index 3a21fd5..1b518ce 100644 --- a/src/aws_durable_execution_sdk_python_testing/observer.py +++ b/src/aws_durable_execution_sdk_python_testing/observer.py @@ -11,7 +11,10 @@ if TYPE_CHECKING: from collections.abc import Callable - from aws_durable_execution_sdk_python.lambda_service import ErrorObject + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + CallbackOptions, + ) class ExecutionObserver(ABC): @@ -47,7 +50,11 @@ def on_step_retry_scheduled( @abstractmethod def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Called when callback is created.""" @@ -119,13 +126,18 @@ def notify_step_retry_scheduled( ) def notify_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Notify observers about callback creation.""" self._notify_observers( ExecutionObserver.on_callback_created, execution_arn=execution_arn, operation_id=operation_id, + callback_options=callback_options, callback_token=callback_token, ) diff --git a/tests/executor_test.py b/tests/executor_test.py index 008a4a0..1d8e0f6 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -84,7 +84,11 @@ def on_step_retry_scheduled( } def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: """Capture callback creation events.""" self.callback_creations[execution_arn] = { @@ -2491,41 +2495,17 @@ def test_complete_events_no_event(executor): def test_callback_timeout_scheduling(executor, mock_store, mock_scheduler): """Test that callback timeouts are scheduled when callback is created.""" - # Create mock execution with callback operation and updates - mock_execution = Mock() - mock_execution.durable_execution_arn = "test-arn" - - # Create callback operation with details - callback_operation = Operation( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - status=OperationStatus.STARTED, - callback_details=CallbackDetails(callback_id="callback-id"), - ) - mock_execution.find_operation.return_value = (0, callback_operation) - - # Create callback update with timeout options + # Create callback options with both timeouts callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=30) - update = OperationUpdate( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - action=OperationAction.START, - callback_options=callback_options, - ) - mock_execution.updates = [update] - - mock_store.load.return_value = mock_execution - mock_scheduler.create_event.return_value = Mock() # Set up completion event executor._completion_events["test-arn"] = Mock() - # Test the timeout scheduling directly - executor._schedule_callback_timeouts("test-arn", "op-123", "callback-id") + # Test the timeout scheduling directly with correct parameters + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") # Verify scheduler was called for both timeouts assert mock_scheduler.call_later.call_count == 2 # main timeout + heartbeat timeout - assert mock_scheduler.create_event.call_count == 2 # events for both timeouts def test_callback_timeout_cleanup(executor, mock_store): @@ -2540,9 +2520,9 @@ def test_callback_timeout_cleanup(executor, mock_store): # Trigger cleanup executor._cleanup_callback_timeouts("callback-id") - # Verify events were removed and cleaned up - timeout_event.remove.assert_called_once() - heartbeat_event.remove.assert_called_once() + # Verify events were cancelled and removed + timeout_event.cancel.assert_called_once() + heartbeat_event.cancel.assert_called_once() assert "callback-id" not in executor._callback_timeouts assert "callback-id" not in executor._callback_heartbeats @@ -2575,8 +2555,8 @@ def test_callback_heartbeat_timeout_reset(executor, mock_store, mock_scheduler): # Reset heartbeat timeout executor._reset_callback_heartbeat_timeout(callback_id, "test-arn") - # Verify old event was removed and new one scheduled - old_event.remove.assert_called_once() + # Verify old event was cancelled and new one scheduled + old_event.cancel.assert_called_once() mock_scheduler.call_later.assert_called() @@ -2591,22 +2571,26 @@ def test_callback_timeout_handlers(executor, mock_store): mock_execution.is_complete = False mock_store.load.return_value = mock_execution - with patch.object(executor, "_invoke_execution"): - # Test main timeout handler - executor._on_callback_timeout("test-arn", callback_id) + # Test main timeout handler + executor._on_callback_timeout("test-arn", callback_id) + + # Verify callback was failed with timeout error + mock_execution.complete_callback_timeout.assert_called() + timeout_error = mock_execution.complete_callback_timeout.call_args[0][1] + assert "Callback timed out" in str(timeout_error.message) + mock_execution.complete_fail.assert_called() - # Verify callback was failed with timeout error - mock_execution.complete_callback_failure.assert_called() - timeout_error = mock_execution.complete_callback_failure.call_args[0][1] - assert "Callback.Timeout" in str(timeout_error.message) + # Reset mocks for heartbeat test + mock_execution.reset_mock() - # Test heartbeat timeout handler - executor._on_callback_heartbeat_timeout("test-arn", callback_id) + # Test heartbeat timeout handler + executor._on_callback_heartbeat_timeout("test-arn", callback_id) - # Verify callback was failed with heartbeat timeout error - assert mock_execution.complete_callback_failure.call_count == 2 - heartbeat_error = mock_execution.complete_callback_failure.call_args[0][1] - assert "Callback.Heartbeat" in str(heartbeat_error.message) + # Verify callback was failed with heartbeat timeout error + mock_execution.complete_callback_timeout.assert_called() + heartbeat_error = mock_execution.complete_callback_timeout.call_args[0][1] + assert "Callback heartbeat timed out" in str(heartbeat_error.message) + mock_execution.complete_fail.assert_called() def test_callback_timeout_completed_execution(executor, mock_store): @@ -2626,7 +2610,7 @@ def test_callback_timeout_completed_execution(executor, mock_store): executor._on_callback_heartbeat_timeout("test-arn", callback_id) # Verify no callback operations were performed - mock_execution.complete_callback_failure.assert_not_called() + mock_execution.complete_callback_timeout.assert_not_called() mock_store.update.assert_not_called() @@ -2717,32 +2701,12 @@ def test_schedule_callback_timeouts_only_main_timeout( ): """Test _schedule_callback_timeouts with only main timeout configured.""" - # Create operation with callback details - operation = Operation( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - status=OperationStatus.STARTED, - callback_details=CallbackDetails(callback_id="callback-id"), - ) - - mock_execution = Mock() - mock_execution.find_operation.return_value = (0, operation) - - # Create update with only main timeout + # Create callback options with only main timeout callback_options = CallbackOptions(timeout_seconds=60, heartbeat_timeout_seconds=0) - update = OperationUpdate( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - action=OperationAction.START, - callback_options=callback_options, - ) - mock_execution.updates = [update] - mock_store.load.return_value = mock_execution - mock_scheduler.create_event.return_value = Mock() executor._completion_events["test-arn"] = Mock() - executor._schedule_callback_timeouts("test-arn", "op-123", "callback-id") + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") # Only main timeout should be scheduled assert mock_scheduler.call_later.call_count == 1 @@ -2754,32 +2718,12 @@ def test_schedule_callback_timeouts_only_heartbeat_timeout( executor, mock_store, mock_scheduler ): """Test _schedule_callback_timeouts with only heartbeat timeout configured.""" - # Create operation with callback details - operation = Operation( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - status=OperationStatus.STARTED, - callback_details=CallbackDetails(callback_id="callback-id"), - ) - - mock_execution = Mock() - mock_execution.find_operation.return_value = (0, operation) - - # Create update with only heartbeat timeout + # Create callback options with only heartbeat timeout callback_options = CallbackOptions(timeout_seconds=0, heartbeat_timeout_seconds=30) - update = OperationUpdate( - operation_id="op-123", - operation_type=OperationType.CALLBACK, - action=OperationAction.START, - callback_options=callback_options, - ) - mock_execution.updates = [update] - mock_store.load.return_value = mock_execution - mock_scheduler.create_event.return_value = Mock() executor._completion_events["test-arn"] = Mock() - executor._schedule_callback_timeouts("test-arn", "op-123", "callback-id") + executor._schedule_callback_timeouts("test-arn", callback_options, "callback-id") # Only heartbeat timeout should be scheduled assert mock_scheduler.call_later.call_count == 1 diff --git a/tests/how-to-run-from-term.txt b/tests/how-to-run-from-term.txt new file mode 100644 index 0000000..1301cdd --- /dev/null +++ b/tests/how-to-run-from-term.txt @@ -0,0 +1 @@ +source /Users/rarepolz/workspace/aws-durable-execution/venv/bin/activate && pip install -e . --no-deps && pytest tests/event_conversion_test.py -v \ No newline at end of file diff --git a/tests/observer_test.py b/tests/observer_test.py index 4847eee..193f395 100644 --- a/tests/observer_test.py +++ b/tests/observer_test.py @@ -5,7 +5,7 @@ from unittest.mock import Mock import pytest -from aws_durable_execution_sdk_python.lambda_service import ErrorObject +from aws_durable_execution_sdk_python.lambda_service import ErrorObject, CallbackOptions from aws_durable_execution_sdk_python_testing.observer import ( ExecutionNotifier, @@ -49,10 +49,14 @@ def on_step_retry_scheduled( self.on_step_retry_scheduled_calls.append((execution_arn, operation_id, delay)) def on_callback_created( - self, execution_arn: str, operation_id: str, callback_token: CallbackToken + self, + execution_arn: str, + operation_id: str, + callback_options: CallbackOptions | None, + callback_token: CallbackToken, ) -> None: self.on_callback_created_calls.append( - (execution_arn, operation_id, callback_token) + (execution_arn, operation_id, callback_options, callback_token) ) diff --git a/tests/pending_operation_test.py b/tests/pending_operation_test.py new file mode 100644 index 0000000..d508fbc --- /dev/null +++ b/tests/pending_operation_test.py @@ -0,0 +1,129 @@ +# """Test for pending operation handling in get_execution_history.""" +# +# from datetime import UTC, datetime +# from unittest.mock import Mock +# +# from aws_durable_execution_sdk_python.lambda_service import ( +# OperationStatus, +# OperationType, +# ) +# +# from aws_durable_execution_sdk_python_testing.executor import Executor +# from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput +# +# +# def test_get_execution_history_with_pending_chained_invoke(): +# """Test get_execution_history handles pending CHAINED_INVOKE operations correctly.""" +# # Create mocks +# mock_store = Mock() +# mock_scheduler = Mock() +# mock_invoker = Mock() +# mock_checkpoint_processor = Mock() +# +# executor = Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor) +# +# # Create mock execution +# mock_execution = Mock() +# mock_execution.durable_execution_arn = "test-arn" +# mock_execution.start_input = StartDurableExecutionInput( +# account_id="123", +# function_name="test", +# function_qualifier="$LATEST", +# execution_name="test", +# execution_timeout_seconds=300, +# execution_retention_period_days=7, +# ) +# mock_execution.result = None +# mock_execution.updates = [] +# +# # Create a pending CHAINED_INVOKE operation with start_timestamp +# pending_op = Mock() +# pending_op.operation_id = "invoke-1" +# pending_op.operation_type = OperationType.CHAINED_INVOKE +# pending_op.status = OperationStatus.PENDING +# pending_op.start_timestamp = datetime.now(UTC) +# pending_op.end_timestamp = None +# +# # Create a non-CHAINED_INVOKE pending operation (should be skipped) +# pending_step = Mock() +# pending_step.operation_id = "step-1" +# pending_step.operation_type = OperationType.STEP +# pending_step.status = OperationStatus.PENDING +# pending_step.start_timestamp = datetime.now(UTC) +# pending_step.end_timestamp = None +# +# # Create a CHAINED_INVOKE pending operation without start_timestamp (should be skipped) +# pending_invoke_no_timestamp = Mock() +# pending_invoke_no_timestamp.operation_id = "invoke-2" +# pending_invoke_no_timestamp.operation_type = OperationType.CHAINED_INVOKE +# pending_invoke_no_timestamp.status = OperationStatus.PENDING +# pending_invoke_no_timestamp.start_timestamp = None +# pending_invoke_no_timestamp.end_timestamp = None +# +# mock_execution.operations = [pending_op, pending_step, pending_invoke_no_timestamp] +# mock_store.load.return_value = mock_execution +# +# # Call get_execution_history +# result = executor.get_execution_history("test-arn", include_execution_data=True) +# +# # Should have 2 events: 1 pending event + 1 started event for the valid pending CHAINED_INVOKE +# assert len(result.events) == 2 +# +# # First event should be the pending event +# assert result.events[0].event_type == "ChainedInvokeStarted" +# assert result.events[0].operation_id == "invoke-1" +# assert result.events[0].chained_invoke_pending_details is not None +# +# # Second event should be the started event +# assert result.events[1].event_type == "ChainedInvokeStarted" +# assert result.events[1].operation_id == "invoke-1" +# assert result.events[1].chained_invoke_started_details is not None +# +# +# def test_get_execution_history_skips_invalid_pending_operations(): +# """Test that invalid pending operations are skipped.""" +# # Create mocks +# mock_store = Mock() +# mock_scheduler = Mock() +# mock_invoker = Mock() +# mock_checkpoint_processor = Mock() +# +# executor = Executor(mock_store, mock_scheduler, mock_invoker, mock_checkpoint_processor) +# +# # Create mock execution +# mock_execution = Mock() +# mock_execution.durable_execution_arn = "test-arn" +# mock_execution.start_input = StartDurableExecutionInput( +# account_id="123", +# function_name="test", +# function_qualifier="$LATEST", +# execution_name="test", +# execution_timeout_seconds=300, +# execution_retention_period_days=7, +# ) +# mock_execution.result = None +# mock_execution.updates = [] +# +# # Create operations that should be skipped +# # 1. Non-CHAINED_INVOKE pending operation +# pending_step = Mock() +# pending_step.operation_id = "step-1" +# pending_step.operation_type = OperationType.STEP +# pending_step.status = OperationStatus.PENDING +# pending_step.start_timestamp = datetime.now(UTC) +# +# # 2. CHAINED_INVOKE pending operation without start_timestamp +# pending_invoke_no_timestamp = Mock() +# pending_invoke_no_timestamp.operation_id = "invoke-1" +# pending_invoke_no_timestamp.operation_type = OperationType.CHAINED_INVOKE +# pending_invoke_no_timestamp.status = OperationStatus.PENDING +# pending_invoke_no_timestamp.start_timestamp = None +# +# mock_execution.operations = [pending_step, pending_invoke_no_timestamp] +# mock_store.load.return_value = mock_execution +# +# # Call get_execution_history +# result = executor.get_execution_history("test-arn") +# +# # Should have no events since all pending operations are invalid +# assert len(result.events) == 0