Commit 8ec79c6 (1 parent: beda345)
Author: Alex Wang

    examples: add large scale map example

4 files changed: +111 −7 lines

examples/examples-catalog.json

Lines changed: 11 additions & 0 deletions

@@ -122,6 +122,17 @@
       },
       "path": "./src/map/map_operations.py"
     },
+    {
+      "name": "Map Large Scale",
+      "description": "Processing collections using map-like durable operations at large scale",
+      "handler": "map_with_large_scale.handler",
+      "integration": true,
+      "durableConfig": {
+        "RetentionPeriodInDays": 7,
+        "ExecutionTimeout": 300
+      },
+      "path": "./src/map/map_with_large_scale.py"
+    },
     {
       "name": "Block Example",
       "description": "Nested child contexts demonstrating block operations",

examples/src/map/map_with_large_scale.py

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+"""Test map with 50 iterations, each returning 100KB data."""
+
+from typing import Any
+
+from aws_durable_execution_sdk_python.config import MapConfig
+from aws_durable_execution_sdk_python.context import DurableContext
+from aws_durable_execution_sdk_python.execution import durable_execution
+from aws_durable_execution_sdk_python.config import Duration
+
+
+def generate_large_string(size_in_kb: int) -> str:
+    """Generate a string of approximately the specified size in KB."""
+    return "A" * 1024 * size_in_kb
+
+@durable_execution
+def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
+    """Handler demonstrating large scale map with substantial data."""
+    # Create array of 50 items (more manageable for testing)
+    items = list(range(1, 51))  # 1 to 50
+
+    config = MapConfig(max_concurrency=10)  # Process 10 items concurrently
+    data = generate_large_string(100)
+    results = context.map(
+        inputs=items,
+        func=lambda ctx, item, index, _: ctx.step(
+            lambda _: {
+                "itemId": item,
+                "index": index,
+                "dataSize": len(data),
+                "data": data,
+                "processed": True,
+            }
+        ),
+        name="large-scale-map",
+        config=config,
+    )
+
+    context.wait(Duration.from_seconds(1), name="wait1")
+
+    # Process results immediately after map operation
+    # Note: After wait operations, the BatchResult may be summarized
+    final_results = results.get_results()
+    total_data_size = sum(result["dataSize"] for result in final_results)
+    all_items_processed = all(result["processed"] for result in final_results)
+
+    total_size_in_mb = round(total_data_size / (1024 * 1024))
+
+    summary = {
+        "itemsProcessed": results.success_count,
+        "totalDataSizeMB": total_size_in_mb,
+        "totalDataSizeBytes": total_data_size,
+        "maxConcurrency": 10,
+        "averageItemSize": round(total_data_size / results.success_count),
+        "allItemsProcessed": all_items_processed,
+    }
+
+    context.wait(Duration.from_seconds(1), name="wait2")
+
+    return {
+        "success": True,
+        "message": "Successfully processed 50 items with substantial data using map",
+        "summary": summary,
+    }
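
Because every one of the 50 items carries the same 100 KB payload, the summary fields are fully predictable. A back-of-the-envelope check in plain Python (no SDK involved), assuming all 50 steps succeed:

    item_bytes = 1024 * 100        # one "A" * 1024 * 100 payload: 102,400 bytes
    total_bytes = 50 * item_bytes  # 5,120,000 bytes across the whole map
    total_mb = round(total_bytes / (1024 * 1024))

    assert total_bytes == 5_120_000      # expected "totalDataSizeBytes"
    assert total_mb == 5                 # ~4.88 MiB rounds to 5, the "totalDataSizeMB" value
    assert total_bytes // 50 == 102_400  # expected "averageItemSize"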

examples/src/run_in_child_context/run_in_child_context_large_data.py

Lines changed: 1 addition & 7 deletions
@@ -12,13 +12,7 @@
 
 def generate_large_string(size_in_kb: int) -> str:
     """Generate a string of approximately the specified size in KB."""
-    target_size = size_in_kb * 1024  # Convert KB to bytes
-    base_string = "A" * 1000  # 1KB string
-    repetitions = target_size // 1000
-    remainder = target_size % 1000
-
-    return base_string * repetitions + "A" * remainder
-
+    return "A" * 1024 * size_in_kb
 
 @durable_with_child_context
 def large_data_processor(child_context: DurableContext) -> dict[str, Any]:
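
The refactor is behavior-preserving: since the filler is a single repeated character, the chunked construction and the one-liner produce byte-identical strings. A quick equivalence check in plain Python, with both implementations inlined from the diff:

    def old_impl(size_in_kb: int) -> str:
        # The removed implementation, reproduced for comparison.
        target_size = size_in_kb * 1024
        return "A" * 1000 * (target_size // 1000) + "A" * (target_size % 1000)

    def new_impl(size_in_kb: int) -> str:
        return "A" * 1024 * size_in_kb

    # Byte-identical for any non-negative size, including the 100 KB case used by the examples.
    assert all(old_impl(n) == new_impl(n) for n in (0, 1, 100, 1024))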

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+"""Tests for map_large_scale."""
+
+import pytest
+from aws_durable_execution_sdk_python.execution import InvocationStatus
+
+from src.map import map_with_large_scale
+from test.conftest import deserialize_operation_payload
+
+
+@pytest.mark.example
+@pytest.mark.durable_execution(
+    handler=map_with_large_scale.handler,
+    lambda_function_name="map large scale",
+)
+def test_handle_50_items_with_100kb_each_using_map(durable_runner):
+    """Test handling 50 items with 100KB each using map."""
+    with durable_runner:
+        result = durable_runner.run(input=None, timeout=60)
+
+    result_data = deserialize_operation_payload(result.result)
+
+    # Verify the execution succeeded
+    assert result.status is InvocationStatus.SUCCEEDED
+    assert result_data["success"] is True
+
+    # Verify the expected number of items were processed (50 items)
+    assert result_data["summary"]["itemsProcessed"] == 50
+    assert result_data["summary"]["allItemsProcessed"] is True
+
+    # Verify data size expectations (~5MB total from 50 items × 100KB each)
+    assert result_data["summary"]["totalDataSizeMB"] > 4  # Should be ~5MB
+    assert result_data["summary"]["totalDataSizeMB"] < 6
+    assert result_data["summary"]["totalDataSizeBytes"] > 5000000  # ~5MB
+    assert result_data["summary"]["averageItemSize"] > 100000  # ~100KB per item
+    assert result_data["summary"]["maxConcurrency"] == 10
