Skip to content

Commit 17b2028

Browse files
author
Alex Wang
committed
examples: add examples for mixed ops, no replay step and childcontext failure
1 parent 3850837 commit 17b2028

File tree

8 files changed

+358
-0
lines changed

8 files changed

+358
-0
lines changed

examples/examples-catalog.json

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,39 @@
444444
"ExecutionTimeout": 300
445445
},
446446
"path": "./src/none_results/none_results.py"
447+
},
448+
{
449+
"name": "No Replay Execution",
450+
"description": "Execution with simples steps and without replay",
451+
"handler": "no_replay_execution.handler",
452+
"integration": true,
453+
"durableConfig": {
454+
"RetentionPeriodInDays": 7,
455+
"ExecutionTimeout": 300
456+
},
457+
"path": "./src/no_replay_execution/no_replay_execution.py"
458+
},
459+
{
460+
"name": "Run In Child Context With Failing Step",
461+
"description": "Demonstrates runInChildContext with a failing step followed by a successful wait",
462+
"handler": "run_in_child_context_step_failure.handler",
463+
"integration": true,
464+
"durableConfig": {
465+
"RetentionPeriodInDays": 7,
466+
"ExecutionTimeout": 300
467+
},
468+
"path": "./src/run_in_child_context/run_in_child_context_step_failure.py"
469+
},
470+
{
471+
"name": "Comprehensive Operations",
472+
"description": "Complex multi-operation example demonstrating all major operations",
473+
"handler": "comprehensive_operations.handler",
474+
"integration": true,
475+
"durableConfig": {
476+
"RetentionPeriodInDays": 7,
477+
"ExecutionTimeout": 300
478+
},
479+
"path": "./src/comprehensive_operations/comprehensive_operations.py"
447480
}
448481
]
449482
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Complex multi-operation example demonstrating all major operations."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.execution import durable_execution
7+
from aws_durable_execution_sdk_python.config import Duration
8+
9+
10+
@durable_execution
11+
def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]:
12+
"""Comprehensive example demonstrating all major durable operations."""
13+
print(f"Starting comprehensive operations example with event: {event}")
14+
15+
# Step 1: ctx.step - Simple step that returns a result
16+
step1_result: str = context.step(
17+
lambda _: "Step 1 completed successfully",
18+
name="step1",
19+
)
20+
21+
# Step 2: ctx.wait - Wait for 1 second
22+
context.wait(Duration.from_seconds(1))
23+
24+
# Step 3: ctx.map - Map with 5 iterations returning numbers 1 to 5
25+
map_input = [1, 2, 3, 4, 5]
26+
27+
map_results = context.map(
28+
inputs=map_input,
29+
func=lambda ctx, item, index, _: ctx.step(
30+
lambda _: item, name=f"map-step-{index}"
31+
),
32+
name="map-numbers",
33+
).to_dict()
34+
35+
# Step 4: ctx.parallel - 3 branches, each returning a fruit name
36+
37+
parallel_results = context.parallel(
38+
functions=[
39+
lambda ctx: ctx.step(lambda _: "apple", name="fruit-step-1"),
40+
lambda ctx: ctx.step(lambda _: "banana", name="fruit-step-2"),
41+
lambda ctx: ctx.step(lambda _: "orange", name="fruit-step-3"),
42+
]
43+
).to_dict()
44+
45+
# Final result combining all operations
46+
return {
47+
"step1": step1_result,
48+
"waitCompleted": True,
49+
"mapResults": map_results,
50+
"parallelResults": parallel_results,
51+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""Demonstrates step execution tracking when no replay occurs."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.execution import durable_execution
7+
8+
9+
@durable_execution
10+
def handler(_event: Any, context: DurableContext) -> dict[str, bool]:
11+
"""Handler demonstrating step execution without replay."""
12+
context.step(lambda _: "user-1", name="fetch-user-1")
13+
context.step(lambda _: "user-2", name="fetch-user-2")
14+
15+
return {"completed": True}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Demonstrates runInChildContext with a failing step followed by a successful wait."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.execution import durable_execution
7+
from aws_durable_execution_sdk_python.config import StepConfig, Duration
8+
from aws_durable_execution_sdk_python.retries import (
9+
RetryStrategyConfig,
10+
create_retry_strategy,
11+
)
12+
13+
14+
@durable_execution
15+
def handler(_event: Any, context: DurableContext) -> dict[str, bool]:
16+
"""Handler demonstrating runInChildContext with failing step."""
17+
18+
def child_with_failure(ctx: DurableContext) -> None:
19+
"""Child context with a failing step."""
20+
21+
retry_config = RetryStrategyConfig(
22+
max_attempts=3,
23+
initial_delay=Duration.from_seconds(1),
24+
max_delay=Duration.from_seconds(10),
25+
backoff_rate=2.0,
26+
)
27+
step_config = StepConfig(retry_strategy=create_retry_strategy(retry_config))
28+
29+
def failing_step(_: DurableContext) -> None:
30+
"""Step that always fails."""
31+
raise Exception("Step failed in child context")
32+
33+
ctx.step(
34+
failing_step,
35+
name="failing-step",
36+
config=step_config,
37+
)
38+
39+
try:
40+
context.run_in_child_context(
41+
child_with_failure,
42+
name="child-with-failure",
43+
)
44+
except Exception as error:
45+
# Catch and ignore child context and step errors
46+
result = {"success": True, "error": str(error)}
47+
48+
context.wait(Duration.from_seconds(1), name="wait-after-failure")
49+
50+
return result

examples/template.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,3 +557,43 @@ Resources:
557557
DurableConfig:
558558
RetentionPeriodInDays: 7
559559
ExecutionTimeout: 300
560+
NoReplayExecution:
561+
Type: AWS::Serverless::Function
562+
Properties:
563+
CodeUri: build/
564+
Handler: no_replay_execution.handler
565+
Description: Execution with simples steps and without replay
566+
Role:
567+
Fn::GetAtt:
568+
- DurableFunctionRole
569+
- Arn
570+
DurableConfig:
571+
RetentionPeriodInDays: 7
572+
ExecutionTimeout: 300
573+
RunInChildContextStepFailure:
574+
Type: AWS::Serverless::Function
575+
Properties:
576+
CodeUri: build/
577+
Handler: run_in_child_context_step_failure.handler
578+
Description: Demonstrates runInChildContext with a failing step followed by
579+
a successful wait
580+
Role:
581+
Fn::GetAtt:
582+
- DurableFunctionRole
583+
- Arn
584+
DurableConfig:
585+
RetentionPeriodInDays: 7
586+
ExecutionTimeout: 300
587+
ComprehensiveOperations:
588+
Type: AWS::Serverless::Function
589+
Properties:
590+
CodeUri: build/
591+
Handler: comprehensive_operations.handler
592+
Description: Complex multi-operation example demonstrating all major operations
593+
Role:
594+
Fn::GetAtt:
595+
- DurableFunctionRole
596+
- Arn
597+
DurableConfig:
598+
RetentionPeriodInDays: 7
599+
ExecutionTimeout: 300
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""Tests for comprehensive_operations."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.comprehensive_operations import comprehensive_operations
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
11+
@pytest.mark.durable_execution(
12+
handler=comprehensive_operations.handler,
13+
lambda_function_name="Comprehensive Operations",
14+
)
15+
def test_execute_all_operations_successfully(durable_runner):
16+
"""Test that all operations execute successfully."""
17+
with durable_runner:
18+
result = durable_runner.run(input={"message": "test"}, timeout=30)
19+
20+
assert result.status is InvocationStatus.SUCCEEDED
21+
22+
result_data = deserialize_operation_payload(result.result)
23+
24+
assert result_data["step1"] == "Step 1 completed successfully"
25+
assert result_data["waitCompleted"] is True
26+
27+
# verify map results
28+
map_results = result_data["mapResults"]
29+
assert len(map_results["all"]) == 5
30+
assert [item["result"] for item in map_results["all"]] == [1, 2, 3, 4, 5]
31+
assert map_results["completionReason"] == "ALL_COMPLETED"
32+
33+
# verify parallel results
34+
parallel_results = result_data["parallelResults"]
35+
assert len(parallel_results["all"]) == 3
36+
assert [item["result"] for item in parallel_results["all"]] == [
37+
"apple",
38+
"banana",
39+
"orange",
40+
]
41+
assert parallel_results["completionReason"] == "ALL_COMPLETED"
42+
43+
# Get all operations including nested ones
44+
all_ops = result.get_all_operations()
45+
46+
# Verify step1 operation
47+
step1_ops = [
48+
op for op in all_ops if op.operation_type.value == "STEP" and op.name == "step1"
49+
]
50+
assert len(step1_ops) == 1
51+
step1_op = step1_ops[0]
52+
assert (
53+
deserialize_operation_payload(step1_op.result)
54+
== "Step 1 completed successfully"
55+
)
56+
57+
# Verify wait operation (should be at index 1)
58+
wait_op = result.operations[1]
59+
assert wait_op.operation_type.value == "WAIT"
60+
61+
# Verify individual map step operations exist with correct names
62+
for i in range(5):
63+
map_step_ops = [
64+
op
65+
for op in all_ops
66+
if op.operation_type.value == "STEP" and op.name == f"map-step-{i}"
67+
]
68+
assert len(map_step_ops) == 1
69+
assert deserialize_operation_payload(map_step_ops[0].result) == i + 1
70+
71+
# Verify individual parallel step operations exist
72+
fruit_step_1_ops = [
73+
op
74+
for op in all_ops
75+
if op.operation_type.value == "STEP" and op.name == "fruit-step-1"
76+
]
77+
assert len(fruit_step_1_ops) == 1
78+
assert deserialize_operation_payload(fruit_step_1_ops[0].result) == "apple"
79+
80+
fruit_step_2_ops = [
81+
op
82+
for op in all_ops
83+
if op.operation_type.value == "STEP" and op.name == "fruit-step-2"
84+
]
85+
assert len(fruit_step_2_ops) == 1
86+
assert deserialize_operation_payload(fruit_step_2_ops[0].result) == "banana"
87+
88+
fruit_step_3_ops = [
89+
op
90+
for op in all_ops
91+
if op.operation_type.value == "STEP" and op.name == "fruit-step-3"
92+
]
93+
assert len(fruit_step_3_ops) == 1
94+
assert deserialize_operation_payload(fruit_step_3_ops[0].result) == "orange"
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""Tests for no_replay_execution."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.no_replay_execution import no_replay_execution
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
11+
@pytest.mark.durable_execution(
12+
handler=no_replay_execution.handler,
13+
lambda_function_name="No Replay Execution",
14+
)
15+
def test_handle_step_operations_when_no_replay_occurs(durable_runner):
16+
"""Test step operations when no replay occurs."""
17+
with durable_runner:
18+
result = durable_runner.run(input=None, timeout=10)
19+
20+
assert result.status is InvocationStatus.SUCCEEDED
21+
22+
# Verify final result
23+
assert deserialize_operation_payload(result.result) == {"completed": True}
24+
25+
# Get step operations
26+
user1_step_ops = [
27+
op
28+
for op in result.operations
29+
if op.operation_type.value == "STEP" and op.name == "fetch-user-1"
30+
]
31+
assert len(user1_step_ops) == 1
32+
user1_step = user1_step_ops[0]
33+
34+
user2_step_ops = [
35+
op
36+
for op in result.operations
37+
if op.operation_type.value == "STEP" and op.name == "fetch-user-2"
38+
]
39+
assert len(user2_step_ops) == 1
40+
user2_step = user2_step_ops[0]
41+
42+
# Verify first-time execution tracking (no replay)
43+
assert user1_step.operation_type.value == "STEP"
44+
assert user1_step.status.value == "SUCCEEDED"
45+
assert deserialize_operation_payload(user1_step.result) == "user-1"
46+
47+
assert user2_step.operation_type.value == "STEP"
48+
assert user2_step.status.value == "SUCCEEDED"
49+
assert deserialize_operation_payload(user2_step.result) == "user-2"
50+
51+
# Verify both operations tracked
52+
assert len(result.operations) == 2
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Tests for run_in_child_context_failing_step."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.run_in_child_context import run_in_child_context_step_failure
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
11+
@pytest.mark.durable_execution(
12+
handler=run_in_child_context_step_failure.handler,
13+
lambda_function_name="Run In Child Context With Failing Step",
14+
)
15+
def test_succeed_despite_failing_step_in_child_context(durable_runner):
16+
"""Test that execution succeeds despite failing step in child context."""
17+
with durable_runner:
18+
result = durable_runner.run(input=None, timeout=30)
19+
20+
assert result.status is InvocationStatus.SUCCEEDED
21+
22+
result_data = deserialize_operation_payload(result.result)
23+
assert result_data == {"success": True, "error": "Step failed in child context"}

0 commit comments

Comments
 (0)