Skip to content

Commit 77f87dd

Browse files
author
Rares Polenciuc
committed
feat: add callback examples
1 parent 4e8ecb8 commit 77f87dd

File tree

9 files changed

+419
-1
lines changed

9 files changed

+419
-1
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import threading
2+
import queue
3+
from enum import StrEnum
4+
from time import sleep
5+
from typing import Callable, Optional
6+
7+
8+
class RunnerMode(StrEnum):
9+
"""Runner mode for local or cloud execution."""
10+
11+
LOCAL = "local"
12+
CLOUD = "cloud"
13+
14+
15+
class ExternalSystem:
16+
_instance = None
17+
_lock = threading.Lock()
18+
19+
def __new__(cls):
20+
if cls._instance is None:
21+
with cls._lock:
22+
if cls._instance is None:
23+
cls._instance = super().__new__(cls)
24+
cls._instance._initialized = False
25+
return cls._instance
26+
27+
def __init__(self):
28+
if self._initialized:
29+
return
30+
self._call_queue = queue.Queue()
31+
self._worker_thread = None
32+
self._shutdown_flag = threading.Event()
33+
34+
self._mode = RunnerMode.CLOUD
35+
self._success_handler = self._cloud_success_handler
36+
self._failure_handler = self._cloud_failure_handler
37+
self._heartbeat_handler = self._cloud_heartbeat_handler
38+
self._initialized = True
39+
40+
@property
41+
def mode(self) -> RunnerMode:
42+
return self._mode
43+
44+
def activate_local_mode(
45+
self,
46+
success_handler: Optional[Callable[[str, bytes], None]] = None,
47+
failure_handler: Optional[Callable[[str, Exception], None]] = None,
48+
heartbeat_handler: Optional[Callable[[str], None]] = None,
49+
):
50+
"""Activate local mode with custom handlers."""
51+
self._mode = RunnerMode.LOCAL
52+
self._success_handler = success_handler
53+
self._failure_handler = failure_handler
54+
self._heartbeat_handler = heartbeat_handler
55+
56+
def activate_cloud_mode(self):
57+
"""Activate cloud mode with boto3 handlers."""
58+
self._mode = RunnerMode.CLOUD
59+
self._success_handler = self._cloud_success_handler
60+
self._failure_handler = self._cloud_failure_handler
61+
self._heartbeat_handler = self._cloud_heartbeat_handler
62+
63+
def send_success(self, callback_id: str, msg: bytes):
64+
"""Send success callback."""
65+
self._call_queue.put(("success", callback_id, msg), timeout=0.5)
66+
67+
def send_failure(self, callback_id: str, error: Exception):
68+
"""Send failure callback."""
69+
self._call_queue.put(("failure", callback_id, error), timeout=0.5)
70+
71+
def send_heartbeat(self, callback_id: str):
72+
"""Send heartbeat callback."""
73+
self._call_queue.put(("heartbeat", callback_id, None), timeout=0.5)
74+
75+
def start(self):
76+
if self._worker_thread is None or not self._worker_thread.is_alive():
77+
self._worker_thread = threading.Thread(target=self._worker, daemon=True)
78+
self._worker_thread.start()
79+
80+
def _worker(self):
81+
"""Background worker that processes callbacks."""
82+
while not self._shutdown_flag.is_set():
83+
try:
84+
operation_type, callback_id, data = self._call_queue.get(timeout=0.5)
85+
86+
if operation_type == "success" and self._success_handler:
87+
self._success_handler(callback_id, data)
88+
elif operation_type == "failure" and self._failure_handler:
89+
self._failure_handler(callback_id, data)
90+
elif operation_type == "heartbeat" and self._heartbeat_handler:
91+
self._heartbeat_handler(callback_id)
92+
93+
self._call_queue.task_done()
94+
except queue.Empty:
95+
continue
96+
97+
def reset(self):
98+
"""Reset the external system state."""
99+
# Clear the queue
100+
while not self._call_queue.empty():
101+
try:
102+
self._call_queue.get_nowait()
103+
self._call_queue.task_done()
104+
except queue.Empty:
105+
break
106+
107+
def shutdown(self):
108+
"""Shutdown the worker thread."""
109+
self._shutdown_flag.set()
110+
111+
# Clear the queue
112+
while not self._call_queue.empty():
113+
try:
114+
self._call_queue.get_nowait()
115+
self._call_queue.task_done()
116+
except queue.Empty:
117+
break
118+
119+
# Wait for thread to finish
120+
if self._worker_thread and self._worker_thread.is_alive():
121+
self._worker_thread.join(timeout=1)
122+
123+
# Reset for next test
124+
self._worker_thread = None
125+
self._shutdown_flag.clear()
126+
127+
@classmethod
128+
def reset_instance(cls):
129+
"""Reset the singleton instance."""
130+
with cls._lock:
131+
if cls._instance:
132+
cls._instance.shutdown()
133+
cls._instance = None
134+
135+
def _cloud_success_handler(self, callback_id: str, msg: bytes):
136+
"""Default cloud success handler using boto3."""
137+
try:
138+
import boto3
139+
import os
140+
141+
client = boto3.client(
142+
"lambdainternal",
143+
endpoint_url=os.environ.get("LAMBDA_ENDPOINT"),
144+
region_name=os.environ.get("AWS_REGION", "us-west-2"),
145+
)
146+
147+
client.send_durable_execution_callback_success(
148+
CallbackId=callback_id, Result=msg.decode("utf-8") if msg else None
149+
)
150+
except Exception:
151+
pass # Fail silently in cloud mode
152+
153+
def _cloud_failure_handler(self, callback_id: str, error: Exception):
154+
"""Default cloud failure handler using boto3."""
155+
try:
156+
import boto3
157+
import os
158+
159+
client = boto3.client(
160+
"lambdainternal",
161+
endpoint_url=os.environ.get("LAMBDA_ENDPOINT"),
162+
region_name=os.environ.get("AWS_REGION", "us-west-2"),
163+
)
164+
165+
client.send_durable_execution_callback_failure(
166+
CallbackId=callback_id, Error=str(error)
167+
)
168+
except Exception:
169+
pass # Fail silently in cloud mode
170+
171+
def _cloud_heartbeat_handler(self, callback_id: str):
172+
"""Default cloud heartbeat handler using boto3."""
173+
try:
174+
import boto3
175+
import os
176+
177+
client = boto3.client(
178+
"lambdainternal",
179+
endpoint_url=os.environ.get("LAMBDA_ENDPOINT"),
180+
region_name=os.environ.get("AWS_REGION", "us-west-2"),
181+
)
182+
183+
client.send_durable_execution_callback_heartbeat(CallbackId=callback_id)
184+
except Exception:
185+
pass # Fail silently in cloud mode
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from typing import Any
2+
3+
from aws_durable_execution_sdk_python import DurableContext, durable_execution
4+
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig, Duration
5+
from .external_system import ExternalSystem # noqa: TID252
6+
7+
external_system = ExternalSystem() # Singleton instance
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> str:
12+
name = "Callback Failure"
13+
config = WaitForCallbackConfig(timeout=Duration(10), retry_strategy=None)
14+
15+
def submitter(callback_id: str) -> None:
16+
"""Submitter function that triggers failure."""
17+
try:
18+
raise Exception("Callback failed")
19+
except Exception as e:
20+
external_system.send_failure(callback_id, e)
21+
external_system.start()
22+
23+
try:
24+
context.wait_for_callback(submitter, name=name, config=config)
25+
return "OK"
26+
except Exception as e:
27+
result = str(e)
28+
return result
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from typing import Any
2+
3+
from aws_durable_execution_sdk_python import DurableContext, durable_execution
4+
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig, Duration
5+
from .external_system import ExternalSystem # noqa: TID252
6+
7+
external_system = ExternalSystem() # Singleton instance
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> str:
12+
name = "Callback Heartbeat"
13+
config = WaitForCallbackConfig(timeout=Duration(30), retry_strategy=None)
14+
15+
def submitter(callback_id: str) -> None:
16+
"""Submitter function that sends heartbeat then succeeds."""
17+
external_system.send_heartbeat(callback_id)
18+
external_system.send_success(callback_id, b"")
19+
external_system.start()
20+
21+
context.wait_for_callback(submitter, name=name, config=config)
22+
return "OK"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from typing import Any
2+
3+
from aws_durable_execution_sdk_python import DurableContext, durable_execution
4+
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig, Duration
5+
from .external_system import ExternalSystem # noqa: TID252
6+
7+
external_system = ExternalSystem() # Singleton instance
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> str:
12+
name = "Callback Waiting"
13+
config = WaitForCallbackConfig(timeout=Duration(30), retry_strategy=None)
14+
15+
def submitter(callback_id: str) -> None:
16+
"""Submitter function."""
17+
external_system.send_success(callback_id, b"")
18+
external_system.start()
19+
20+
context.wait_for_callback(submitter, name=name, config=config)
21+
22+
return "OK"

