Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions examples/src/step/step_with_retry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from random import random
from itertools import count
from typing import Any

from aws_durable_execution_sdk_python.config import StepConfig
Expand All @@ -14,13 +14,19 @@
)


# Counter for deterministic behavior across retries
_attempts = count(1) # starts from 1


@durable_step
def unreliable_operation(
    _step_context: StepContext,
) -> str:
    """Fail on the first attempt, then succeed.

    The attempt number is taken from the module-level ``_attempts`` counter
    instead of ``random()``, so the outcome is reproducible: the call that
    draws attempt 1 raises, every later call returns.

    NOTE(review): ``_attempts`` is module state, so it keeps counting for as
    long as this module stays loaded. Only the first call in a given process
    fails; later calls in the same process succeed immediately.

    Args:
        _step_context: Step context supplied by the caller; unused here.

    Returns:
        The literal string ``"Operation succeeded"``.

    Raises:
        RuntimeError: When the drawn attempt number is below 2.
    """
    # Use counter for deterministic behavior
    # Will fail on first attempt, succeed on second
    attempt = next(_attempts)
    if attempt < 2:
        msg = f"Attempt {attempt} failed"
        raise RuntimeError(msg)
    return "Operation succeeded"

Expand Down
32 changes: 20 additions & 12 deletions examples/src/step/steps_with_retry.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
"""Example demonstrating multiple steps with retry logic."""

from random import random
from itertools import count
from typing import Any

from aws_durable_execution_sdk_python.config import StepConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.config import Duration, StepConfig
from aws_durable_execution_sdk_python.context import DurableContext, StepContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)


# Counter for deterministic behavior across retries
_attempts = count(1)  # starts from 1


def simulated_get_item(_step_context: StepContext, name: str) -> dict[str, Any] | None:
    """Simulate getting an item with deterministic counter-based behavior.

    Each call draws the next value from the module-level ``_attempts``
    counter and behaves according to that value:

    - attempt 1 raises ``RuntimeError``
    - attempt 2 returns ``None`` (item not found yet)
    - attempt 3 and later return the item

    NOTE(review): the counter is module state shared by every call in the
    process, so this sequence plays out only once per loaded module.

    Args:
        _step_context: Step context supplied by the caller; unused here.
        name: Identifier echoed back as the item's ``"id"``.

    Returns:
        ``None`` on the second attempt, otherwise a dict with ``"id"`` and
        ``"data"`` keys.

    Raises:
        RuntimeError: On the first attempt.
    """
    # Use counter for deterministic behavior
    attempt = next(_attempts)

    # Fail on first attempt. The message names the attempt rather than
    # claiming randomness, since the failure is no longer random.
    if attempt == 1:
        msg = f"Attempt {attempt} failed"
        raise RuntimeError(msg)

    # Return None on second attempt (poll 1)
    if attempt == 2:
        return None

    # Return item on third attempt (poll 2, after retry)
    return {"id": name, "data": "item data"}


@durable_execution
Expand All @@ -49,7 +57,7 @@ def handler(event: Any, context: DurableContext) -> dict[str, Any]:

# Try to get the item with retry
get_response = context.step(
lambda _, n=name: simulated_get_item(n),
lambda _, n=name: simulated_get_item(_, n),
name=f"get_item_poll_{poll_count}",
config=step_config,
)
Expand Down
27 changes: 17 additions & 10 deletions examples/test/step/test_step_with_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType

from src.step import step_with_retry
from test.conftest import deserialize_operation_payload

Expand All @@ -14,20 +13,28 @@
lambda_function_name="step with retry",
)
def test_step_with_retry(durable_runner):
    """Test step with retry configuration.

    With counter-based deterministic behavior:
    - Attempt 1: counter = 1 < 2 → raises RuntimeError ❌
    - Attempt 2: counter = 2 >= 2 → succeeds ✓

    The function deterministically fails once then succeeds on the second attempt.
    """
    with durable_runner:
        result = durable_runner.run(input="test", timeout=30)

    # With counter-based deterministic behavior, succeeds on attempt 2
    assert result.status is InvocationStatus.SUCCEEDED
    assert deserialize_operation_payload(result.result) == "Operation succeeded"

    # Verify step operation exists with retry details
    step_ops = [
        op for op in result.operations if op.operation_type == OperationType.STEP
    ]
    assert len(step_ops) == 1

    # The step should have succeeded on attempt 2 (after 1 failure)
    # Attempt numbering: 1 (initial attempt), 2 (first retry)
    step_op = step_ops[0]
    assert step_op.attempt == 2
