Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@
},
"path": "./src/map/map_with_failure_tolerance.py"
},
{
"name": "Map Completion Config",
"description": "Reproduces issue where map with minSuccessful loses failure count",
"handler": "map_completion.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_completion.py"
},
{
"name": "Parallel with Max Concurrency",
"description": "Parallel operation with maxConcurrency limit",
Expand Down
117 changes: 117 additions & 0 deletions examples/src/map/map_completion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Reproduces issue where map with minSuccessful loses failure count."""

from typing import Any

from aws_durable_execution_sdk_python.config import (
CompletionConfig,
MapConfig,
StepConfig,
Duration,
)
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Handler demonstrating map with completion config issue.

    Runs a durable map over five items (two of which deliberately fail) with
    min_successful=2 and tolerated_failure_percentage=50, then returns the
    batch counters so the failure-count behavior can be observed.
    """
    # Test data: Items 2 and 4 will fail (40% failure rate)
    items = [
        {"id": 1, "shouldFail": False},
        {"id": 2, "shouldFail": True},  # Will fail
        {"id": 3, "shouldFail": False},
        {"id": 4, "shouldFail": True},  # Will fail
        {"id": 5, "shouldFail": False},
    ]

    # Fixed completion config that causes the issue
    completion_config = CompletionConfig(
        min_successful=2,
        tolerated_failure_percentage=50,
    )

    # Plain string (not an f-string): there are no placeholders to interpolate.
    context.logger.info(
        "Starting map with config: min_successful=2, tolerated_failure_percentage=50"
    )
    context.logger.info(
        f"Items pattern: {', '.join(['FAIL' if i['shouldFail'] else 'SUCCESS' for i in items])}"
    )

    # The retry parameters are identical for every item, so build the config
    # once here instead of rebuilding it on each process_item call.
    retry_config = RetryStrategyConfig(
        max_attempts=2,
        initial_delay=Duration.from_seconds(1),
        max_delay=Duration.from_seconds(1),
    )

    def process_item(
        ctx: DurableContext, item: dict[str, Any], index: int, _
    ) -> dict[str, Any]:
        """Process one mapped item; the step fails when item['shouldFail'] is set."""
        # NOTE(review): logs via the outer `context` rather than the per-item
        # `ctx` — presumably intentional for a repro example; confirm which
        # logger the SDK expects inside map workers.
        context.logger.info(
            f"Processing item {item['id']} (index {index}), shouldFail: {item['shouldFail']}"
        )

        # A fresh strategy per item avoids sharing any retry state across items.
        step_config = StepConfig(retry_strategy=create_retry_strategy(retry_config))

        def step_function(_: DurableContext) -> dict[str, Any]:
            """Step that processes or fails based on item."""
            if item["shouldFail"]:
                # Deliberate generic failure so the map records a failed item.
                raise Exception(f"Processing failed for item {item['id']}")
            return {
                "itemId": item["id"],
                "processed": True,
                "result": f"Item {item['id']} processed successfully",
            }

        return ctx.step(
            step_function,
            name=f"process-item-{index}",
            config=step_config,
        )

    config = MapConfig(
        max_concurrency=3,
        completion_config=completion_config,
    )

    results = context.map(
        inputs=items,
        func=process_item,
        name="completion-config-items",
        config=config,
    )

    # Log the batch counters that the linked issue is about.
    context.logger.info("Map completed with results:")
    context.logger.info(f"Total items processed: {results.total_count}")
    context.logger.info(f"Successful items: {results.success_count}")
    context.logger.info(f"Failed items: {results.failure_count}")
    context.logger.info(f"Has failures: {results.has_failure}")
    context.logger.info(f"Batch status: {results.status}")
    context.logger.info(f"Completion reason: {results.completion_reason}")

    # Return batch counters plus per-item detail for succeeded/failed items,
    # mapping each result index back to the original item's id.
    return {
        "totalItems": results.total_count,
        "successfulCount": results.success_count,
        "failedCount": results.failure_count,
        "hasFailures": results.has_failure,
        "batchStatus": str(results.status),
        "completionReason": str(results.completion_reason),
        "successfulItems": [
            {
                "index": item.index,
                "itemId": items[item.index]["id"],
            }
            for item in results.succeeded()
        ],
        "failedItems": [
            {
                "index": item.index,
                "itemId": items[item.index]["id"],
                "error": str(item.error),
            }
            for item in results.failed()
        ],
    }
13 changes: 13 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,19 @@ Resources:
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
MapCompletion:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: map_completion.handler
Description: Reproduces issue where map with minSuccessful loses failure count
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
ParallelWithMaxConcurrency:
Type: AWS::Serverless::Function
Properties:
Expand Down
42 changes: 42 additions & 0 deletions examples/test/map/test_map_completion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Tests for map_completion."""

import json

import pytest

from src.map import map_completion
from test.conftest import deserialize_operation_payload
from aws_durable_execution_sdk_python.execution import InvocationStatus


@pytest.mark.example
@pytest.mark.durable_execution(
    handler=map_completion.handler,
    lambda_function_name="Map Completion Config",
)
def test_reproduce_completion_config_behavior_with_detailed_logging(durable_runner):
    """Demonstrates map behavior with minSuccessful and concurrent execution."""
    with durable_runner:
        outcome = durable_runner.run(input=None, timeout=60)

    assert outcome.status is InvocationStatus.SUCCEEDED

    payload = deserialize_operation_payload(outcome.result)

    # Why >= 4 items despite min_successful=2: the map runs on a concurrent
    # executor. Once two items succeed the completion event is set and the main
    # thread cancels the remaining futures, but worker threads already outside
    # the critical section keep running until they notice the event. With
    # max_concurrency=3 over 5 items, that gap lets 4 or 5 items finish and
    # increment the counters before cancellation takes effect.
    #
    # Why failedCount == 0: the deliberately failing items have retry
    # strategies and are still mid-retry when the batch completes. A failure
    # is only finalized after its retries are exhausted, so in-flight retries
    # never reach failure_count.
    assert payload["totalItems"] >= 4
    assert payload["successfulCount"] >= 2
    assert payload["failedCount"] == 0
    assert payload["hasFailures"] is False
    assert payload["batchStatus"] == "BatchItemStatus.SUCCEEDED"
    assert payload["completionReason"] == "CompletionReason.MIN_SUCCESSFUL_REACHED"
Loading