diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 70bcfa5..aa90c32 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -90,6 +90,7 @@ def __init__( self._completion_events: dict[str, Event] = {} self._callback_timeouts: dict[str, Future] = {} self._callback_heartbeats: dict[str, Future] = {} + self._execution_timeout: Future | None = None def start_execution( self, @@ -118,6 +119,21 @@ def start_execution( completion_event = self._scheduler.create_event() self._completion_events[execution.durable_execution_arn] = completion_event + # Schedule execution timeout + if input.execution_timeout_seconds > 0: + + def timeout_handler(): + error = ErrorObject.from_message( + f"Execution timed out after {input.execution_timeout_seconds} seconds." + ) + self.on_timed_out(execution.durable_execution_arn, error) + + self._execution_timeout = self._scheduler.call_later( + timeout_handler, + delay=input.execution_timeout_seconds, + completion_event=completion_event, + ) + # Schedule initial invocation to run immediately self._invoke_execution(execution.durable_execution_arn) @@ -897,6 +913,9 @@ def _complete_events(self, execution_arn: str): # complete doesn't actually checkpoint explicitly if event := self._completion_events.get(execution_arn): event.set() + if self._execution_timeout: + self._execution_timeout.cancel() + self._execution_timeout = None def wait_until_complete( self, execution_arn: str, timeout: float | None = None diff --git a/tests/executor_test.py b/tests/executor_test.py index 295248f..8e50f2a 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -216,6 +216,13 @@ def test_start_execution( mock_execution.start.assert_called_once() mock_store.save.assert_called_once_with(mock_execution) mock_scheduler.create_event.assert_called_once() + + # Verify execution timeout was scheduled + assert mock_scheduler.call_later.called + timeout_call = mock_scheduler.call_later.call_args + assert timeout_call.kwargs["delay"] == start_input.execution_timeout_seconds + assert timeout_call.kwargs["completion_event"] == mock_event + mock_invoke.assert_called_once_with("test-arn") assert result.execution_arn == "test-arn" @@ -303,8 +310,8 @@ def test_should_complete_workflow_with_error_when_invocation_fails( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -349,8 +356,8 @@ def test_should_complete_workflow_with_result_when_invocation_succeeds( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -392,8 +399,8 @@ def test_should_handle_pending_status_when_operations_exist( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -432,8 +439,8 @@ def test_should_ignore_response_when_execution_already_complete( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -475,8 +482,8 @@ def test_should_retry_when_response_has_no_status( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -484,8 +491,8 @@ def test_should_retry_when_response_has_no_status( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_should_retry_when_failed_response_has_result( @@ -520,8 +527,8 @@ def test_should_retry_when_failed_response_has_result( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -529,8 +536,8 @@ def test_should_retry_when_failed_response_has_result( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_should_retry_when_success_response_has_error( @@ -566,8 +573,8 @@ def test_should_retry_when_success_response_has_error( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -575,8 +582,8 @@ def test_should_retry_when_success_response_has_error( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_should_retry_when_pending_response_has_no_operations( @@ -610,8 +617,8 @@ def test_should_retry_when_pending_response_has_no_operations( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -619,8 +626,8 @@ def test_should_retry_when_pending_response_has_no_operations( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_handler_success( @@ -653,8 +660,8 @@ def test_invoke_handler_success( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -689,8 +696,8 @@ def test_invoke_handler_execution_already_complete( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -735,8 +742,8 @@ def test_invoke_handler_execution_completed_during_invocation( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -772,8 +779,8 @@ def test_invoke_handler_resource_not_found( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -813,8 +820,8 @@ def test_invoke_handler_general_exception( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -822,8 +829,8 @@ def test_invoke_handler_general_exception( # Assert - verify retry was scheduled through observable behavior assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_execution_through_start_execution( @@ -936,8 +943,8 @@ def test_should_fail_execution_when_function_not_found( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -980,8 +987,8 @@ def test_should_fail_execution_when_retries_exhausted( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic import asyncio @@ -1024,7 +1031,7 @@ def test_should_prevent_multiple_workflow_failures_on_complete_execution( # Act & Assert - triggering workflow failure on completed execution should raise exception executor.start_execution(start_input) - handler = mock_scheduler.call_later.call_args[0][0] + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic - this should raise the exception with pytest.raises( @@ -1077,14 +1084,16 @@ def test_should_retry_invocation_when_under_limit_through_public_api( executor.start_execution(start_input) # Simulate scheduler executing the initial invocation handler - initial_handler = mock_scheduler.call_later.call_args[0][0] + initial_handler = mock_scheduler.call_later.call_args_list[-1][0][0] import asyncio asyncio.run(initial_handler()) # Verify retry was scheduled due to validation error - assert mock_scheduler.call_later.call_count == 2 # Initial + retry - retry_call = mock_scheduler.call_later.call_args_list[1] + assert mock_scheduler.call_later.call_count == 3 # timeout + initial + retry + retry_call = mock_scheduler.call_later.call_args_list[ + 2 + ] # Third call is the retry retry_handler = retry_call[0][0] retry_delay = retry_call[1]["delay"] @@ -1127,8 +1136,8 @@ def test_should_fail_workflow_when_retry_limit_exceeded( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1155,6 +1164,10 @@ def test_complete_events_through_complete_execution( mock_event = Mock() mock_scheduler.create_event.return_value = mock_event + # Mock the timeout future that will be created + mock_timeout_future = Mock() + mock_scheduler.call_later.return_value = mock_timeout_future + with patch( "aws_durable_execution_sdk_python_testing.executor.Execution" ) as mock_execution_class: @@ -1163,13 +1176,15 @@ def test_complete_events_through_complete_execution( mock_execution_class.new.return_value = mock_exec start_input = Mock() + start_input.execution_timeout_seconds = 300 executor.start_execution(start_input) - # Now complete the execution - this should trigger event.set() + # Now complete the execution - this should trigger event.set() and cancel timeout executor.complete_execution("test-arn", "result") - # Verify the event was set through observable behavior + # Verify the event was set and timeout was cancelled mock_event.set.assert_called_once() + mock_timeout_future.cancel.assert_called_once() def test_complete_events_no_event_through_public_api(executor, mock_store): @@ -1198,6 +1213,7 @@ def test_wait_until_complete_success(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) result = executor.wait_until_complete("test-arn", timeout=10) @@ -1221,6 +1237,7 @@ def test_wait_until_complete_timeout(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) result = executor.wait_until_complete("test-arn", timeout=10) @@ -1275,6 +1292,7 @@ def test_should_schedule_wait_timer_correctly(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) # Act - schedule wait timer through public method @@ -1429,6 +1447,7 @@ def test_on_wait_timer_scheduled(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_wait_succeeded"): @@ -1477,8 +1496,8 @@ def test_should_retry_when_response_has_unexpected_status( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1486,8 +1505,8 @@ def test_should_retry_when_response_has_unexpected_status( # Assert - verify retry was triggered due to validation error assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_handler_execution_completed_during_invocation_async( @@ -1523,8 +1542,8 @@ def test_invoke_handler_execution_completed_during_invocation_async( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1560,8 +1579,8 @@ def test_invoke_handler_resource_not_found_async( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1612,8 +1631,8 @@ def test_invoke_handler_general_exception_async( executor.start_execution(start_input) # Get the handler that was passed to the scheduler and execute it manually - mock_scheduler.call_later.assert_called_once() - handler = mock_scheduler.call_later.call_args[0][0] + assert mock_scheduler.call_later.call_count >= 1 + handler = mock_scheduler.call_later.call_args_list[-1][0][0] # Execute the handler to trigger the invocation logic asyncio.run(handler()) @@ -1621,8 +1640,8 @@ def test_invoke_handler_general_exception_async( # Assert - verify retry was scheduled through observable behavior assert mock_execution.consecutive_failed_invocation_attempts == 1 mock_store.save.assert_called_with(mock_execution) - # Verify retry was scheduled (call_later should be called twice: initial + retry) - assert mock_scheduler.call_later.call_count == 2 + # Verify retry was scheduled (call_later should be called 3 times: timeout + initial + retry) + assert mock_scheduler.call_later.call_count == 3 def test_invoke_execution_with_delay_through_wait_timer(executor, mock_scheduler): @@ -1639,6 +1658,7 @@ def test_invoke_execution_with_delay_through_wait_timer(executor, mock_scheduler mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) # Test delay behavior through wait timer scheduling @@ -1666,6 +1686,7 @@ def test_invoke_execution_no_delay_through_start_execution(executor, mock_schedu mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) # Verify scheduler was called with no delay for initial execution @@ -1689,6 +1710,7 @@ def test_on_step_retry_scheduled(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_retry_ready"): @@ -1718,6 +1740,7 @@ def test_wait_handler_execution(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_wait_succeeded") as mock_wait: @@ -1749,6 +1772,7 @@ def test_retry_handler_execution(executor, mock_scheduler): mock_execution_class.new.return_value = mock_execution start_input = Mock() + start_input.execution_timeout_seconds = 0 executor.start_execution(start_input) with patch.object(executor, "_on_retry_ready") as mock_retry: