Skip to content

Commit 74a4bf2

Browse files
vip-amznAstraea Quinn S
authored and committed
test: add concurrency integ tests
1 parent 0153bc1 commit 74a4bf2

26 files changed

+1277
-43
lines changed

examples/examples-catalog.json

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,116 @@
187187
"ExecutionTimeout": 300
188188
},
189189
"path": "./src/simple_execution/simple_execution.py"
190+
},
191+
{
192+
"name": "Map with Max Concurrency",
193+
"description": "Map operation with maxConcurrency limit",
194+
"handler": "map_with_max_concurrency.handler",
195+
"integration": true,
196+
"durableConfig": {
197+
"RetentionPeriodInDays": 7,
198+
"ExecutionTimeout": 300
199+
},
200+
"path": "./src/map/map_with_max_concurrency.py"
201+
},
202+
{
203+
"name": "Map with Min Successful",
204+
"description": "Map operation with min_successful completion config",
205+
"handler": "map_with_min_successful.handler",
206+
"integration": true,
207+
"durableConfig": {
208+
"RetentionPeriodInDays": 7,
209+
"ExecutionTimeout": 300
210+
},
211+
"path": "./src/map/map_with_min_successful.py"
212+
},
213+
{
214+
"name": "Map with Failure Tolerance",
215+
"description": "Map operation with failure tolerance",
216+
"handler": "map_with_failure_tolerance.handler",
217+
"integration": true,
218+
"durableConfig": {
219+
"RetentionPeriodInDays": 7,
220+
"ExecutionTimeout": 300
221+
},
222+
"path": "./src/map/map_with_failure_tolerance.py"
223+
},
224+
{
225+
"name": "Parallel with Max Concurrency",
226+
"description": "Parallel operation with maxConcurrency limit",
227+
"handler": "parallel_with_max_concurrency.handler",
228+
"integration": true,
229+
"durableConfig": {
230+
"RetentionPeriodInDays": 7,
231+
"ExecutionTimeout": 300
232+
},
233+
"path": "./src/parallel/parallel_with_max_concurrency.py"
234+
},
235+
{
236+
"name": "Parallel with Wait",
237+
"description": "Parallel operation with wait operations in branches",
238+
"handler": "parallel_with_wait.handler",
239+
"integration": true,
240+
"durableConfig": {
241+
"RetentionPeriodInDays": 7,
242+
"ExecutionTimeout": 300
243+
},
244+
"path": "./src/parallel/parallel_with_wait.py"
245+
},
246+
{
247+
"name": "Parallel with Failure Tolerance",
248+
"description": "Parallel operation with failure tolerance",
249+
"handler": "parallel_with_failure_tolerance.handler",
250+
"integration": true,
251+
"durableConfig": {
252+
"RetentionPeriodInDays": 7,
253+
"ExecutionTimeout": 300
254+
},
255+
"path": "./src/parallel/parallel_with_failure_tolerance.py"
256+
},
257+
{
258+
"name": "Map with Custom SerDes",
259+
"description": "Map operation with custom item-level serialization",
260+
"handler": "map_with_custom_serdes.handler",
261+
"integration": true,
262+
"durableConfig": {
263+
"RetentionPeriodInDays": 7,
264+
"ExecutionTimeout": 300
265+
},
266+
"path": "./src/map/map_with_custom_serdes.py"
267+
},
268+
{
269+
"name": "Map with Batch SerDes",
270+
"description": "Map operation with custom batch-level serialization",
271+
"handler": "map_with_batch_serdes.handler",
272+
"integration": true,
273+
"durableConfig": {
274+
"RetentionPeriodInDays": 7,
275+
"ExecutionTimeout": 300
276+
},
277+
"path": "./src/map/map_with_batch_serdes.py"
278+
},
279+
{
280+
"name": "Parallel with Custom SerDes",
281+
"description": "Parallel operation with custom item-level serialization",
282+
"handler": "parallel_with_custom_serdes.handler",
283+
"integration": true,
284+
"durableConfig": {
285+
"RetentionPeriodInDays": 7,
286+
"ExecutionTimeout": 300
287+
},
288+
"path": "./src/parallel/parallel_with_custom_serdes.py"
289+
},
290+
{
291+
"name": "Parallel with Batch SerDes",
292+
"description": "Parallel operation with custom batch-level serialization",
293+
"handler": "parallel_with_batch_serdes.handler",
294+
"integration": true,
295+
"durableConfig": {
296+
"RetentionPeriodInDays": 7,
297+
"ExecutionTimeout": 300
298+
},
299+
"path": "./src/parallel/parallel_with_batch_serdes.py"
190300
}
191301
]
192302
}

