Skip to content

Commit a3cda91

Browse files
authored
fix: make retry examples deterministic with counter
1 parent 8ba9700 commit a3cda91

File tree

6 files changed

+95
-37
lines changed

6 files changed

+95
-37
lines changed

examples/src/step/step_with_retry.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from random import random
1+
from itertools import count
22
from typing import Any
33

44
from aws_durable_execution_sdk_python.config import StepConfig
@@ -14,13 +14,19 @@
1414
)
1515

1616

17+
# Counter for deterministic behavior across retries
18+
_attempts = count(1) # starts from 1
19+
20+
1721
@durable_step
1822
def unreliable_operation(
1923
_step_context: StepContext,
2024
) -> str:
21-
failure_threshold = 0.5
22-
if random() > failure_threshold: # noqa: S311
23-
msg = "Random error occurred"
25+
# Use counter for deterministic behavior
26+
# Will fail on first attempt, succeed on second
27+
attempt = next(_attempts)
28+
if attempt < 2:
29+
msg = f"Attempt {attempt} failed"
2430
raise RuntimeError(msg)
2531
return "Operation succeeded"
2632

examples/src/step/steps_with_retry.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,37 @@
11
"""Example demonstrating multiple steps with retry logic."""
22

3-
from random import random
3+
from itertools import count
44
from typing import Any
55

6-
from aws_durable_execution_sdk_python.config import StepConfig, Duration
7-
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.config import Duration, StepConfig
7+
from aws_durable_execution_sdk_python.context import DurableContext, StepContext
88
from aws_durable_execution_sdk_python.execution import durable_execution
99
from aws_durable_execution_sdk_python.retries import (
1010
RetryStrategyConfig,
1111
create_retry_strategy,
1212
)
1313

1414

15-
def simulated_get_item(name: str) -> dict[str, Any] | None:
16-
"""Simulate getting an item that may fail randomly."""
17-
# Fail 50% of the time
18-
if random() < 0.5: # noqa: S311
15+
# Counter for deterministic behavior across retries
16+
_attempts = count(1) # starts from 1
17+
18+
19+
def simulated_get_item(_step_context: StepContext, name: str) -> dict[str, Any] | None:
20+
"""Simulate getting an item with deterministic counter-based behavior."""
21+
# Use counter for deterministic behavior
22+
attempt = next(_attempts)
23+
24+
# Fail on first attempt
25+
if attempt == 1:
1926
msg = "Random failure"
2027
raise RuntimeError(msg)
2128

22-
# Simulate finding item after some attempts
23-
if random() > 0.3: # noqa: S311
24-
return {"id": name, "data": "item data"}
29+
# Return None on second attempt (poll 1)
30+
if attempt == 2:
31+
return None
2532

26-
return None
33+
# Return item on third attempt (poll 2, after retry)
34+
return {"id": name, "data": "item data"}
2735

2836

2937
@durable_execution
@@ -49,7 +57,7 @@ def handler(event: Any, context: DurableContext) -> dict[str, Any]:
4957

5058
# Try to get the item with retry
5159
get_response = context.step(
52-
lambda _, n=name: simulated_get_item(n),
60+
lambda _, n=name: simulated_get_item(_, n),
5361
name=f"get_item_poll_{poll_count}",
5462
config=step_config,
5563
)

examples/test/step/test_step_with_retry.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import pytest
44
from aws_durable_execution_sdk_python.execution import InvocationStatus
55
from aws_durable_execution_sdk_python.lambda_service import OperationType
6-
76
from src.step import step_with_retry
87
from test.conftest import deserialize_operation_payload
98

@@ -14,20 +13,28 @@
1413
lambda_function_name="step with retry",
1514
)
1615
def test_step_with_retry(durable_runner):
17-
"""Test step with retry configuration."""
16+
"""Test step with retry configuration.
17+
18+
With counter-based deterministic behavior:
19+
- Attempt 1: counter = 1 < 2 → raises RuntimeError ❌
20+
- Attempt 2: counter = 2 >= 2 → succeeds ✓
21+
22+
The function deterministically fails once then succeeds on the second attempt.
23+
"""
1824
with durable_runner:
1925
result = durable_runner.run(input="test", timeout=30)
2026

21-
# The function uses random() so it may succeed or fail
22-
# We just verify it completes and has retry configuration
23-
assert result.status in [InvocationStatus.SUCCEEDED, InvocationStatus.FAILED]
27+
# With counter-based deterministic behavior, succeeds on attempt 2
28+
assert result.status is InvocationStatus.SUCCEEDED
29+
assert deserialize_operation_payload(result.result) == "Operation succeeded"
2430

25-
# Verify step operation exists
31+
# Verify step operation exists with retry details
2632
step_ops = [
2733
op for op in result.operations if op.operation_type == OperationType.STEP
2834
]
29-
assert len(step_ops) >= 1
35+
assert len(step_ops) == 1
3036

31-
# If it succeeded, verify the result
32-
if result.status is InvocationStatus.SUCCEEDED:
33-
assert deserialize_operation_payload(result.result) == "Operation succeeded"
37+
# The step should have succeeded on attempt 2 (after 1 failure)
38+
# Attempt numbering: 1 (initial attempt), 2 (first retry)
39+
step_op = step_ops[0]
40+
assert step_op.attempt == 2 # Succeeded on first retry (1-indexed: 2=first retry)

