Skip to content

Commit e3b1032

Browse files
author
Alex Wang
committed
fix: add replay_children in checkpoint processor
- Add replay_children in checkpoint processor - Add large scale map test case
1 parent 7f95a5c commit e3b1032

File tree

6 files changed

+137
-7
lines changed

6 files changed

+137
-7
lines changed

examples/examples-catalog.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,17 @@
133133
},
134134
"path": "./src/map/map_operations.py"
135135
},
136+
{
137+
"name": "Map Large Scale",
138+
"description": "Processing collections using map-like durable operations at large scale",
139+
"handler": "map_with_large_scale.handler",
140+
"integration": true,
141+
"durableConfig": {
142+
"RetentionPeriodInDays": 7,
143+
"ExecutionTimeout": 300
144+
},
145+
"path": "./src/map/map_with_large_scale.py"
146+
},
136147
{
137148
"name": "Block Example",
138149
"description": "Nested child contexts demonstrating block operations",
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""Test map with 50 iterations, each returning 100KB data."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import MapConfig
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
from aws_durable_execution_sdk_python.config import Duration
9+
10+
11+
def generate_large_string(size_in_kb: int) -> str:
    """Build a filler string of roughly *size_in_kb* kilobytes.

    The payload is simply the character "A" repeated ``size_in_kb * 1024``
    times — content is irrelevant; only the size matters for the test.
    """
    byte_count = size_in_kb * 1024
    return "A" * byte_count
14+
15+
16+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Handler demonstrating a large-scale map over substantial per-item data.

    Maps over 50 items, each step returning a ~100KB payload, then returns a
    summary of the total data volume processed.
    """
    # Create array of 50 items (more manageable for testing)
    items = list(range(1, 51))  # 1 to 50

    # Single source of truth for concurrency — used both by MapConfig and in
    # the reported summary, so the two can never drift apart.
    max_concurrency = 10
    config = MapConfig(max_concurrency=max_concurrency)
    data = generate_large_string(100)  # ~100KB payload shared by every step
    results = context.map(
        inputs=items,
        func=lambda ctx, item, index, _: ctx.step(
            lambda _: {
                "itemId": item,
                "index": index,
                "dataSize": len(data),
                "data": data,
                "processed": True,
            }
        ),
        name="large-scale-map",
        config=config,
    )

    context.wait(Duration.from_seconds(1), name="wait1")

    # Process results immediately after map operation
    # Note: After wait operations, the BatchResult may be summarized
    final_results = results.get_results()
    total_data_size = sum(result["dataSize"] for result in final_results)
    all_items_processed = all(result["processed"] for result in final_results)

    total_size_in_mb = round(total_data_size / (1024 * 1024))

    summary = {
        "itemsProcessed": results.success_count,
        "totalDataSizeMB": total_size_in_mb,
        "totalDataSizeBytes": total_data_size,
        "maxConcurrency": max_concurrency,
        "averageItemSize": round(total_data_size / results.success_count),
        "allItemsProcessed": all_items_processed,
    }

    context.wait(Duration.from_seconds(1), name="wait2")

    return {
        "success": True,
        "message": "Successfully processed 50 items with substantial data using map",
        "summary": summary,
    }

examples/src/run_in_child_context/run_in_child_context_large_data.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,7 @@
1212

1313
def generate_large_string(size_in_kb: int) -> str:
1414
"""Generate a string of approximately the specified size in KB."""
15-
target_size = size_in_kb * 1024 # Convert KB to bytes
16-
base_string = "A" * 1000 # 1KB string
17-
repetitions = target_size // 1000
18-
remainder = target_size % 1000
19-
20-
return base_string * repetitions + "A" * remainder
15+
return "A" * 1024 * size_in_kb
2116

2217

2318
@durable_with_child_context
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Tests for map_large_scale."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.map import map_with_large_scale
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=map_with_large_scale.handler,
    lambda_function_name="map large scale",
)
def test_handle_50_items_with_100kb_each_using_map(durable_runner):
    """Test handling 50 items with 100KB each using map."""
    # (removed a stray dead `pass` statement that preceded the body)
    with durable_runner:
        result = durable_runner.run(input=None, timeout=60)

    result_data = deserialize_operation_payload(result.result)

    # Verify the execution succeeded
    assert result.status is InvocationStatus.SUCCEEDED
    assert result_data["success"] is True

    # Verify the expected number of items were processed (50 items)
    assert result_data["summary"]["itemsProcessed"] == 50
    assert result_data["summary"]["allItemsProcessed"] is True

    # Verify data size expectations (~5MB total from 50 items × 100KB each)
    assert result_data["summary"]["totalDataSizeMB"] > 4  # Should be ~5MB
    assert result_data["summary"]["totalDataSizeMB"] < 6
    assert result_data["summary"]["totalDataSizeBytes"] > 5000000  # ~5MB
    assert result_data["summary"]["averageItemSize"] > 100000  # ~100KB per item
    assert result_data["summary"]["maxConcurrency"] == 10

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ def _create_execution_details(
7777
def _create_context_details(self, update: OperationUpdate) -> ContextDetails | None:
    """Create ContextDetails from OperationUpdate.

    Returns None for any update that is not a CONTEXT operation.
    """
    if update.operation_type != OperationType.CONTEXT:
        return None
    # Fall back to False when no context options were supplied on the update.
    options = update.context_options
    replay = options.replay_children if options else False
    return ContextDetails(
        result=update.payload,
        error=update.error,
        replay_children=replay,
    )

tests/checkpoint/processors/base_test.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
StepDetails,
2121
WaitDetails,
2222
WaitOptions,
23+
ContextOptions,
2324
)
2425

2526
from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import (
@@ -198,6 +199,23 @@ def test_create_step_details():
198199
assert result.error == error
199200

200201

202+
def test_create_context_details_with_replay_children():
    """A CONTEXT update carrying replay_children=True propagates it to ContextDetails."""
    processor = MockProcessor()
    update = OperationUpdate(
        operation_id="test-id",
        operation_type=OperationType.CONTEXT,
        action=OperationAction.SUCCEED,
        payload="test-payload",
        context_options=ContextOptions(replay_children=True),
    )

    result = processor.create_context_details(update)

    assert isinstance(result, ContextDetails)
    assert result.result == "test-payload"
    # `is True`, not `== True` — PEP 8 / E712, matching the file's other asserts.
    assert result.replay_children is True
217+
218+
201219
def test_create_step_details_non_step_type():
202220
processor = MockProcessor()
203221
update = OperationUpdate(

0 commit comments

Comments
 (0)