From 6921dd6ee8afd6d805792574fa371471e233ae0b Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Tue, 2 Dec 2025 01:19:25 -0500 Subject: [PATCH 1/2] fix: timeout executions based on execution timeout --- .../executor.py | 24 ++++ tests/executor_test.py | 135 ++++++++++-------- 2 files changed, 99 insertions(+), 60 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 70bcfa5..0afdfcc 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -90,6 +90,7 @@ def __init__( self._completion_events: dict[str, Event] = {} self._callback_timeouts: dict[str, Future] = {} self._callback_heartbeats: dict[str, Future] = {} + self._execution_timeout: Future | None = None def start_execution( self, @@ -118,6 +119,26 @@ def start_execution( completion_event = self._scheduler.create_event() self._completion_events[execution.durable_execution_arn] = completion_event + # Schedule execution timeout + try: + timeout_seconds = input.execution_timeout_seconds + if timeout_seconds and timeout_seconds > 0: + + def timeout_handler(): + error = ErrorObject.from_message( + f"Execution timed out after {timeout_seconds} seconds." + ) + self.on_timed_out(execution.durable_execution_arn, error) + + self._execution_timeout = self._scheduler.call_later( + timeout_handler, + delay=timeout_seconds, + completion_event=completion_event, + ) + except (AttributeError, TypeError): + # Handle Mock objects or invalid timeout values in tests + pass + # Schedule initial invocation to run immediately self._invoke_execution(execution.durable_execution_arn) @@ -897,6 +918,9 @@ def _complete_events(self, execution_arn: str): # complete doesn't actually checkpoint explicitly if event := self._completion_events.get(execution_arn): event.set() + if self._execution_timeout: + self._execution_timeout.cancel() + self._execution_timeout = None def wait_until_complete( self, execution_arn: str, timeout: float | None = None diff --git a/tests/executor_test.py b/tests/executor_test.py index 295248f..9ff34e5 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -216,6 +216,13 @@ def test_start_execution( mock_execution.start.assert_called_once() mock_store.save.assert_called_once_with(mock_execution) mock_scheduler.create_event.assert_called_once() + + # Verify execution timeout was scheduled + assert mock_scheduler.call_later.called + timeout_call = mock_scheduler.call_later.call_args + assert timeout_call.kwargs["delay"] == start_input.execution_timeout_seconds + assert timeout_call.kwargs["completion_event"] == mock_event + mock_invoke.assert_called_once_with("test-arn") assert result.execution_arn == "test-arn" @@ -303,8 +310,8 @@ def test_should_complete_workflow_with_error_when_invocation_fails( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -349,8 +356,8 @@ def test_should_complete_workflow_with_result_when_invocation_succeeds( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -392,8 +399,8 @@ def test_should_handle_pending_status_when_operations_exist( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -432,8 +439,8 @@ def test_should_ignore_response_when_execution_already_complete( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -475,8 +482,8 @@ def test_should_retry_when_response_has_no_status( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -484,8 +491,8 @@ def test_should_retry_when_response_has_no_status( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_should_retry_when_failed_response_has_result( @@ -520,8 +527,8 @@ def test_should_retry_when_failed_response_has_result( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -529,8 +536,8 @@ def test_should_retry_when_failed_response_has_result( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_should_retry_when_success_response_has_error( @@ -566,8 +573,8 @@ def test_should_retry_when_success_response_has_error( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -575,8 +582,8 @@ def test_should_retry_when_success_response_has_error( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_should_retry_when_pending_response_has_no_operations( @@ -610,8 +617,8 @@ def test_should_retry_when_pending_response_has_no_operations( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -619,8 +626,8 @@ def test_should_retry_when_pending_response_has_no_operations( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_handler_success( @@ -653,8 +660,8 @@ def test_invoke_handler_success( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -689,8 +696,8 @@ def test_invoke_handler_execution_already_complete( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -735,8 +742,8 @@ def test_invoke_handler_execution_completed_during_invocation( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -772,8 +779,8 @@ def test_invoke_handler_resource_not_found( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -813,8 +820,8 @@ def test_invoke_handler_general_exception( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -822,8 +829,8 @@ def test_invoke_handler_general_exception( # Assert - verify retry was scheduled through observable behavior assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_execution_through_start_execution( @@ -936,8 +943,8 @@ def test_should_fail_execution_when_function_not_found( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -980,8 +987,8 @@ def test_should_fail_execution_when_retries_exhausted( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -1024,7 +1031,7 @@ def test_should_prevent_multiple_workflow_failures_on_complete_execution( # Act & Assert - triggering workflow failure on completed execution should raise exception executor.start_execution(start_input) - handler = mock_scheduler.call_later.call_args[0][0] + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic - this should raise the exception with pytest.raises( @@ -1077,14 +1084,16 @@ def test_should_retry_invocation_when_under_limit_through_public_api( executor.start_execution(start_input) # Simulate scheduler executing the initial invocation handler - initial_handler = mock_scheduler.call_later.call_args[0][0] + initial_handler = mock_scheduler.call_later.call_args_list[-1][0][0] import asyncio asyncio.run(initial_handler()) # Verify retry was scheduled due to validation error - assert mock_scheduler.call_later.call_count == 2 # Initial + retry - retry_call = mock_scheduler.call_later.call_args_list[1] + assert mock_scheduler.call_later.call_count == 3 # timeout + initial + retry + retry_call = mock_scheduler.call_later.call_args_list[ + 2 + ] # Third call is the retry retry_handler = retry_call[0][0] retry_delay = retry_call[1]["delay"] @@ -1127,8 +1136,8 @@ def test_should_fail_workflow_when_retry_limit_exceeded( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1155,6 +1164,10 @@ def test_complete_events_through_complete_execution( mock_event = Mock() mock_scheduler.create_event.return_value = mock_event + # Mock the timeout future that will be created + mock_timeout_future = Mock() + mock_scheduler.call_later.return_value = mock_timeout_future + with patch( "aws_durable_execution_sdk_python_testing.executor.Execution" ) as mock_execution_class: @@ -1163,13 +1176,15 @@ def test_complete_events_through_complete_execution( mock_execution_class.new.return_value = mock_exec start_input = Mock() + start_input.execution_timeout_seconds = 300 executor.start_execution(start_input) - # Now complete the execution - this should trigger event.set() + # Now complete the execution - this should trigger event.set() and cancel timeout executor.complete_execution("test-arn", "result") - # Verify the event was set through observable behavior + # Verify the event was set and timeout was cancelled mock_event.set.assert_called_once() + mock_timeout_future.cancel.assert_called_once() def test_complete_events_no_event_through_public_api(executor, mock_store): @@ -1477,8 +1492,8 @@ def test_should_retry_when_response_has_unexpected_status( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1486,8 +1501,8 @@ def test_should_retry_when_response_has_unexpected_status( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_handler_execution_completed_during_invocation_async( @@ -1523,8 +1538,8 @@ def test_invoke_handler_execution_completed_during_invocation_async( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1560,8 +1575,8 @@ def test_invoke_handler_resource_not_found_async( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1612,8 +1627,8 @@ def test_invoke_handler_general_exception_async( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1621,8 +1636,8 @@ def test_invoke_handler_general_exception_async( # Assert - verify retry was scheduled through observable behavior assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_execution_with_delay_through_wait_timer(executor, mock_scheduler): From 5ab91f25d956ec8d15b99b01e6959f6aa63225fd Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Tue, 2 Dec 2025 10:08:32 -0500 Subject: [PATCH 2/2] chore: fix mocks for start input to have execution timeout --- .../executor.py | 27 ++++++++----------- tests/executor_test.py | 9 +++++++ 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 0afdfcc..aa90c32 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -120,24 +120,19 @@ def start_execution( self._completion_events[execution.durable_execution_arn] = completion_event # Schedule execution timeout - try: - timeout_seconds = input.execution_timeout_seconds - if timeout_seconds and timeout_seconds > 0: - - def timeout_handler(): - error = ErrorObject.from_message( - f"Execution timed out after {timeout_seconds} seconds." - ) - self.on_timed_out(execution.durable_execution_arn, error) + if input.execution_timeout_seconds > 0: - self._execution_timeout = self._scheduler.call_later( - timeout_handler, - delay=timeout_seconds, - completion_event=completion_event, + def timeout_handler(): + error = ErrorObject.from_message( + f"Execution timed out after {input.execution_timeout_seconds} seconds." ) - except (AttributeError, TypeError): - # Handle Mock objects or invalid timeout values in tests - pass + self.on_timed_out(execution.durable_execution_arn, error) + + self._execution_timeout = self._scheduler.call_later( + timeout_handler, + delay=input.execution_timeout_seconds, + completion_event=completion_event, + ) # Schedule initial invocation to run immediately self._invoke_execution(execution.durable_execution_arn) diff --git a/tests/executor_test.py b/tests/executor_test.py index 9ff34e5..8e50f2a 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -1213,6 +1213,7 @@ def test_wait_until_complete_success(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) result = executor.wait_until_complete("test-arn", timeout=10) @@ -1236,6 +1237,7 @@ def test_wait_until_complete_timeout(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) result = executor.wait_until_complete("test-arn", timeout=10) @@ -1290,6 +1292,7 @@ def test_should_schedule_wait_timer_correctly(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) # Act - schedule wait timer through public method @@ -1444,6 +1447,7 @@ def test_on_wait_timer_scheduled(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_wait_succeeded"): @@ -1654,6 +1658,7 @@ def test_invoke_execution_with_delay_through_wait_timer(executor, mock_scheduler mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) # Test delay behavior through wait timer scheduling @@ -1681,6 +1686,7 @@ def test_invoke_execution_no_delay_through_start_execution(executor, mock_schedu mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) # Verify scheduler was called with no delay for initial execution @@ -1704,6 +1710,7 @@ def test_on_step_retry_scheduled(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_retry_ready"): @@ -1733,6 +1740,7 @@ def test_wait_handler_execution(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_wait_succeeded") as mock_wait: @@ -1764,6 +1772,7 @@ def test_retry_handler_execution(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_retry_ready") as mock_retry: