Skip to content

Commit be79e91

Browse files
rarepolzRares Polenciuc
andauthored
fix: callback timeouts and heartbeats (#144)
Co-authored-by: Rares Polenciuc <rarepolz@amazon.com>
1 parent 2cda53b commit be79e91

File tree

5 files changed

+74
-5
lines changed

5 files changed

+74
-5
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Demonstrates waitForCallback timeout scenarios."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.execution import durable_execution
7+
from aws_durable_execution_sdk_python.config import Duration
8+
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig
9+
10+
11+
@durable_execution
12+
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
13+
"""Handler demonstrating waitForCallback timeout."""
14+
15+
config = WaitForCallbackConfig(
16+
timeout=Duration.from_seconds(1), heartbeat_timeout=Duration.from_seconds(2)
17+
)
18+
19+
def submitter(_) -> None:
20+
"""Submitter succeeds but callback never completes."""
21+
return None
22+
23+
try:
24+
result: str = context.wait_for_callback(
25+
submitter,
26+
config=config,
27+
)
28+
return {
29+
"callbackResult": result,
30+
"success": True,
31+
}
32+
except Exception as error:
33+
return {
34+
"success": False,
35+
"error": str(error),
36+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""Tests for wait_for_callback_timeout."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.wait_for_callback import wait_for_callback_timeout
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
11+
@pytest.mark.durable_execution(
12+
handler=wait_for_callback_timeout.handler,
13+
lambda_function_name="Wait For Callback Timeout",
14+
)
15+
def test_handle_wait_for_callback_timeout_scenarios(durable_runner):
16+
"""Test waitForCallback timeout scenarios."""
17+
test_payload = {"test": "timeout-scenario"}
18+
19+
with durable_runner:
20+
execution_arn = durable_runner.run_async(input=test_payload, timeout=2)
21+
# Don't send callback - let it timeout
22+
result = durable_runner.wait_for_result(execution_arn=execution_arn)
23+
24+
# Handler catches the timeout error, so execution succeeds with error in result
25+
assert result.status is InvocationStatus.SUCCEEDED
26+
27+
result_data = deserialize_operation_payload(result.result)
28+
29+
assert result_data["success"] is False
30+
assert isinstance(result_data["error"], str)
31+
assert len(result_data["error"]) > 0
32+
assert "Callback timed out: Callback.Timeout" == result_data["error"]

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,9 +1147,9 @@ def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None:
11471147
f"Callback timed out: {CallbackTimeoutType.TIMEOUT.value}"
11481148
)
11491149
execution.complete_callback_timeout(callback_id, timeout_error)
1150-
execution.complete_fail(timeout_error)
11511150
self._store.update(execution)
11521151
logger.warning("[%s] Callback %s timed out", execution_arn, callback_id)
1152+
self._invoke_execution(callback_token.execution_arn)
11531153
except Exception:
11541154
logger.exception(
11551155
"[%s] Error processing callback timeout for %s",
@@ -1174,11 +1174,11 @@ def _on_callback_heartbeat_timeout(
11741174
f"Callback heartbeat timed out: {CallbackTimeoutType.HEARTBEAT.value}"
11751175
)
11761176
execution.complete_callback_timeout(callback_id, heartbeat_error)
1177-
execution.complete_fail(heartbeat_error)
11781177
self._store.update(execution)
11791178
logger.warning(
11801179
"[%s] Callback %s heartbeat timed out", execution_arn, callback_id
11811180
)
1181+
self._invoke_execution(callback_token.execution_arn)
11821182
except Exception:
11831183
logger.exception(
11841184
"[%s] Error processing callback heartbeat timeout for %s",

src/aws_durable_execution_sdk_python_testing/web/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ def from_bytes(
121121
else:
122122
# Use standard JSON deserialization
123123
try:
124-
body_dict = json.loads(body_bytes.decode("utf-8"))
124+
if body_bytes == b"":
125+
body_dict = {}
126+
else:
127+
body_dict = json.loads(body_bytes.decode("utf-8"))
125128
logger.debug("Successfully deserialized request using standard JSON")
126129
except (json.JSONDecodeError, UnicodeDecodeError) as e:
127130
msg = f"JSON deserialization failed: {e}"

tests/executor_test.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2578,7 +2578,6 @@ def test_callback_timeout_handlers(executor, mock_store):
25782578
mock_execution.complete_callback_timeout.assert_called()
25792579
timeout_error = mock_execution.complete_callback_timeout.call_args[0][1]
25802580
assert "Callback timed out" in str(timeout_error.message)
2581-
mock_execution.complete_fail.assert_called()
25822581

25832582
# Reset mocks for heartbeat test
25842583
mock_execution.reset_mock()
@@ -2590,7 +2589,6 @@ def test_callback_timeout_handlers(executor, mock_store):
25902589
mock_execution.complete_callback_timeout.assert_called()
25912590
heartbeat_error = mock_execution.complete_callback_timeout.call_args[0][1]
25922591
assert "Callback heartbeat timed out" in str(heartbeat_error.message)
2593-
mock_execution.complete_fail.assert_called()
25942592

25952593

25962594
def test_callback_timeout_completed_execution(executor, mock_store):

0 commit comments

Comments
 (0)