diff --git a/examples/src/step/step_with_retry.py b/examples/src/step/step_with_retry.py index 1f70385..5f8cb78 100644 --- a/examples/src/step/step_with_retry.py +++ b/examples/src/step/step_with_retry.py @@ -1,4 +1,4 @@ -from random import random +from itertools import count from typing import Any from aws_durable_execution_sdk_python.config import StepConfig @@ -14,13 +14,19 @@ ) +# Counter for deterministic behavior across retries +_attempts = count(1) # starts from 1 + + @durable_step def unreliable_operation( _step_context: StepContext, ) -> str: - failure_threshold = 0.5 - if random() > failure_threshold: # noqa: S311 - msg = "Random error occurred" + # Use counter for deterministic behavior + # Will fail on first attempt, succeed on second + attempt = next(_attempts) + if attempt < 2: + msg = f"Attempt {attempt} failed" raise RuntimeError(msg) return "Operation succeeded" diff --git a/examples/src/step/steps_with_retry.py b/examples/src/step/steps_with_retry.py index 0fd6277..3d2bb33 100644 --- a/examples/src/step/steps_with_retry.py +++ b/examples/src/step/steps_with_retry.py @@ -1,10 +1,10 @@ """Example demonstrating multiple steps with retry logic.""" -from random import random +from itertools import count from typing import Any -from aws_durable_execution_sdk_python.config import StepConfig, Duration -from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.config import Duration, StepConfig +from aws_durable_execution_sdk_python.context import DurableContext, StepContext from aws_durable_execution_sdk_python.execution import durable_execution from aws_durable_execution_sdk_python.retries import ( RetryStrategyConfig, @@ -12,18 +12,26 @@ ) -def simulated_get_item(name: str) -> dict[str, Any] | None: - """Simulate getting an item that may fail randomly.""" - # Fail 50% of the time - if random() < 0.5: # noqa: S311 +# Counter for deterministic behavior across retries +_attempts = count(1) # starts from 1 + 
+ +def simulated_get_item(_step_context: StepContext, name: str) -> dict[str, Any] | None: + """Simulate getting an item with deterministic counter-based behavior.""" + # Use counter for deterministic behavior + attempt = next(_attempts) + + # Fail on first attempt + if attempt == 1: msg = "Random failure" raise RuntimeError(msg) - # Simulate finding item after some attempts - if random() > 0.3: # noqa: S311 - return {"id": name, "data": "item data"} + # Return None on second attempt (poll 1) + if attempt == 2: + return None - return None + # Return item on third attempt (poll 2, first attempt; no retry needed) + return {"id": name, "data": "item data"} @durable_execution @@ -49,7 +57,7 @@ def handler(event: Any, context: DurableContext) -> dict[str, Any]: # Try to get the item with retry get_response = context.step( - lambda _, n=name: simulated_get_item(n), + lambda _, n=name: simulated_get_item(_, n), name=f"get_item_poll_{poll_count}", config=step_config, ) diff --git a/examples/test/step/test_step_with_retry.py b/examples/test/step/test_step_with_retry.py index cf7bc8d..bb6ba8b 100644 --- a/examples/test/step/test_step_with_retry.py +++ b/examples/test/step/test_step_with_retry.py @@ -3,7 +3,6 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus from aws_durable_execution_sdk_python.lambda_service import OperationType - from src.step import step_with_retry from test.conftest import deserialize_operation_payload @@ -14,20 +13,28 @@ lambda_function_name="step with retry", ) def test_step_with_retry(durable_runner): - """Test step with retry configuration.""" + """Test step with retry configuration. + + With counter-based deterministic behavior: + - Attempt 1: counter = 1 < 2 → raises RuntimeError ❌ + - Attempt 2: counter = 2 >= 2 → succeeds ✓ + + The function deterministically fails once then succeeds on the second attempt. 
+ """ with durable_runner: result = durable_runner.run(input="test", timeout=30) - # The function uses random() so it may succeed or fail - # We just verify it completes and has retry configuration - assert result.status in [InvocationStatus.SUCCEEDED, InvocationStatus.FAILED] + # With counter-based deterministic behavior, succeeds on attempt 2 + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == "Operation succeeded" - # Verify step operation exists + # Verify step operation exists with retry details step_ops = [ op for op in result.operations if op.operation_type == OperationType.STEP ] - assert len(step_ops) >= 1 + assert len(step_ops) == 1 - # If it succeeded, verify the result - if result.status is InvocationStatus.SUCCEEDED: - assert deserialize_operation_payload(result.result) == "Operation succeeded" + # The step should have succeeded on attempt 2 (after 1 failure) + # Attempt numbering: 1 (initial attempt), 2 (first retry) + step_op = step_ops[0] + assert step_op.attempt == 2 # Succeeded on first retry (1-indexed: 2=first retry) diff --git a/examples/test/step/test_steps_with_retry.py b/examples/test/step/test_steps_with_retry.py index 88b8b8b..17ad8dc 100644 --- a/examples/test/step/test_steps_with_retry.py +++ b/examples/test/step/test_steps_with_retry.py @@ -3,7 +3,6 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus from aws_durable_execution_sdk_python.lambda_service import OperationType - from src.step import steps_with_retry from test.conftest import deserialize_operation_payload @@ -14,20 +13,40 @@ lambda_function_name="steps with retry", ) def test_steps_with_retry(durable_runner): - """Test steps_with_retry pattern.""" + """Test steps_with_retry pattern. 
+ + With counter-based deterministic behavior: + - Poll 1, Attempt 1: counter = 1 → raises RuntimeError ❌ + - Poll 1, Attempt 2: counter = 2 → returns None + - Poll 2, Attempt 1: counter = 3 → returns item ✓ + + The function finds the item on poll 2 after 1 retry on poll 1. + """ with durable_runner: result = durable_runner.run(input={"name": "test-item"}, timeout=30) assert result.status is InvocationStatus.SUCCEEDED - # Result should be either success with item or error - assert isinstance(deserialize_operation_payload(result.result), dict) - assert "success" in deserialize_operation_payload( - result.result - ) or "error" in deserialize_operation_payload(result.result) + # With counter-based deterministic behavior, finds item on poll 2 + result_data = deserialize_operation_payload(result.result) + assert isinstance(result_data, dict) + assert result_data.get("success") is True + assert result_data.get("pollsRequired") == 2 + assert "item" in result_data + assert result_data["item"]["id"] == "test-item" - # Verify step operations exist (polling steps) + # Verify step operations exist step_ops = [ op for op in result.operations if op.operation_type == OperationType.STEP ] - assert len(step_ops) >= 1 + # Should have exactly 2 step operations (poll 1 and poll 2) + assert len(step_ops) == 2 + + # Poll 1: succeeded after 1 retry (returned None) + assert step_ops[0].name == "get_item_poll_1" + assert step_ops[0].result == "null" + assert step_ops[0].attempt == 2 # 1 retry occurred (1-indexed: 2=first retry) + + # Poll 2: succeeded immediately (returned item) + assert step_ops[1].name == "get_item_poll_2" + assert step_ops[1].attempt == 1 # No retries needed (1-indexed: 1=initial) diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py index f7991a6..82d40c7 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py +++ 
b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py @@ -12,6 +12,7 @@ ContextDetails, ExecutionDetails, Operation, + OperationAction, OperationStatus, OperationType, OperationUpdate, @@ -72,9 +73,15 @@ def _create_context_details(self, update: OperationUpdate) -> ContextDetails | N ) def _create_step_details( - self, update: OperationUpdate, current_operation: Operation | None = None + self, + update: OperationUpdate, + current_operation: Operation | None = None, ) -> StepDetails | None: - """Create StepDetails from OperationUpdate.""" + """Create StepDetails from OperationUpdate. + + Automatically increments attempt count for RETRY, SUCCEED, and FAIL actions. + """ + attempt: int = 0 next_attempt_timestamp: datetime.datetime | None = None @@ -84,6 +91,13 @@ def _create_step_details( next_attempt_timestamp = ( current_operation.step_details.next_attempt_timestamp ) + # Increment attempt for RETRY, SUCCEED, and FAIL actions + if update.action in { + OperationAction.RETRY, + OperationAction.SUCCEED, + OperationAction.FAIL, + }: + attempt += 1 return StepDetails( attempt=attempt, next_attempt_timestamp=next_attempt_timestamp, diff --git a/tests/checkpoint/processors/step_test.py b/tests/checkpoint/processors/step_test.py index 53b12e8..6ba5cc6 100644 --- a/tests/checkpoint/processors/step_test.py +++ b/tests/checkpoint/processors/step_test.py @@ -230,6 +230,7 @@ def test_process_succeed_action_with_current_operation(): current_op = Mock() current_op.start_timestamp = datetime.now(UTC) + current_op.step_details = StepDetails() update = OperationUpdate( operation_id="step-123", @@ -243,6 +244,7 @@ def test_process_succeed_action_with_current_operation(): assert result.start_timestamp == current_op.start_timestamp assert result.status == OperationStatus.SUCCEEDED + assert result.step_details.attempt == 1 def test_process_fail_action(): @@ -274,6 +276,7 @@ def test_process_fail_action_with_current_operation(): current_op = Mock() 
current_op.start_timestamp = datetime.now(UTC) + current_op.step_details = StepDetails() error = ErrorObject.from_message("step failed") update = OperationUpdate( @@ -288,6 +291,7 @@ def test_process_fail_action_with_current_operation(): assert result.start_timestamp == current_op.start_timestamp assert result.status == OperationStatus.FAILED + assert result.step_details.attempt == 1 def test_process_invalid_action():