examples/test/step/test_steps_with_retry.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import pytest
44
from aws_durable_execution_sdk_python.execution import InvocationStatus
55
from aws_durable_execution_sdk_python.lambda_service import OperationType
6-
76
from src.step import steps_with_retry
87
from test.conftest import deserialize_operation_payload
98

@@ -14,20 +13,40 @@
1413
lambda_function_name="steps with retry",
1514
)
1615
def test_steps_with_retry(durable_runner):
17-
"""Test steps_with_retry pattern."""
16+
"""Test steps_with_retry pattern.
17+
18+
With counter-based deterministic behavior:
19+
- Poll 1, Attempt 1: counter = 1 → raises RuntimeError ❌
20+
- Poll 1, Attempt 2: counter = 2 → returns None
21+
- Poll 2, Attempt 1: counter = 3 → returns item ✓
22+
23+
The function finds the item on poll 2 after 1 retry on poll 1.
24+
"""
1825
with durable_runner:
1926
result = durable_runner.run(input={"name": "test-item"}, timeout=30)
2027

2128
assert result.status is InvocationStatus.SUCCEEDED
2229

23-
# Result should be either success with item or error
24-
assert isinstance(deserialize_operation_payload(result.result), dict)
25-
assert "success" in deserialize_operation_payload(
26-
result.result
27-
) or "error" in deserialize_operation_payload(result.result)
30+
# With counter-based deterministic behavior, finds item on poll 2
31+
result_data = deserialize_operation_payload(result.result)
32+
assert isinstance(result_data, dict)
33+
assert result_data.get("success") is True
34+
assert result_data.get("pollsRequired") == 2
35+
assert "item" in result_data
36+
assert result_data["item"]["id"] == "test-item"
2837

29-
# Verify step operations exist (polling steps)
38+
# Verify step operations exist
3039
step_ops = [
3140
op for op in result.operations if op.operation_type == OperationType.STEP
3241
]
33-
assert len(step_ops) >= 1
42+
# Should have exactly 2 step operations (poll 1 and poll 2)
43+
assert len(step_ops) == 2
44+
45+
# Poll 1: succeeded after 1 retry (returned None)
46+
assert step_ops[0].name == "get_item_poll_1"
47+
assert step_ops[0].result == "null"
48+
assert step_ops[0].attempt == 2 # 1 retry occurred (1-indexed: 2=first retry)
49+
50+
# Poll 2: succeeded immediately (returned item)
51+
assert step_ops[1].name == "get_item_poll_2"
52+
assert step_ops[1].attempt == 1 # No retries needed (1-indexed: 1=initial)

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
ContextDetails,
1313
ExecutionDetails,
1414
Operation,
15+
OperationAction,
1516
OperationStatus,
1617
OperationType,
1718
OperationUpdate,
@@ -72,9 +73,15 @@ def _create_context_details(self, update: OperationUpdate) -> ContextDetails | N
7273
)
7374

7475
def _create_step_details(
75-
self, update: OperationUpdate, current_operation: Operation | None = None
76+
self,
77+
update: OperationUpdate,
78+
current_operation: Operation | None = None,
7679
) -> StepDetails | None:
77-
"""Create StepDetails from OperationUpdate."""
80+
"""Create StepDetails from OperationUpdate.
81+
82+
Automatically increments attempt count for RETRY, SUCCEED, and FAIL actions.
83+
"""
84+
7885
attempt: int = 0
7986
next_attempt_timestamp: datetime.datetime | None = None
8087

@@ -84,6 +91,13 @@ def _create_step_details(
8491
next_attempt_timestamp = (
8592
current_operation.step_details.next_attempt_timestamp
8693
)
94+
# Increment attempt for RETRY, SUCCEED, and FAIL actions
95+
if update.action in {
96+
OperationAction.RETRY,
97+
OperationAction.SUCCEED,
98+
OperationAction.FAIL,
99+
}:
100+
attempt += 1
87101
return StepDetails(
88102
attempt=attempt,
89103
next_attempt_timestamp=next_attempt_timestamp,

tests/checkpoint/processors/step_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ def test_process_succeed_action_with_current_operation():
230230

231231
current_op = Mock()
232232
current_op.start_timestamp = datetime.now(UTC)
233+
current_op.step_details = StepDetails()
233234

234235
update = OperationUpdate(
235236
operation_id="step-123",
@@ -243,6 +244,7 @@ def test_process_succeed_action_with_current_operation():
243244

244245
assert result.start_timestamp == current_op.start_timestamp
245246
assert result.status == OperationStatus.SUCCEEDED
247+
assert result.step_details.attempt == 1
246248

247249

248250
def test_process_fail_action():
@@ -274,6 +276,7 @@ def test_process_fail_action_with_current_operation():
274276

275277
current_op = Mock()
276278
current_op.start_timestamp = datetime.now(UTC)
279+
current_op.step_details = StepDetails()
277280

278281
error = ErrorObject.from_message("step failed")
279282
update = OperationUpdate(
@@ -288,6 +291,7 @@ def test_process_fail_action_with_current_operation():
288291

289292
assert result.start_timestamp == current_op.start_timestamp
290293
assert result.status == OperationStatus.FAILED
294+
assert result.step_details.attempt == 1
291295

292296

293297
def test_process_invalid_action():

0 commit comments

Comments
 (0)