Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,61 @@
"ExecutionTimeout": 300
},
"path": "./src/none_results/none_results.py"
},
{
"name": "Callback Success",
"description": "Creating a callback ID for external systems to use",
"handler": "callback_simple.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_simple.py"
},
{
"name": "Callback Success None",
"description": "Creating a callback ID for external systems to use",
"handler": "callback_simple.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_simple.py"
},
{
"name": "Create Callback Heartbeat",
"description": "Demonstrates callback failure scenarios where the error propagates and is handled by framework",
"handler": "callback_heartbeat.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_heartbeat.py"
},
{
"name": "Create Callback Mixed Operations",
"description": "Demonstrates createCallback mixed with steps, waits, and other operations",
"handler": "callback_mixed_ops.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_mixed_ops.py"
},
{
"name": "Create Callback Custom Serdes",
"description": "Demonstrates createCallback with custom serialization/deserialization for Date objects",
"handler": "callback_serdes.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_serdes.py"
}
]
}
22 changes: 22 additions & 0 deletions examples/src/callback/callback_heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import TYPE_CHECKING, Any

from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


if TYPE_CHECKING:
from aws_durable_execution_sdk_python.types import Callback


@durable_execution
def handler(_event: Any, context: DurableContext) -> str:
callback_config = CallbackConfig(
timeout=Duration.from_seconds(60), heartbeat_timeout=Duration.from_seconds(10)
)

callback: Callback[str] = context.create_callback(
name="heartbeat_callback", config=callback_config
)

return callback.result()
35 changes: 35 additions & 0 deletions examples/src/callback/callback_mixed_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Demonstrates createCallback mixed with steps, waits, and other operations."""

import time
from typing import Any

from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating createCallback mixed with other operations."""

step_result: dict[str, Any] = context.step(
lambda _: {"userId": 123, "name": "John Doe"},
name="fetch-data",
)

callback_config = CallbackConfig(timeout=Duration.from_minutes(1))
callback = context.create_callback(
name="process-user",
config=callback_config,
)

# Mix callback with step and wait operations
context.wait(Duration.from_seconds(1), name="initial-wait")

callback_result = callback.result()

return {
"stepResult": step_result,
"callbackResult": callback_result,
"completed": True,
}
76 changes: 76 additions & 0 deletions examples/src/callback/callback_serdes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Demonstrates createCallback with custom serialization/deserialization for Date objects."""

import json
from datetime import datetime, timezone
from typing import Any, Optional

from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext


class CustomData:
"""Data structure with datetime."""

def __init__(self, id: int, message: str, timestamp: datetime):
self.id = id
self.message = message
self.timestamp = timestamp

def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"id": self.id,
"message": self.message,
"timestamp": self.timestamp.isoformat(),
}

@staticmethod
def from_dict(data: dict[str, Any]) -> "CustomData":
"""Create from dictionary."""
return CustomData(
id=data["id"],
message=data["message"],
timestamp=datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00")),
)


class CustomDataSerDes(SerDes[CustomData]):
"""Custom serializer for CustomData that handles datetime conversion."""

def serialize(self, value: Optional[CustomData], _: SerDesContext) -> Optional[str]:
"""Serialize CustomData to JSON string."""
if value is None:
return None
return json.dumps(value.to_dict())

def deserialize(
self, payload: Optional[str], _: SerDesContext
) -> Optional[CustomData]:
"""Deserialize JSON string to CustomData."""
if payload is None:
return None
data = json.loads(payload)
return CustomData.from_dict(data)


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating createCallback with custom serdes."""
callback_config = CallbackConfig(
timeout=Duration.from_seconds(30),
serdes=CustomDataSerDes(),
)

callback = context.create_callback(
name="custom-serdes-callback",
config=callback_config,
)

result: CustomData = callback.result()

return {
"receivedData": result.to_dict(),
"isDateObject": isinstance(result.timestamp, datetime),
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import TYPE_CHECKING, Any

from aws_durable_execution_sdk_python.config import CallbackConfig
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import Duration


if TYPE_CHECKING:
Expand All @@ -20,6 +19,4 @@ def handler(_event: Any, context: DurableContext) -> str:
name="example_callback", config=callback_config
)

# In a real scenario, you would pass callback.callback_id to an external system
# For this example, we'll just return the callback_id to show it was created
return f"Callback created with ID: {callback.callback_id}"
return callback.result()
55 changes: 55 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,58 @@ Resources:
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackSimple:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_simple.handler
Description: Creating a callback ID for external systems to use
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackHeartbeat:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_heartbeat.handler
Description: Demonstrates callback failure scenarios where the error propagates
and is handled by framework
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackMixedOps:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_mixed_ops.handler
Description: Demonstrates createCallback mixed with steps, waits, and other
operations
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackSerdes:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_serdes.handler
Description: Demonstrates createCallback with custom serialization/deserialization
for Date objects
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
32 changes: 0 additions & 32 deletions examples/test/callback/test_callback.py

This file was deleted.

51 changes: 51 additions & 0 deletions examples/test/callback/test_callback_heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Tests for create_callback_heartbeat."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
import time
import json
from src.callback import callback_heartbeat
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=callback_heartbeat.handler,
lambda_function_name="Create Callback Heartbeat",
)
def test_handle_callback_operations_with_failure_uncaught(durable_runner):
"""Test handling callback operations with failure."""
test_payload = {"shouldCatchError": False}

heartbeat_interval = 5
total_duration = 20
num_heartbeats = total_duration // heartbeat_interval

with durable_runner:
execution_arn = durable_runner.run_async(input=test_payload, timeout=30)

callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn)

for i in range(num_heartbeats):
print(
f"Sending heartbeat {i + 1}/{num_heartbeats} at {(i + 1) * heartbeat_interval}s"
)
durable_runner.send_callback_heartbeat(callback_id=callback_id)
time.sleep(heartbeat_interval)

callback_result = json.dumps(
{
"status": "completed",
"data": "success after heartbeats",
}
)
durable_runner.send_callback_success(
callback_id=callback_id, result=callback_result.encode()
)

result = durable_runner.wait_for_result(execution_arn=execution_arn)
assert result.status is InvocationStatus.SUCCEEDED

# Assert the callback result is returned
result_data = deserialize_operation_payload(result.result)
assert result_data == callback_result
Loading
Loading