37 changes: 28 additions & 9 deletions examples/test/step/test_steps_with_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType

from src.step import steps_with_retry
from test.conftest import deserialize_operation_payload

Expand All @@ -14,20 +13,40 @@
lambda_function_name="steps with retry",
)
def test_steps_with_retry(durable_runner):
    """Test steps_with_retry pattern.

    With counter-based deterministic behavior:
    - Poll 1, Attempt 1: counter = 1 → raises RuntimeError ❌
    - Poll 1, Attempt 2: counter = 2 → returns None
    - Poll 2, Attempt 1: counter = 3 → returns item ✓

    The function finds the item on poll 2 after 1 retry on poll 1.
    """
    with durable_runner:
        result = durable_runner.run(input={"name": "test-item"}, timeout=30)

    assert result.status is InvocationStatus.SUCCEEDED

    # With counter-based deterministic behavior, finds item on poll 2
    result_data = deserialize_operation_payload(result.result)
    assert isinstance(result_data, dict)
    assert result_data.get("success") is True
    assert result_data.get("pollsRequired") == 2
    assert "item" in result_data
    assert result_data["item"]["id"] == "test-item"

    # Verify step operations exist
    step_ops = [
        op for op in result.operations if op.operation_type == OperationType.STEP
    ]
    # Should have exactly 2 step operations (poll 1 and poll 2)
    assert len(step_ops) == 2

    # Poll 1: succeeded after 1 retry (returned None).
    # Attempts are 1-indexed, so attempt 2 is the first retry.
    assert step_ops[0].name == "get_item_poll_1"
    assert step_ops[0].result == "null"
    assert step_ops[0].attempt == 2

    # Poll 2: succeeded immediately (returned item), so it stays at the
    # initial attempt with no retries.
    assert step_ops[1].name == "get_item_poll_2"
    assert step_ops[1].attempt == 1
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ContextDetails,
ExecutionDetails,
Operation,
OperationAction,
OperationStatus,
OperationType,
OperationUpdate,
Expand Down Expand Up @@ -72,9 +73,15 @@ def _create_context_details(self, update: OperationUpdate) -> ContextDetails | N
)

def _create_step_details(
self, update: OperationUpdate, current_operation: Operation | None = None
self,
update: OperationUpdate,
current_operation: Operation | None = None,
) -> StepDetails | None:
"""Create StepDetails from OperationUpdate."""
"""Create StepDetails from OperationUpdate.

Automatically increments attempt count for RETRY, SUCCEED, and FAIL actions.
"""

attempt: int = 0
next_attempt_timestamp: datetime.datetime | None = None

Expand All @@ -84,6 +91,13 @@ def _create_step_details(
next_attempt_timestamp = (
current_operation.step_details.next_attempt_timestamp
)
# Increment attempt for RETRY, SUCCEED, and FAIL actions
if update.action in {
OperationAction.RETRY,
OperationAction.SUCCEED,
OperationAction.FAIL,
}:
attempt += 1
return StepDetails(
attempt=attempt,
next_attempt_timestamp=next_attempt_timestamp,
Expand Down
4 changes: 4 additions & 0 deletions tests/checkpoint/processors/step_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def test_process_succeed_action_with_current_operation():

current_op = Mock()
current_op.start_timestamp = datetime.now(UTC)
current_op.step_details = StepDetails()

update = OperationUpdate(
operation_id="step-123",
Expand All @@ -243,6 +244,7 @@ def test_process_succeed_action_with_current_operation():

assert result.start_timestamp == current_op.start_timestamp
assert result.status == OperationStatus.SUCCEEDED
assert result.step_details.attempt == 1


def test_process_fail_action():
Expand Down Expand Up @@ -274,6 +276,7 @@ def test_process_fail_action_with_current_operation():

current_op = Mock()
current_op.start_timestamp = datetime.now(UTC)
current_op.step_details = StepDetails()

error = ErrorObject.from_message("step failed")
update = OperationUpdate(
Expand All @@ -288,6 +291,7 @@ def test_process_fail_action_with_current_operation():

assert result.start_timestamp == current_op.start_timestamp
assert result.status == OperationStatus.FAILED
assert result.step_details.attempt == 1


def test_process_invalid_action():
Expand Down