examples/src/map/map_operations.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1-
"""Example demonstrating map-like operations for processing collections durably."""
1+
"""Example demonstrating map operations for processing collections durably."""
22

33
from typing import Any
44

5+
from aws_durable_execution_sdk_python.config import MapConfig
56
from aws_durable_execution_sdk_python.context import DurableContext
67
from aws_durable_execution_sdk_python.execution import durable_execution
78

89

9-
def square(x: int) -> int:
10-
return x * x
11-
12-
1310
@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
    """Process a list of items using context.map()."""
    items = [1, 2, 3, 4, 5]

    # Each item is doubled inside its own durable step; at most two items
    # run concurrently, and the per-item results are unwrapped right away.
    def double_item(ctx: DurableContext, item: int, index: int, _: Any):
        return ctx.step(lambda _: item * 2, name=f"map_item_{index}")

    batch = context.map(
        inputs=items,
        func=double_item,
        name="map_operation",
        config=MapConfig(max_concurrency=2),
    )
    return batch.get_results()
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""Example demonstrating map with batch-level serdes."""
2+
3+
import json
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python.concurrency.models import (
7+
BatchItem,
8+
BatchItemStatus,
9+
BatchResult,
10+
CompletionReason,
11+
)
12+
from aws_durable_execution_sdk_python.config import MapConfig
13+
from aws_durable_execution_sdk_python.context import DurableContext
14+
from aws_durable_execution_sdk_python.execution import durable_execution
15+
from aws_durable_execution_sdk_python.lambda_service import ErrorObject
16+
from aws_durable_execution_sdk_python.serdes import JsonSerDes, SerDes, SerDesContext
17+
18+
19+
class CustomBatchSerDes(SerDes[BatchResult]):
    """Custom serializer for the entire BatchResult.

    Wraps results and errors in a JSON envelope with custom metadata so
    ``deserialize`` can rebuild an equivalent BatchResult from the payload.
    """

    def serialize(self, value: BatchResult, _: SerDesContext) -> str:
        """Serialize *value* into a JSON envelope carrying batch metadata."""
        wrapped = {
            "batch_metadata": {
                "serializer": "CustomBatchSerDes",
                "version": "2.0",
                "total_items": len(value.get_results()),
            },
            "success_count": value.success_count,
            "failure_count": value.failure_count,
            "results": value.get_results(),
            "errors": [e.to_dict() if e else None for e in value.get_errors()],
        }
        return json.dumps(wrapped)

    def deserialize(self, payload: str, _: SerDesContext) -> BatchResult:
        """Rebuild a BatchResult from the envelope produced by ``serialize``.

        Items with a recorded error become FAILED batch items; the rest are
        SUCCEEDED with their stored result.
        """
        wrapped = json.loads(payload)
        results = wrapped["results"]
        errors = wrapped["errors"]

        batch_items = []
        for i, result in enumerate(results):
            # Guard against a short errors list; a missing entry means success.
            error = errors[i] if i < len(errors) else None
            if error:
                # Fix: the original re-checked `if error else None` here even
                # though this branch already guarantees `error` is truthy.
                batch_items.append(
                    BatchItem(
                        index=i,
                        status=BatchItemStatus.FAILED,
                        result=None,
                        error=ErrorObject.from_dict(error),
                    )
                )
            else:
                batch_items.append(
                    BatchItem(
                        index=i,
                        status=BatchItemStatus.SUCCEEDED,
                        result=result,
                        error=None,
                    )
                )

        # Infer completion reason (assume ALL_COMPLETED if all succeeded);
        # the envelope does not record the original reason, so this is a
        # best-effort reconstruction.
        completion_reason = (
            CompletionReason.ALL_COMPLETED
            if wrapped["failure_count"] == 0
            else CompletionReason.FAILURE_TOLERANCE_EXCEEDED
        )

        return BatchResult(all=batch_items, completion_reason=completion_reason)
73+
74+
75+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Process items with custom batch-level serialization."""
    items = [10, 20, 30, 40]

    def double(ctx: DurableContext, item: int, index: int, _: Any):
        return ctx.step(lambda _: item * 2, name=f"double_{index}")

    # The whole BatchResult goes through the custom serdes; each individual
    # item result uses the default JSON serdes.
    results = context.map(
        inputs=items,
        func=double,
        name="map_with_batch_serdes",
        config=MapConfig(serdes=CustomBatchSerDes(), item_serdes=JsonSerDes()),
    )

    doubled = results.get_results()
    return {
        "success_count": results.success_count,
        "results": doubled,
        "sum": sum(doubled),
    }
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Example demonstrating map with custom serdes."""
2+
3+
import json
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python.config import MapConfig
7+
from aws_durable_execution_sdk_python.context import DurableContext
8+
from aws_durable_execution_sdk_python.execution import durable_execution
9+
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext
10+
11+
12+
class CustomItemSerDes(SerDes[dict[str, Any]]):
    """Custom serializer for individual items that adds metadata."""

    def serialize(self, value: dict[str, Any], _: SerDesContext) -> str:
        # Wrap the payload in an envelope that records who serialized it.
        envelope = {
            "data": value,
            "serialized_by": "CustomItemSerDes",
            "version": "1.0",
        }
        return json.dumps(envelope)

    def deserialize(self, payload: str, _: SerDesContext) -> dict[str, Any]:
        # Unwrap the envelope and hand back only the original payload.
        return json.loads(payload)["data"]
25+
26+
27+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Process items with custom item serialization.

    This example demonstrates using item_serdes to customize serialization
    of individual item results, while using default serialization for the
    overall BatchResult.
    """
    items = [
        {"id": 1, "name": "item1"},
        {"id": 2, "name": "item2"},
        {"id": 3, "name": "item3"},
    ]

    def process(ctx: DurableContext, item: dict[str, Any], index: int, _: Any):
        return ctx.step(
            lambda _: {
                "processed": item["name"],
                "index": index,
                "doubled_id": item["id"] * 2,
            },
            name=f"process_{index}",
        )

    # Only individual item results use the custom serdes; the BatchResult
    # itself falls back to the default JSON serialization.
    results = context.map(
        inputs=items,
        func=process,
        name="map_with_custom_serdes",
        config=MapConfig(item_serdes=CustomItemSerDes()),
    )

    processed = results.get_results()
    return {
        "success_count": results.success_count,
        "results": processed,
        "processed_names": [entry["processed"] for entry in processed],
    }
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Example demonstrating map with failure tolerance."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import (
6+
CompletionConfig,
7+
MapConfig,
8+
StepConfig,
9+
)
10+
from aws_durable_execution_sdk_python.context import DurableContext
11+
from aws_durable_execution_sdk_python.execution import durable_execution
12+
from aws_durable_execution_sdk_python.retries import RetryStrategyConfig
13+
14+
15+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Process items with failure tolerance."""
    items = list(range(1, 11))  # [1, 2, 3, ..., 10]

    # Up to 3 item failures are tolerated before the map gives up.
    map_config = MapConfig(
        max_concurrency=5,
        completion_config=CompletionConfig(tolerated_failure_count=3),
    )
    # A single attempt per step means a failing item fails immediately.
    single_attempt = StepConfig(retry_strategy=RetryStrategyConfig(max_attempts=1))

    results = context.map(
        inputs=items,
        func=lambda ctx, item, index, _: ctx.step(
            lambda _: _process_with_failures(item),
            name=f"item_{index}",
            config=single_attempt,
        ),
        name="map_with_tolerance",
        config=map_config,
    )

    return {
        "success_count": results.success_count,
        "failure_count": results.failure_count,
        "succeeded": [entry.result for entry in results.succeeded()],
        "failed_count": len(results.failed()),
        "completion_reason": results.completion_reason.value,
    }
47+
48+
49+
def _process_with_failures(item: int) -> int:
50+
"""Process item - fails for items 3, 6, 9."""
51+
if item % 3 == 0:
52+
raise ValueError(f"Item {item} failed")
53+
return item * 2
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Example demonstrating map with maxConcurrency limit."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import MapConfig
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
9+
10+
@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
    """Process items with concurrency limit of 3."""
    items = list(range(1, 11))  # [1, 2, 3, ..., 10]

    batch = context.map(
        inputs=items,
        func=lambda ctx, item, index, _: ctx.step(
            lambda _: item * 3, name=f"process_{index}"
        ),
        name="map_with_concurrency",
        config=MapConfig(max_concurrency=3),
    )
    # Unwrap to a plain list so no BatchResult needs to be serialized.
    return batch.get_results()

0 commit comments

Comments
 (0)