diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index 561323a..ae9a8de 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -100,6 +100,105 @@ }, "path": "./src/wait_for_callback/wait_for_callback.py" }, + { + "name": "Wait For Callback Success Anonymous", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_anonymous.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_anonymous.py" + }, + { + "name": "Wait For Callback Heartbeat Sends", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_heartbeat.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_heartbeat.py" + }, + { + "name": "Wait For Callback With Child Context", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_child.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_child.py" + }, + { + "name": "Wait For Callback Mixed Ops", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_mixed_ops.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_mixed_ops.py" + }, + { + "name": "Wait For Callback Multiple Invocations", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_multiple_invocations.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_multiple_invocations.py" + }, + { + "name": "Wait For Callback Failing Submitter Catchable", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_submitter_failure_catchable.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py" + }, + { + "name": "Wait For Callback Submitter Failure", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_submitter_failure.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_submitter_failure.py" + }, + { + "name": "Wait For Callback Serdes", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_serdes.py" + }, + { + "name": "Wait For Callback Nested", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback_nested.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback_nested.py" + }, { "name": "Run in Child Context", "description": "Usage of context.run_in_child_context() to execute operations in isolated contexts", diff --git a/examples/src/wait_for_callback/wait_for_callback_anonymous.py b/examples/src/wait_for_callback/wait_for_callback_anonymous.py new file mode 100644 index 0000000..9327ac7 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_anonymous.py @@ -0,0 +1,18 @@ +"""Demonstrates waitForCallback with anonymous (inline) submitter function.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with anonymous submitter.""" + result: str = context.wait_for_callback(lambda _: time.sleep(1)) + + return { + "callbackResult": result, + "completed": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_child.py b/examples/src/wait_for_callback/wait_for_callback_child.py new file mode 100644 index 0000000..2f50c67 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_child.py @@ -0,0 +1,42 @@ +"""Demonstrates waitForCallback operations within child contexts.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_with_child_context +def child_context_with_callback(child_context: DurableContext) -> dict[str, Any]: + """Child context containing wait and callback operations.""" + child_context.wait(Duration.from_seconds(1), name="child-wait") + + child_callback_result: str = child_context.wait_for_callback( + lambda _: None, name="child-callback-op" + ) + + return { + "childResult": child_callback_result, + "childProcessed": True, + } + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback within child contexts.""" + parent_result: str = context.wait_for_callback( + lambda _: None, name="parent-callback-op" + ) + + child_context_result: dict[str, Any] = context.run_in_child_context( + child_context_with_callback(), name="child-context-with-callback" + ) + + return { + "parentResult": parent_result, + "childContextResult": child_context_result, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_heartbeat.py b/examples/src/wait_for_callback/wait_for_callback_heartbeat.py new file mode 100644 index 0000000..ac4c398 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_heartbeat.py @@ -0,0 +1,31 @@ +"""Demonstrates sending heartbeats during long-running callback processing.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration +from aws_durable_execution_sdk_python.config import WaitForCallbackConfig + + +def submitter(_callback_id: str) -> None: + """Simulate long-running submitter function.""" + time.sleep(5) + return None + + +@durable_execution +def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with heartbeat timeout.""" + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(120), heartbeat_timeout=Duration.from_seconds(15) + ) + + result: str = context.wait_for_callback(submitter, config=config) + + return { + "callbackResult": result, + "completed": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_mixed_ops.py b/examples/src/wait_for_callback/wait_for_callback_mixed_ops.py new file mode 100644 index 0000000..107ec19 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_mixed_ops.py @@ -0,0 +1,47 @@ +"""Demonstrates waitForCallback combined with steps, waits, and other operations.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback mixed with other operations.""" + # Mix waitForCallback with other operation types + context.wait(Duration.from_seconds(1), name="initial-wait") + + step_result: dict[str, Any] = context.step( + lambda _: {"userId": 123, "name": "John Doe"}, + name="fetch-user-data", + ) + + def submitter(_) -> None: + """Submitter uses data from previous step.""" + time.sleep(0.1) + return None + + callback_result: str = context.wait_for_callback( + submitter, + name="wait-for-callback", + ) + + context.wait(Duration.from_seconds(2), name="final-wait") + + final_step: dict[str, Any] = context.step( + lambda _: { + "status": "completed", + "timestamp": int(time.time() * 1000), + }, + name="finalize-processing", + ) + + return { + "stepResult": step_result, + "callbackResult": callback_result, + "finalStep": final_step, + "workflowCompleted": True, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py b/examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py new file mode 100644 index 0000000..3793adc --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_multiple_invocations.py @@ -0,0 +1,53 @@ +"""Demonstrates multiple invocations tracking with waitForCallback operations across different invocations.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating multiple invocations with waitForCallback operations.""" + # First invocation - wait operation + context.wait(Duration.from_seconds(1), name="wait-invocation-1") + + # First callback operation + def first_submitter(callback_id: str) -> None: + """Submitter for first callback.""" + print(f"First callback submitted with ID: {callback_id}") + return None + + callback_result_1: str = context.wait_for_callback( + first_submitter, + name="first-callback", + ) + + # Step operation between callbacks + step_result: dict[str, Any] = context.step( + lambda _: {"processed": True, "step": 1}, + name="process-callback-data", + ) + + # Second invocation - another wait operation + context.wait(Duration.from_seconds(1), name="wait-invocation-2") + + # Second callback operation + def second_submitter(callback_id: str) -> None: + """Submitter for second callback.""" + print(f"Second callback submitted with ID: {callback_id}") + return None + + callback_result_2: str = context.wait_for_callback( + second_submitter, + name="second-callback", + ) + + # Final invocation returns complete result + return { + "firstCallback": callback_result_1, + "secondCallback": callback_result_2, + "stepResult": step_result, + "invocationCount": "multiple", + } diff --git a/examples/src/wait_for_callback/wait_for_callback_nested.py b/examples/src/wait_for_callback/wait_for_callback_nested.py new file mode 100644 index 0000000..f855ac3 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_nested.py @@ -0,0 +1,66 @@ +"""Demonstrates nested waitForCallback operations across multiple child context levels.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_with_child_context +def inner_child_context(inner_child_ctx: DurableContext) -> dict[str, Any]: + """Inner child context with deep nested callback.""" + inner_child_ctx.wait(Duration.from_seconds(5), name="deep-wait") + + nested_callback_result: str = inner_child_ctx.wait_for_callback( + lambda _: None, + name="nested-callback-op", + ) + + return { + "nestedCallback": nested_callback_result, + "deepLevel": "inner-child", + } + + +@durable_with_child_context +def outer_child_context(outer_child_ctx: DurableContext) -> dict[str, Any]: + """Outer child context with inner callback and nested context.""" + inner_result: str = outer_child_ctx.wait_for_callback( + lambda _: None, + name="inner-callback-op", + ) + + # Nested child context with another callback + deep_nested_result: dict[str, Any] = outer_child_ctx.run_in_child_context( + inner_child_context(), + name="inner-child-context", + ) + + return { + "innerCallback": inner_result, + "deepNested": deep_nested_result, + "level": "outer-child", + } + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating nested waitForCallback operations across multiple levels.""" + outer_result: str = context.wait_for_callback( + lambda _: None, + name="outer-callback-op", + ) + + nested_result: dict[str, Any] = context.run_in_child_context( + outer_child_context(), + name="outer-child-context", + ) + + return { + "outerCallback": outer_result, + "nestedResults": nested_result, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_serdes.py b/examples/src/wait_for_callback/wait_for_callback_serdes.py new file mode 100644 index 0000000..e266412 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_serdes.py @@ -0,0 +1,90 @@ +"""Demonstrates waitForCallback with custom serialization/deserialization.""" + +import json +from datetime import datetime +from typing import Any, Optional, TypedDict + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig +from aws_durable_execution_sdk_python.serdes import SerDes + + +class CustomDataMetadata(TypedDict): + """Metadata for CustomData.""" + + version: str + processed: bool + + +class CustomData(TypedDict): + """Custom data structure with datetime.""" + + id: int + message: str + timestamp: datetime + metadata: CustomDataMetadata + + +class CustomSerdes(SerDes[CustomData]): + """Custom serialization/deserialization for CustomData.""" + + @staticmethod + def serialize(data: CustomData, _=None) -> str: + """Serialize CustomData to JSON string.""" + if data is None: + return None + + serialized_data = { + "id": data["id"], + "message": data["message"], + "timestamp": data["timestamp"].isoformat(), + "metadata": data["metadata"], + "_serializedBy": "custom-serdes-v1", + } + return json.dumps(serialized_data) + + @staticmethod + def deserialize(data_str: str, _=None) -> CustomData: + """Deserialize JSON string to CustomData.""" + if data_str is None: + return None + + parsed = json.loads(data_str) + return CustomData( + id=parsed["id"], + message=parsed["message"], + timestamp=datetime.fromisoformat( + parsed["timestamp"].replace("Z", "+00:00") + ), + metadata=CustomDataMetadata( + version=parsed["metadata"]["version"], + processed=parsed["metadata"]["processed"], + ), + ) + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with custom serdes.""" + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(10), + heartbeat_timeout=Duration.from_seconds(20), + serdes=CustomSerdes(), + ) + + result: CustomData = context.wait_for_callback( + lambda _: None, + name="custom-serdes-callback", + config=config, + ) + + isDateObject = isinstance(result["timestamp"], datetime) + # convert timestamp to isoformat because lambda only accepts default json type as result + result["timestamp"] = result["timestamp"].isoformat() + + return { + "receivedData": result, + "isDateObject": isDateObject, + } diff --git a/examples/src/wait_for_callback/wait_for_callback_submitter_failure.py b/examples/src/wait_for_callback/wait_for_callback_submitter_failure.py new file mode 100644 index 0000000..780c7fa --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_submitter_failure.py @@ -0,0 +1,39 @@ +"""Demonstrates waitForCallback with submitter retry strategy using exponential backoff (0.5s, 1s, 2s).""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import ( + RetryStrategyConfig, + create_retry_strategy, +) +from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig + + +@durable_execution +def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with submitter retry and exponential backoff.""" + + def submitter(callback_id: str) -> None: + """Submitter function that can fail based on event parameter.""" + print(f"Submitting callback to external system - callbackId: {callback_id}") + raise Exception("Simulated submitter failure") + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(10), + heartbeat_timeout=Duration.from_seconds(20), + retry_strategy=create_retry_strategy( + config=RetryStrategyConfig( + max_attempts=3, + initial_delay=Duration.from_seconds(1), + max_delay=Duration.from_seconds(1), + ) + ), + ) + + result: str = context.wait_for_callback( + submitter, + name="retry-submitter-callback", + config=config, + ) diff --git a/examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py b/examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py new file mode 100644 index 0000000..ec24ae4 --- /dev/null +++ b/examples/src/wait_for_callback/wait_for_callback_submitter_failure_catchable.py @@ -0,0 +1,52 @@ +"""Demonstrates waitForCallback with submitter function that fails.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import ( + RetryStrategyConfig, + create_retry_strategy, +) +from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating waitForCallback with failing submitter.""" + + def submitter(_) -> None: + """Submitter function that fails after a delay.""" + time.sleep(0.5) + # Submitter fails + raise Exception("Submitter failed") + + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(10), + heartbeat_timeout=Duration.from_seconds(20), + retry_strategy=create_retry_strategy( + config=RetryStrategyConfig( + max_attempts=3, + initial_delay=Duration.from_seconds(1), + max_delay=Duration.from_seconds(1), + ) + ), + ) + + try: + result: str = context.wait_for_callback( + submitter, + name="failing-submitter-callback", + config=config, + ) + + return { + "callbackResult": result, + "success": True, + } + except Exception as error: + return { + "success": False, + "error": str(error), + } diff --git a/examples/template.yaml b/examples/template.yaml index 545481c..67d4c29 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -142,6 +142,132 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + WaitForCallbackAnonymous: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_anonymous.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackHeartbeat: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_heartbeat.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackChild: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_child.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackMixedOps: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_mixed_ops.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackMultipleInvocations: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_multiple_invocations.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackSubmitterFailureCatchable: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_submitter_failure_catchable.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackSubmitterFailure: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_submitter_failure.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackSerdes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_serdes.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCallbackNested: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_callback_nested.handler + Description: Usage of context.wait_for_callback() to wait for external system + responses + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 RunInChildContext: Type: AWS::Serverless::Function Properties: diff --git a/examples/test/wait_for_callback/test_wait_for_callback_anonymous.py b/examples/test/wait_for_callback/test_wait_for_callback_anonymous.py new file mode 100644 index 0000000..d047da2 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_anonymous.py @@ -0,0 +1,39 @@ +"""Tests for wait_for_callback_anonymous.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_anonymous +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_anonymous.handler, + lambda_function_name="Wait For Callback Success Anonymous", +) +def test_handle_basic_wait_for_callback_with_anonymous_submitter(durable_runner): + """Test basic waitForCallback with anonymous submitter.""" + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + callback_result = json.dumps({"data": "callback_completed"}) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "callbackResult": callback_result, + "completed": True, + } + + # Verify operations were tracked + assert len(result.operations) > 0 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_child.py b/examples/test/wait_for_callback/test_wait_for_callback_child.py new file mode 100644 index 0000000..3016a36 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_child.py @@ -0,0 +1,73 @@ +"""Tests for wait_for_callback_child_context.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_child +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_child.handler, + lambda_function_name="Wait For Callback With Child Context", +) +def test_handle_wait_for_callback_within_child_contexts(durable_runner): + """Test waitForCallback within child contexts.""" + test_payload = {"test": "child-context-callbacks"} + + with durable_runner: + execution_arn = durable_runner.run_async(input=test_payload, timeout=30) + # Wait for parent callback and get callback_id + parent_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn + ) + # Send parent callback result + parent_callback_result = json.dumps({"parentData": "parent-completed"}) + durable_runner.send_callback_success( + callback_id=parent_callback_id, result=parent_callback_result.encode() + ) + # Wait for child callback and get callback_id + child_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="child-callback-op create callback id" + ) + # Send child callback result + child_callback_result = json.dumps({"childData": 42}) + durable_runner.send_callback_success( + callback_id=child_callback_id, result=child_callback_result.encode() + ) + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + result_data = deserialize_operation_payload(result.result) + assert result_data == { + "parentResult": parent_callback_result, + "childContextResult": { + "childResult": child_callback_result, + "childProcessed": True, + }, + } + + # Find the child context operation + child_context_ops = [ + op + for op in result.operations + if op.operation_type.value == "CONTEXT" + and op.name == "child-context-with-callback" + ] + assert len(child_context_ops) == 1 + child_context_op = child_context_ops[0] + + # Verify child operations are accessible + child_operations = child_context_op.child_operations + assert child_operations is not None + assert len(child_operations) == 2 # wait + waitForCallback + + all_ops = result.get_all_operations() + + # Verify completed operations count + completed_operations = [op for op in all_ops if op.status.value == "SUCCEEDED"] + assert len(completed_operations) == 8 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py b/examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py new file mode 100644 index 0000000..bdbf627 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_heartbeat.py @@ -0,0 +1,62 @@ +"""Tests for wait_for_callback_heartbeat_sends.""" + +import json +import time + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_heartbeat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_heartbeat.handler, + lambda_function_name="Wait For Callback Heartbeat Sends", +) +def test_handle_wait_for_callback_heartbeat_scenarios_during_long_running_submitter( + durable_runner, +): + """Test waitForCallback heartbeat scenarios during long-running submitter execution.""" + + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async( + input={"input": "test_payload"}, timeout=60 + ) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Send heartbeat to keep the callback alive during processing + durable_runner.send_callback_heartbeat(callback_id=callback_id) + + # Wait a bit more to simulate callback processing time + wait_time = 7.0 + time.sleep(wait_time) + + # Send another heartbeat + durable_runner.send_callback_heartbeat(callback_id=callback_id) + + # Finally complete the callback + callback_result = json.dumps({"processed": 1000}) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data["callbackResult"] == callback_result + assert result_data["completed"] is True + + # Should have completed operations with successful callback + completed_operations = [ + op for op in result.operations if op.status.value == "SUCCEEDED" + ] + assert len(completed_operations) > 0 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py b/examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py new file mode 100644 index 0000000..4f4f982 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_mixed_ops.py @@ -0,0 +1,52 @@ +"""Tests for wait_for_callback_mixed_ops.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_mixed_ops +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_mixed_ops.handler, + lambda_function_name="Wait For Callback Mixed Ops", +) +def test_handle_wait_for_callback_mixed_with_steps_waits_and_other_operations( + durable_runner, +): + """Test waitForCallback mixed with steps, waits, and other operations.""" + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async(input=None, timeout=30) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Complete the callback + callback_result = json.dumps({"processed": True}) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify all expected fields + assert result_data["stepResult"] == {"userId": 123, "name": "John Doe"} + assert result_data["callbackResult"] == callback_result + assert result_data["finalStep"]["status"] == "completed" + assert isinstance(result_data["finalStep"]["timestamp"], int) + assert result_data["workflowCompleted"] is True + + # Verify all operations were tracked - should have wait, step, waitForCallback (context + callback + submitter), wait, step + completed_operations = [ + op for op in result.get_all_operations() if op.status.value == "SUCCEEDED" + ] + assert len(completed_operations) == 7 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py b/examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py new file mode 100644 index 0000000..8c297dc --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_multiple_invocations.py @@ -0,0 +1,74 @@ +"""Tests for wait_for_callback_multiple_invocations.""" + +import json +import time + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import ( + wait_for_callback_multiple_invocations, +) +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_multiple_invocations.handler, + lambda_function_name="Wait For Callback Multiple Invocations", +) +def test_handle_multiple_invocations_tracking_with_wait_for_callback_operations( + durable_runner, +): + """Test multiple invocations tracking with waitForCallback operations.""" + test_payload = {"test": "multiple-invocations"} + + with durable_runner: + # Start the execution (this will pause at callbacks) + execution_arn = durable_runner.run_async(input=test_payload, timeout=60) + + # Wait for first callback and get callback_id + first_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn + ) + + # Complete first callback + first_callback_result = json.dumps({"step": 1}) + durable_runner.send_callback_success( + callback_id=first_callback_id, result=first_callback_result.encode() + ) + + # Wait for second callback and get callback_id + second_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="second-callback create callback id" + ) + + # Complete second callback + second_callback_result = json.dumps({"step": 2}) + durable_runner.send_callback_success( + callback_id=second_callback_id, result=second_callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "firstCallback": '{"step": 1}', + "secondCallback": '{"step": 2}', + "stepResult": {"processed": True, "step": 1}, + "invocationCount": "multiple", + } + + # Verify invocations were tracked - should be exactly 5 invocations + # Note: Check if Python SDK provides invocations tracking + if hasattr(result, "invocations"): + invocations = result.invocations + assert len(invocations) == 5 + + # Verify operations were executed + operations = result.operations + assert len(operations) > 4 # wait + callback + step + wait + callback operations diff --git a/examples/test/wait_for_callback/test_wait_for_callback_nested.py b/examples/test/wait_for_callback/test_wait_for_callback_nested.py new file mode 100644 index 0000000..2c1c941 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_nested.py @@ -0,0 +1,101 @@ +"""Tests for wait_for_callback_nested.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_nested +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_nested.handler, + lambda_function_name="Wait For Callback Nested", +) +def test_handle_nested_wait_for_callback_operations_in_child_contexts(durable_runner): + """Test nested waitForCallback operations in child contexts.""" + with durable_runner: + # Start the execution (this will pause at callbacks) + execution_arn = durable_runner.run_async(input=None, timeout=60) + + # Complete outer callback first + outer_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn + ) + outer_callback_result = json.dumps({"level": "outer-completed"}) + durable_runner.send_callback_success( + callback_id=outer_callback_id, result=outer_callback_result.encode() + ) + + # Complete inner callback + inner_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="inner-callback-op create callback id" + ) + inner_callback_result = json.dumps({"level": "inner-completed"}) + durable_runner.send_callback_success( + callback_id=inner_callback_id, result=inner_callback_result.encode() + ) + + # Complete nested callback + nested_callback_id = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="nested-callback-op create callback id" + ) + nested_callback_result = json.dumps({"level": "nested-completed"}) + durable_runner.send_callback_success( + callback_id=nested_callback_id, result=nested_callback_result.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "outerCallback": outer_callback_result, + "nestedResults": { + "innerCallback": inner_callback_result, + "deepNested": { + "nestedCallback": nested_callback_result, + "deepLevel": "inner-child", + }, + "level": "outer-child", + }, + } + + # Get all operations including nested ones + all_ops = result.get_all_operations() + + # Find the outer context operation + outer_context_ops = [ + op + for op in result.operations + if op.operation_type.value == "CONTEXT" and op.name == "outer-child-context" + ] + assert len(outer_context_ops) == 1 + outer_context_op = outer_context_ops[0] + + # Verify outer child operations hierarchy + outer_children = outer_context_op.child_operations + assert outer_children is not None + assert len(outer_children) == 2 # inner callback + inner context + + # Find the inner context operation + inner_context_ops = [ + op + for op in all_ops + if op.operation_type.value == "CONTEXT" and op.name == "inner-child-context" + ] + assert len(inner_context_ops) == 1 + inner_context_op = inner_context_ops[0] + + # Verify inner child operations hierarchy + inner_children = inner_context_op.child_operations + assert inner_children is not None + assert len(inner_children) == 2 # deep wait + nested callback + + # Should have tracked all operations + assert len(all_ops) == 12 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_serdes.py b/examples/test/wait_for_callback/test_wait_for_callback_serdes.py new file mode 100644 index 0000000..1333f88 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_serdes.py @@ -0,0 +1,66 @@ +"""Tests for wait_for_callback_serdes.""" + +import json +from datetime import datetime, timezone + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_serdes +from src.wait_for_callback.wait_for_callback_serdes import CustomSerdes +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_serdes.handler, + lambda_function_name="Wait For Callback Serdes", +) +def test_handle_wait_for_callback_with_custom_serdes_configuration(durable_runner): + """Test waitForCallback with custom serdes configuration.""" + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async(input=None, timeout=30) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Send data that requires custom serialization + test_data = { + "id": 42, + "message": "Hello Custom Serdes", + "timestamp": datetime(2025, 6, 15, 12, 30, 45, tzinfo=timezone.utc), + "metadata": { + "version": "2.0.0", + "processed": True, + }, + } + + # Serialize the data using custom serdes for sending + custom_serdes = CustomSerdes() + serialized_data = custom_serdes.serialize(test_data) + durable_runner.send_callback_success( + callback_id=callback_id, result=serialized_data.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # The result will always get stringified since it's the lambda response + # DateTime will be serialized to ISO string in the final result + assert result_data["receivedData"]["id"] == 42 + assert result_data["receivedData"]["message"] == "Hello Custom Serdes" + assert "2025-06-15T12:30:45" in result_data["receivedData"]["timestamp"] + assert result_data["receivedData"]["metadata"]["version"] == "2.0.0" + assert result_data["receivedData"]["metadata"]["processed"] is True + assert result_data["isDateObject"] is True + + # Should have completed operations with successful callback + completed_operations = [ + op for op in result.operations if op.status.value == "SUCCEEDED" + ] + assert len(completed_operations) > 0 diff --git a/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py new file mode 100644 index 0000000..e4463c8 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure.py @@ -0,0 +1,32 @@ +"""Tests for wait_for_callback_submitter_retry_success.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import ( + wait_for_callback_submitter_failure, +) + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_submitter_failure.handler, + lambda_function_name="Wait For Callback Submitter Failure", +) +def test_fail_after_exhausting_retries_when_submitter_always_fails(durable_runner): + """Test that execution fails after exhausting retries when submitter always fails.""" + test_payload = {"shouldFail": True} + + with durable_runner: + execution_arn = durable_runner.run_async(input=test_payload, timeout=30) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + # Execution should fail after retries are exhausted + assert result.status is InvocationStatus.FAILED + + # Verify error details + error = result.error + assert error is not None + assert "Simulated submitter failure" in error.message diff --git a/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py new file mode 100644 index 0000000..b3458d0 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_submitter_failure_catchable.py @@ -0,0 +1,28 @@ +"""Tests for wait_for_callback_failing_submitter.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback_submitter_failure_catchable +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback_submitter_failure_catchable.handler, + lambda_function_name="Wait For Callback Failing Submitter Catchable", +) +def test_handle_wait_for_callback_with_failing_submitter_function_errors( + durable_runner, +): + """Test waitForCallback with failing submitter function errors.""" + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "success": False, + "error": "Submitter failed", + } diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index 977b748..31343e5 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -556,6 +556,18 @@ def get_invoke(self, name: str) -> InvokeOperation: def get_execution(self, name: str) -> ExecutionOperation: return cast(ExecutionOperation, self.get_operation_by_name(name)) + def get_all_operations(self) -> list[Operation]: + """Recursively get all operations including nested ones.""" + all_ops = [] + stack = list(self.operations) + while stack: + op = stack.pop() + all_ops.append(op) + # Add child operations to stack (if they exist) + if hasattr(op, "child_operations") and op.child_operations: + stack.extend(op.child_operations) + return all_ops + class DurableFunctionTestRunner: def __init__(self, handler: Callable, poll_interval: float = 1.0):