examples/test/conftest.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
from typing import Any
1111

1212
import pytest
13-
from aws_durable_execution_sdk_python.lambda_service import OperationPayload
13+
from aws_durable_execution_sdk_python.lambda_service import (
14+
ErrorObject,
15+
OperationPayload,
16+
)
1417
from aws_durable_execution_sdk_python.serdes import ExtendedTypeSerDes
1518

1619
from aws_durable_execution_sdk_python_testing.runner import (
@@ -122,6 +125,29 @@ def __exit__(self, exc_type, exc_val, exc_tb):
122125
return self._runner.__exit__(exc_type, exc_val, exc_tb)
123126
return None
124127

128+
def succeed_callback(self, callback_id: str, result: bytes) -> None:
129+
"""Send callback success response."""
130+
if self.mode == RunnerMode.LOCAL:
131+
self._runner.send_callback_success(callback_id=callback_id, result=result)
132+
else:
133+
logger.warning("Current runner does not support callback success")
134+
135+
def fail_callback(self, callback_id: str, error: Exception | None = None) -> None:
136+
"""Send callback failure response."""
137+
if self.mode == RunnerMode.LOCAL:
138+
error_obj = ErrorObject.from_exception(error) if error else None
139+
self._runner.send_callback_failure(callback_id=callback_id, error=error_obj)
140+
else:
141+
logger.warning("Current runner does not support callback failure")
142+
143+
def heartbeat_callback(self, callback_id: str) -> None:
144+
"""Send callback heartbeat to keep callback alive."""
145+
146+
if self.mode == RunnerMode.LOCAL:
147+
self._runner.send_callback_heartbeat(callback_id=callback_id)
148+
else:
149+
logger.warning("Current runner does not support callback heartbeat")
150+
125151

126152
@pytest.fixture
127153
def durable_runner(request):
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Tests for callback failure example."""
2+
3+
from asyncio import sleep
4+
5+
import pytest
6+
from aws_durable_execution_sdk_python.execution import InvocationStatus
7+
8+
from wait_for_callback import wait_for_callback_failure
9+
from test.conftest import deserialize_operation_payload
10+
from wait_for_callback.external_system import ExternalSystem
11+
12+
13+
@pytest.mark.example
14+
@pytest.mark.durable_execution(
15+
handler=wait_for_callback_failure.handler,
16+
lambda_function_name="wait for callback failure",
17+
)
18+
def test_callback_failure(durable_runner):
19+
"""Test callback failure handling."""
20+
21+
with durable_runner:
22+
external_system = ExternalSystem()
23+
# Configure external system for local mode if needed
24+
if durable_runner.mode == "local":
25+
26+
def failure_handler(callback_id: str, error: Exception):
27+
sleep(0.5) # Simulate async work
28+
durable_runner.fail_callback(callback_id, str(error))
29+
30+
def success_handler(callback_id: str, msg: bytes):
31+
durable_runner.succeed_callback(callback_id, msg)
32+
33+
external_system.activate_local_mode(
34+
success_handler=success_handler, failure_handler=failure_handler
35+
)
36+
37+
result = durable_runner.run(input="test", timeout=10)
38+
external_system.shutdown()
39+
40+
# Should handle the failure gracefully
41+
assert result.status is InvocationStatus.SUCCEEDED
42+
assert result.result != "OK"
43+
assert result.result == '"Callback failed"'
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Tests for callback heartbeat example."""
2+
3+
from asyncio import sleep
4+
5+
import pytest
6+
from aws_durable_execution_sdk_python.execution import InvocationStatus
7+
8+
from wait_for_callback import wait_for_callback_heartbeat
9+
from test.conftest import deserialize_operation_payload
10+
from wait_for_callback.external_system import ExternalSystem
11+
12+
13+
@pytest.mark.example
14+
@pytest.mark.durable_execution(
15+
handler=wait_for_callback_heartbeat.handler,
16+
lambda_function_name="wait for callback heartbeat",
17+
)
18+
def test_callback_heartbeat(durable_runner):
19+
"""Test callback heartbeat functionality."""
20+
21+
with durable_runner:
22+
external_system = ExternalSystem()
23+
# Configure external system for local mode if needed
24+
if durable_runner.mode == "local":
25+
26+
def heartbeat_handler(callback_id: str):
27+
sleep(0.1) # Simulate async work
28+
# durable_runner.heartbeat_callback(callback_id)
29+
30+
def success_handler(callback_id: str, msg: bytes):
31+
sleep(0.5)
32+
durable_runner.succeed_callback(callback_id, msg)
33+
34+
external_system.activate_local_mode(
35+
success_handler=success_handler, heartbeat_handler=heartbeat_handler
36+
)
37+
38+
result = durable_runner.run(input="test", timeout=30)
39+
external_system.shutdown()
40+
41+
assert result.status is InvocationStatus.SUCCEEDED
42+
assert deserialize_operation_payload(result.result) == "OK"

0 commit comments

Comments
 (0)