diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json
index ae9a8de..58d84ac 100644
--- a/examples/examples-catalog.json
+++ b/examples/examples-catalog.json
@@ -346,6 +346,17 @@
     },
     "path": "./src/map/map_with_failure_tolerance.py"
   },
+  {
+    "name": "Map Completion Config",
+    "description": "Reproduces an issue where a map with minSuccessful loses the failure count",
+    "handler": "map_completion.handler",
+    "integration": true,
+    "durableConfig": {
+      "RetentionPeriodInDays": 7,
+      "ExecutionTimeout": 300
+    },
+    "path": "./src/map/map_completion.py"
+  },
   {
     "name": "Parallel with Max Concurrency",
     "description": "Parallel operation with maxConcurrency limit",
diff --git a/examples/src/map/map_completion.py b/examples/src/map/map_completion.py
new file mode 100644
index 0000000..02db238
--- /dev/null
+++ b/examples/src/map/map_completion.py
@@ -0,0 +1,117 @@
+"""Reproduces an issue where a map with minSuccessful loses the failure count."""
+
+from typing import Any
+
+from aws_durable_execution_sdk_python.config import (
+    CompletionConfig,
+    Duration,
+    MapConfig,
+    StepConfig,
+)
+from aws_durable_execution_sdk_python.context import DurableContext
+from aws_durable_execution_sdk_python.execution import durable_execution
+from aws_durable_execution_sdk_python.retries import (
+    RetryStrategyConfig,
+    create_retry_strategy,
+)
+
+
+@durable_execution
+def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
+    """Handler demonstrating the map completion-config issue."""
+    # Test data: items 2 and 4 will fail (40% failure rate)
+    items = [
+        {"id": 1, "shouldFail": False},
+        {"id": 2, "shouldFail": True},  # Will fail
+        {"id": 3, "shouldFail": False},
+        {"id": 4, "shouldFail": True},  # Will fail
+        {"id": 5, "shouldFail": False},
+    ]
+
+    # Fixed completion config that triggers the issue
+    completion_config = CompletionConfig(
+        min_successful=2,
+        tolerated_failure_percentage=50,
+    )
+
+    context.logger.info(
+        "Starting map with config: min_successful=2, tolerated_failure_percentage=50"
+    )
+    context.logger.info(
+        f"Items pattern: {', '.join(['FAIL' if i['shouldFail'] else 'SUCCESS' for i in items])}"
+    )
+
+    def process_item(
+        ctx: DurableContext, item: dict[str, Any], index: int, _
+    ) -> dict[str, Any]:
+        """Process each item in the map."""
+        context.logger.info(
+            f"Processing item {item['id']} (index {index}), shouldFail: {item['shouldFail']}"
+        )
+
+        retry_config = RetryStrategyConfig(
+            max_attempts=2,
+            initial_delay=Duration.from_seconds(1),
+            max_delay=Duration.from_seconds(1),
+        )
+        step_config = StepConfig(retry_strategy=create_retry_strategy(retry_config))
+
+        def step_function(_: DurableContext) -> dict[str, Any]:
+            """Step that processes or fails based on the item."""
+            if item["shouldFail"]:
+                raise RuntimeError(f"Processing failed for item {item['id']}")
+            return {
+                "itemId": item["id"],
+                "processed": True,
+                "result": f"Item {item['id']} processed successfully",
+            }
+
+        return ctx.step(
+            step_function,
+            name=f"process-item-{index}",
+            config=step_config,
+        )
+
+    config = MapConfig(
+        max_concurrency=3,
+        completion_config=completion_config,
+    )
+
+    results = context.map(
+        inputs=items,
+        func=process_item,
+        name="completion-config-items",
+        config=config,
+    )
+
+    context.logger.info("Map completed with results:")
+    context.logger.info(f"Total items processed: {results.total_count}")
+    context.logger.info(f"Successful items: {results.success_count}")
+    context.logger.info(f"Failed items: {results.failure_count}")
+    context.logger.info(f"Has failures: {results.has_failure}")
+    context.logger.info(f"Batch status: {results.status}")
+    context.logger.info(f"Completion reason: {results.completion_reason}")
+
+    return {
+        "totalItems": results.total_count,
+        "successfulCount": results.success_count,
+        "failedCount": results.failure_count,
+        "hasFailures": results.has_failure,
+        "batchStatus": str(results.status),
+        "completionReason": str(results.completion_reason),
+        "successfulItems": [
+            {
+                "index": item.index,
+                "itemId": items[item.index]["id"],
+            }
+            for item in results.succeeded()
+        ],
+        "failedItems": [
+            {
+                "index": item.index,
+                "itemId": items[item.index]["id"],
+                "error": str(item.error),
+            }
+            for item in results.failed()
+        ],
+    }
diff --git a/examples/template.yaml b/examples/template.yaml
index 67d4c29..b8d4b6a 100644
--- a/examples/template.yaml
+++ b/examples/template.yaml
@@ -440,6 +440,19 @@ Resources:
       DurableConfig:
         RetentionPeriodInDays: 7
         ExecutionTimeout: 300
+  MapCompletion:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: build/
+      Handler: map_completion.handler
+      Description: Reproduces an issue where a map with minSuccessful loses the failure count
+      Role:
+        Fn::GetAtt:
+          - DurableFunctionRole
+          - Arn
+      DurableConfig:
+        RetentionPeriodInDays: 7
+        ExecutionTimeout: 300
   ParallelWithMaxConcurrency:
     Type: AWS::Serverless::Function
     Properties:
diff --git a/examples/test/map/test_map_completion.py b/examples/test/map/test_map_completion.py
new file mode 100644
index 0000000..5e672e3
--- /dev/null
+++ b/examples/test/map/test_map_completion.py
@@ -0,0 +1,40 @@
+"""Tests for map_completion."""
+
+import pytest
+
+from src.map import map_completion
+from test.conftest import deserialize_operation_payload
+from aws_durable_execution_sdk_python.execution import InvocationStatus
+
+
+@pytest.mark.example
+@pytest.mark.durable_execution(
+    handler=map_completion.handler,
+    lambda_function_name="Map Completion Config",
+)
+def test_reproduce_completion_config_behavior_with_detailed_logging(durable_runner):
+    """Demonstrates map behavior with minSuccessful and concurrent execution."""
+    with durable_runner:
+        result = durable_runner.run(input=None, timeout=60)
+
+    assert result.status is InvocationStatus.SUCCEEDED
+
+    result_data = deserialize_operation_payload(result.result)
+
+    # 4 or 5 items are typically processed despite min_successful=2, which is expected given the
+    # concurrent executor. Once the completion requirement is met (2 items have succeeded), a
+    # completion event is set and the main thread proceeds to cancel the remaining futures.
+    # Threads that are already running cannot be stopped immediately, so there is a window between
+    # the completion_event being set and all futures actually stopping, during which in-flight
+    # items keep processing and incrementing the counters. With max_concurrency=3 and 5 items,
+    # 4 or 5 items usually complete before cancellation takes effect, so the test asserts >= 4.
+    #
+    # failure_count is 0 because the failing items have retry strategies configured and are still
+    # retrying when execution completes; failures aren't finalized until their retries are exhausted.
+ + assert result_data["totalItems"] >= 4 + assert result_data["successfulCount"] >= 2 + assert result_data["failedCount"] == 0 + assert result_data["hasFailures"] is False + assert result_data["batchStatus"] == "BatchItemStatus.SUCCEEDED" + assert result_data["completionReason"] == "CompletionReason.MIN_SUCCESSFUL_REACHED"
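
Note on the cancellation race described in test_map_completion.py: the window between a completion
event being set and in-flight workers actually stopping can be reproduced with the standard library
alone. The sketch below is illustrative only, assuming plain concurrent.futures semantics; the
names (process, MIN_SUCCESSFUL, counts) are hypothetical and this is not the SDK's internal
executor.

import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait

MIN_SUCCESSFUL = 2
done = threading.Event()
lock = threading.Lock()
counts = {"success": 0}

def process(_item: int) -> None:
    time.sleep(0.05)  # simulate work; the worker is mid-task when `done` is set
    with lock:
        counts["success"] += 1
        if counts["success"] >= MIN_SUCCESSFUL:
            done.set()  # completion requirement met

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(process, i) for i in range(5)]
    done.wait()       # main thread resumes as soon as MIN_SUCCESSFUL items succeed
    for f in futures:
        f.cancel()    # cancels only futures that have not started running
    wait(futures)

print(counts["success"])  # typically 4 or 5, not 2: running workers can't be stopped

Run repeatedly, this prints 4 or 5 far more often than 2, mirroring the totalItems >= 4 assertion
in the test above.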