Skip to content

Commit fea8882

Browse files
Alex Wangwangyb-A
authored andcommitted
feat: Implement send callback request for local runner
- Implement send callback request for local runner - Add wait for callback support for local runner - Add optional result and error to cloud runner send callback handler - Add wait for callback test cases
1 parent 10036b2 commit fea8882

File tree

8 files changed

+348
-86
lines changed

8 files changed

+348
-86
lines changed

examples/examples-catalog.json

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,21 @@
7979
"path": "./src/callback/callback.py"
8080
},
8181
{
82-
"name": "Wait for Callback",
82+
"name": "Wait for Callback Success",
8383
"description": "Usage of context.wait_for_callback() to wait for external system responses",
8484
"handler": "wait_for_callback.handler",
85-
"integration": false,
85+
"integration": true,
86+
"durableConfig": {
87+
"RetentionPeriodInDays": 7,
88+
"ExecutionTimeout": 300
89+
},
90+
"path": "./src/wait_for_callback/wait_for_callback.py"
91+
},
92+
{
93+
"name": "Wait for Callback Failure",
94+
"description": "Usage of context.wait_for_callback() to wait for external system responses",
95+
"handler": "wait_for_callback.handler",
96+
"integration": true,
8697
"durableConfig": {
8798
"RetentionPeriodInDays": 7,
8899
"ExecutionTimeout": 300

examples/src/hello_world.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,62 @@
1-
from typing import Any
1+
"""Simple durable Lambda handler example.
22
3-
from aws_durable_execution_sdk_python.context import DurableContext
3+
This example demonstrates:
4+
- Step execution with logging
5+
- Wait operations (pausing without consuming resources)
6+
- Replay-aware logging
7+
- Returning a response
8+
"""
9+
10+
from __future__ import annotations
11+
12+
from typing import TYPE_CHECKING, Any
13+
14+
from aws_durable_execution_sdk_python.config import Duration
15+
from aws_durable_execution_sdk_python.context import DurableContext, durable_step
416
from aws_durable_execution_sdk_python.execution import durable_execution
517

18+
if TYPE_CHECKING:
19+
from aws_durable_execution_sdk_python.types import StepContext
20+
21+
22+
@durable_step
23+
def step_1(step_context: StepContext) -> None:
24+
"""First step that logs a message."""
25+
step_context.logger.info("Hello from step1")
26+
27+
28+
@durable_step
29+
def step_2(step_context: StepContext, status_code: int) -> str:
30+
"""Second step that returns a message."""
31+
step_context.logger.info("Returning message with status code: %d", status_code)
32+
return f"Hello from Durable Lambda! (status: {status_code})"
33+
634

735
@durable_execution
8-
def handler(_event: Any, _context: DurableContext) -> str:
9-
"""Simple hello world durable function."""
10-
return "Hello World!"
36+
def handler(event: Any, context: DurableContext) -> dict[str, Any]:
37+
"""Durable Lambda handler with steps, waits, and logging.
38+
39+
Args:
40+
event: Lambda event input
41+
context: Durable execution context
42+
43+
Returns:
44+
Response dictionary with statusCode and body
45+
"""
46+
# Execute Step #1 - logs a message
47+
context.step(step_1())
48+
49+
# Pause for 10 seconds without consuming CPU cycles or incurring usage charges
50+
# The execution will suspend here and resume after 10 seconds
51+
context.wait(Duration.from_seconds(10))
52+
53+
context.logger.info("Waited for 10 seconds")
54+
55+
# Execute Step #2 - returns a message with status code
56+
message = context.step(step_2(status_code=200))
57+
58+
# Return response
59+
return {
60+
"statusCode": 200,
61+
"body": message,
62+
}

examples/test/conftest.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
from typing import Any
1111

1212
import pytest
13-
from aws_durable_execution_sdk_python.lambda_service import OperationPayload
13+
from aws_durable_execution_sdk_python.lambda_service import (
14+
ErrorObject,
15+
OperationPayload,
16+
)
1417
from aws_durable_execution_sdk_python.serdes import ExtendedTypeSerDes
1518

1619
from aws_durable_execution_sdk_python_testing.runner import (
@@ -112,11 +115,15 @@ def run_async(
112115
) -> str:
113116
return self._runner.run_async(input=input, timeout=timeout)
114117

115-
def send_callback_success(self, callback_id: str) -> None:
116-
self._runner.send_callback_success(callback_id=callback_id)
118+
def send_callback_success(
119+
self, callback_id: str, result: bytes | None = None
120+
) -> None:
121+
self._runner.send_callback_success(callback_id=callback_id, result=result)
117122

118-
def send_callback_failure(self, callback_id: str) -> None:
119-
self._runner.send_callback_failure(callback_id=callback_id)
123+
def send_callback_failure(
124+
self, callback_id: str, error: ErrorObject | None = None
125+
) -> None:
126+
self._runner.send_callback_failure(callback_id=callback_id, error=error)
120127

121128
def send_callback_heartbeat(self, callback_id: str) -> None:
122129
self._runner.send_callback_heartbeat(callback_id=callback_id)

examples/test/test_hello_world.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
def test_hello_world(durable_runner):
1616
"""Test hello world example."""
1717
with durable_runner:
18-
result = durable_runner.run(input="test", timeout=10)
18+
result = durable_runner.run(input="test", timeout=30)
1919

2020
assert result.status is InvocationStatus.SUCCEEDED
21-
assert deserialize_operation_payload(result.result) == "Hello World!"
21+
assert deserialize_operation_payload(result.result) == {
22+
"statusCode": 200,
23+
"body": "Hello from Durable Lambda! (status: 200)",
24+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import pytest
2+
from aws_durable_execution_sdk_python.execution import InvocationStatus
3+
from aws_durable_execution_sdk_python.lambda_service import ErrorObject
4+
5+
from src.wait_for_callback import wait_for_callback
6+
7+
8+
@pytest.mark.example
9+
@pytest.mark.durable_execution(
10+
handler=wait_for_callback.handler,
11+
lambda_function_name="Wait For Callback Failure",
12+
)
13+
def test_wait_for_callback_failure(durable_runner):
14+
with durable_runner:
15+
execution_arn = durable_runner.run_async(input="test", timeout=30)
16+
callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn)
17+
durable_runner.send_callback_failure(
18+
callback_id=callback_id, error=ErrorObject.from_message("my callback error")
19+
)
20+
result = durable_runner.wait_for_result(execution_arn=execution_arn)
21+
22+
assert result.status is InvocationStatus.FAILED
23+
assert isinstance(result.error, ErrorObject)
24+
assert result.error.to_dict() == {
25+
"ErrorMessage": "my callback error",
26+
"ErrorType": "CallableRuntimeError",
27+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import pytest
2+
from aws_durable_execution_sdk_python.execution import InvocationStatus
3+
4+
from src.wait_for_callback import wait_for_callback
5+
from test.conftest import deserialize_operation_payload
6+
7+
8+
@pytest.mark.example
9+
@pytest.mark.durable_execution(
10+
handler=wait_for_callback.handler,
11+
lambda_function_name="Wait For Callback Success",
12+
)
13+
def test_wait_for_callback_success(durable_runner):
14+
with durable_runner:
15+
execution_arn = durable_runner.run_async(input="test", timeout=30)
16+
callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn)
17+
durable_runner.send_callback_success(
18+
callback_id=callback_id, result="callback success".encode()
19+
)
20+
result = durable_runner.wait_for_result(execution_arn=execution_arn)
21+
assert result.status is InvocationStatus.SUCCEEDED
22+
assert (
23+
deserialize_operation_payload(result.result)
24+
== "External system result: callback success"
25+
)

0 commit comments

Comments
 (0)