diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index ae9a8de..944fd9c 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -444,6 +444,61 @@ "ExecutionTimeout": 300 }, "path": "./src/none_results/none_results.py" + }, + { + "name": "Callback Success", + "description": "Creating a callback ID for external systems to use", + "handler": "callback_simple.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/callback/callback_simple.py" + }, + { + "name": "Callback Success None", + "description": "Creating a callback ID for external systems to use", + "handler": "callback_simple.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/callback/callback_simple.py" + }, + { + "name": "Create Callback Heartbeat", + "description": "Demonstrates callback failure scenarios where the error propagates and is handled by framework", + "handler": "callback_heartbeat.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/callback/callback_heartbeat.py" + }, + { + "name": "Create Callback Mixed Operations", + "description": "Demonstrates createCallback mixed with steps, waits, and other operations", + "handler": "callback_mixed_ops.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/callback/callback_mixed_ops.py" + }, + { + "name": "Create Callback Custom Serdes", + "description": "Demonstrates createCallback with custom serialization/deserialization for Date objects", + "handler": "callback_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/callback/callback_serdes.py" } ] } diff --git a/examples/src/callback/callback_heartbeat.py b/examples/src/callback/callback_heartbeat.py new file mode 100644 index 0000000..b439d52 --- /dev/null +++ b/examples/src/callback/callback_heartbeat.py @@ -0,0 +1,22 @@ +from typing import TYPE_CHECKING, Any + +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +if TYPE_CHECKING: + from aws_durable_execution_sdk_python.types import Callback + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> str: + callback_config = CallbackConfig( + timeout=Duration.from_seconds(60), heartbeat_timeout=Duration.from_seconds(10) + ) + + callback: Callback[str] = context.create_callback( + name="heartbeat_callback", config=callback_config + ) + + return callback.result() diff --git a/examples/src/callback/callback_mixed_ops.py b/examples/src/callback/callback_mixed_ops.py new file mode 100644 index 0000000..089b17d --- /dev/null +++ b/examples/src/callback/callback_mixed_ops.py @@ -0,0 +1,35 @@ +"""Demonstrates createCallback mixed with steps, waits, and other operations.""" + +import time +from typing import Any + +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating createCallback mixed with other operations.""" + + step_result: dict[str, Any] = context.step( + lambda _: {"userId": 123, "name": "John Doe"}, + name="fetch-data", + ) + + callback_config = CallbackConfig(timeout=Duration.from_minutes(1)) + callback = context.create_callback( + name="process-user", + config=callback_config, + ) + + # Mix callback with step and wait operations + context.wait(Duration.from_seconds(1), name="initial-wait") + + callback_result = callback.result() + + return { + "stepResult": step_result, + "callbackResult": callback_result, + "completed": True, + } diff --git a/examples/src/callback/callback_serdes.py b/examples/src/callback/callback_serdes.py new file mode 100644 index 0000000..c624a79 --- /dev/null +++ b/examples/src/callback/callback_serdes.py @@ -0,0 +1,76 @@ +"""Demonstrates createCallback with custom serialization/deserialization for Date objects.""" + +import json +from datetime import datetime, timezone +from typing import Any, Optional + +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext + + +class CustomData: + """Data structure with datetime.""" + + def __init__(self, id: int, message: str, timestamp: datetime): + self.id = id + self.message = message + self.timestamp = timestamp + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + return { + "id": self.id, + "message": self.message, + "timestamp": self.timestamp.isoformat(), + } + + @staticmethod + def from_dict(data: dict[str, Any]) -> "CustomData": + """Create from dictionary.""" + return CustomData( + id=data["id"], + message=data["message"], + timestamp=datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00")), + ) + + +class CustomDataSerDes(SerDes[CustomData]): + """Custom serializer for CustomData that handles datetime conversion.""" + + def serialize(self, value: Optional[CustomData], _: SerDesContext) -> Optional[str]: + """Serialize CustomData to JSON string.""" + if value is None: + return None + return json.dumps(value.to_dict()) + + def deserialize( + self, payload: Optional[str], _: SerDesContext + ) -> Optional[CustomData]: + """Deserialize JSON string to CustomData.""" + if payload is None: + return None + data = json.loads(payload) + return CustomData.from_dict(data) + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating createCallback with custom serdes.""" + callback_config = CallbackConfig( + timeout=Duration.from_seconds(30), + serdes=CustomDataSerDes(), + ) + + callback = context.create_callback( + name="custom-serdes-callback", + config=callback_config, + ) + + result: CustomData = callback.result() + + return { + "receivedData": result.to_dict(), + "isDateObject": isinstance(result.timestamp, datetime), + } diff --git a/examples/src/callback/callback.py b/examples/src/callback/callback_simple.py similarity index 69% rename from examples/src/callback/callback.py rename to examples/src/callback/callback_simple.py index 1078788..063aad1 100644 --- a/examples/src/callback/callback.py +++ b/examples/src/callback/callback_simple.py @@ -1,9 +1,8 @@ from typing import TYPE_CHECKING, Any -from aws_durable_execution_sdk_python.config import CallbackConfig +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration from aws_durable_execution_sdk_python.context import DurableContext from aws_durable_execution_sdk_python.execution import durable_execution -from aws_durable_execution_sdk_python.config import Duration if TYPE_CHECKING: @@ -20,6 +19,4 @@ def handler(_event: Any, context: DurableContext) -> str: name="example_callback", config=callback_config ) - # In a real scenario, you would pass callback.callback_id to an external system - # For this example, we'll just return the callback_id to show it was created - return f"Callback created with ID: {callback.callback_id}" + return callback.result() diff --git a/examples/template.yaml b/examples/template.yaml index 67d4c29..eab43f9 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -557,3 +557,58 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + CallbackSimple: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: callback_simple.handler + Description: Creating a callback ID for external systems to use + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + CallbackHeartbeat: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: callback_heartbeat.handler + Description: Demonstrates callback failure scenarios where the error propagates + and is handled by framework + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + CallbackMixedOps: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: callback_mixed_ops.handler + Description: Demonstrates createCallback mixed with steps, waits, and other + operations + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + CallbackSerdes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: callback_serdes.handler + Description: Demonstrates createCallback with custom serialization/deserialization + for Date objects + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 diff --git a/examples/test/callback/test_callback.py b/examples/test/callback/test_callback.py deleted file mode 100644 index 4d9f95f..0000000 --- a/examples/test/callback/test_callback.py +++ /dev/null @@ -1,32 +0,0 @@ -"""Tests for callback example.""" - -import pytest -from aws_durable_execution_sdk_python.execution import InvocationStatus - -from src.callback import callback -from test.conftest import deserialize_operation_payload - - -@pytest.mark.example -@pytest.mark.durable_execution( - handler=callback.handler, - lambda_function_name="callback", -) -def test_callback(durable_runner): - """Test callback example.""" - with durable_runner: - result = durable_runner.run(input="test", timeout=10) - - assert result.status is InvocationStatus.SUCCEEDED - assert deserialize_operation_payload(result.result).startswith( - "Callback created with ID:" - ) - - # Find the callback operation - callback_ops = [ - op for op in result.operations if op.operation_type.value == "CALLBACK" - ] - assert len(callback_ops) == 1 - callback_op = callback_ops[0] - assert callback_op.name == "example_callback" - assert callback_op.callback_id is not None diff --git a/examples/test/callback/test_callback_heartbeat.py b/examples/test/callback/test_callback_heartbeat.py new file mode 100644 index 0000000..66a99e9 --- /dev/null +++ b/examples/test/callback/test_callback_heartbeat.py @@ -0,0 +1,51 @@ +"""Tests for create_callback_heartbeat.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +import time +import json +from src.callback import callback_heartbeat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_heartbeat.handler, + lambda_function_name="Create Callback Heartbeat", +) +def test_handle_callback_operations_with_failure_uncaught(durable_runner): + """Test handling callback operations with failure.""" + test_payload = {"shouldCatchError": False} + + heartbeat_interval = 5 + total_duration = 20 + num_heartbeats = total_duration // heartbeat_interval + + with durable_runner: + execution_arn = durable_runner.run_async(input=test_payload, timeout=30) + + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + for i in range(num_heartbeats): + print( + f"Sending heartbeat {i + 1}/{num_heartbeats} at {(i + 1) * heartbeat_interval}s" + ) + durable_runner.send_callback_heartbeat(callback_id=callback_id) + time.sleep(heartbeat_interval) + + callback_result = json.dumps( + { + "status": "completed", + "data": "success after heartbeats", + } + ) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + + result = durable_runner.wait_for_result(execution_arn=execution_arn) + assert result.status is InvocationStatus.SUCCEEDED + + # Assert the callback result is returned + result_data = deserialize_operation_payload(result.result) + assert result_data == callback_result diff --git a/examples/test/callback/test_callback_mixed_ops.py b/examples/test/callback/test_callback_mixed_ops.py new file mode 100644 index 0000000..f87c06c --- /dev/null +++ b/examples/test/callback/test_callback_mixed_ops.py @@ -0,0 +1,49 @@ +"""Tests for create_callback_mixed_ops.""" + +import json +import time + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.callback import callback_mixed_ops +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_mixed_ops.handler, + lambda_function_name="Create Callback Mixed Operations", +) +def test_handle_callback_operations_mixed_with_other_operation_types(durable_runner): + """Test callback operations mixed with other operation types.""" + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + callback_result = json.dumps( + { + "processed": True, + } + ) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "stepResult": {"userId": 123, "name": "John Doe"}, + "callbackResult": callback_result, + "completed": True, + } + + completed_operations = result.operations + assert len(completed_operations) == 3 + + operation_types = [op.operation_type.value for op in completed_operations] + assert "WAIT" in operation_types + assert "STEP" in operation_types + assert "CALLBACK" in operation_types diff --git a/examples/test/callback/test_callback_permutations.py b/examples/test/callback/test_callback_permutations.py deleted file mode 100644 index 36d9e5b..0000000 --- a/examples/test/callback/test_callback_permutations.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Tests for callback operation permutations.""" - -import pytest -from aws_durable_execution_sdk_python.execution import InvocationStatus - -from src.callback import callback_with_timeout -from test.conftest import deserialize_operation_payload - - -@pytest.mark.example -@pytest.mark.durable_execution( - handler=callback_with_timeout.handler, - lambda_function_name="callback with timeout", -) -def test_callback_with_timeout(durable_runner): - """Test callback with custom timeout configuration.""" - with durable_runner: - result = durable_runner.run(input="test", timeout=10) - - assert result.status is InvocationStatus.SUCCEEDED - assert deserialize_operation_payload(result.result).startswith( - "Callback created with 60s timeout:" - ) - - callback_ops = [ - op for op in result.operations if op.operation_type.value == "CALLBACK" - ] - assert len(callback_ops) == 1 - assert callback_ops[0].name == "timeout_callback" - assert callback_ops[0].callback_id is not None diff --git a/examples/test/callback/test_callback_serdes.py b/examples/test/callback/test_callback_serdes.py new file mode 100644 index 0000000..b007782 --- /dev/null +++ b/examples/test/callback/test_callback_serdes.py @@ -0,0 +1,60 @@ +"""Tests for create_callback_serdes.""" + +import json +from datetime import datetime, timezone + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.callback.callback_serdes import CustomData, CustomDataSerDes +from src.callback import callback_serdes +from test.conftest import deserialize_operation_payload + + +class CustomDataTestSerDes(CustomDataSerDes): + """Test version of CustomDataSerDes for use in tests.""" + + pass + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_serdes.handler, + lambda_function_name="Create Callback Custom Serdes", +) +def test_handle_callback_operations_with_custom_serdes(durable_runner): + """Test callback operations with custom serdes.""" + with durable_runner: + # Start the execution (this will pause at the callback) + execution_arn = durable_runner.run_async(input=None, timeout=30) + + # Wait for callback and get callback_id + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + + # Send data that requires custom serialization + test_data = CustomData( + id=42, + message="Hello World", + timestamp=datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc), + ) + + # Serialize the data using custom serdes for sending + serdes = CustomDataTestSerDes() + serialized_data = serdes.serialize(test_data, None) + + durable_runner.send_callback_success( + callback_id=callback_id, result=serialized_data.encode() + ) + + # Wait for the execution to complete + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify the result structure + assert result_data["receivedData"]["id"] == 42 + assert result_data["receivedData"]["message"] == "Hello World" + assert "2025-01-01T00:00:00" in result_data["receivedData"]["timestamp"] + assert result_data["isDateObject"] is True diff --git a/examples/test/callback/test_callback_simple.py b/examples/test/callback/test_callback_simple.py new file mode 100644 index 0000000..678e542 --- /dev/null +++ b/examples/test/callback/test_callback_simple.py @@ -0,0 +1,47 @@ +"""Tests for callback example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.callback import callback_simple +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_simple.handler, + lambda_function_name="Callback Success", +) +def test_callback_success(durable_runner): + callback_result = "successful" + + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + durable_runner.send_callback_success( + callback_id=callback_id, result=callback_result.encode() + ) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + assert result_data == callback_result + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_simple.handler, + lambda_function_name="Callback Success None", +) +def test_callback_success_none_result(durable_runner): + with durable_runner: + execution_arn = durable_runner.run_async(input=None, timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + durable_runner.send_callback_success(callback_id=callback_id, result=b"") + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + assert result_data is None