From 773c89a22e9e28155189da30b84db64fb264d095 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Fri, 14 Nov 2025 10:06:08 -0800 Subject: [PATCH] feat: Implement send callback request for local runner - Implement send callback request for local runner - Add wait for callback support for local runner - Add optional result and error to cloud runner send callback handler - Add wait for callback test cases --- examples/examples-catalog.json | 15 +- examples/src/hello_world.py | 62 ++++- examples/test/conftest.py | 17 +- examples/test/test_hello_world.py | 7 +- .../test_wait_for_callback_failure.py | 27 +++ .../test_wait_for_callback_success.py | 25 +++ .../runner.py | 212 ++++++++++++------ tests/runner_test.py | 69 +++++- 8 files changed, 348 insertions(+), 86 deletions(-) create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_failure.py create mode 100644 examples/test/wait_for_callback/test_wait_for_callback_success.py diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index a61ab52..a8ad760 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -79,10 +79,21 @@ "path": "./src/callback/callback.py" }, { - "name": "Wait for Callback", + "name": "Wait for Callback Success", "description": "Usage of context.wait_for_callback() to wait for external system responses", "handler": "wait_for_callback.handler", - "integration": false, + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_callback/wait_for_callback.py" + }, + { + "name": "Wait for Callback Failure", + "description": "Usage of context.wait_for_callback() to wait for external system responses", + "handler": "wait_for_callback.handler", + "integration": true, "durableConfig": { "RetentionPeriodInDays": 7, "ExecutionTimeout": 300 diff --git a/examples/src/hello_world.py b/examples/src/hello_world.py index c8bdd66..d3e4905 100644 --- a/examples/src/hello_world.py +++ b/examples/src/hello_world.py @@ -1,10 +1,62 @@ -from typing import Any +"""Simple durable Lambda handler example. -from aws_durable_execution_sdk_python.context import DurableContext +This example demonstrates: +- Step execution with logging +- Wait operations (pausing without consuming resources) +- Replay-aware logging +- Returning a response +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_durable_execution_sdk_python.config import Duration +from aws_durable_execution_sdk_python.context import DurableContext, durable_step from aws_durable_execution_sdk_python.execution import durable_execution +if TYPE_CHECKING: + from aws_durable_execution_sdk_python.types import StepContext + + +@durable_step +def step_1(step_context: StepContext) -> None: + """First step that logs a message.""" + step_context.logger.info("Hello from step1") + + +@durable_step +def step_2(step_context: StepContext, status_code: int) -> str: + """Second step that returns a message.""" + step_context.logger.info("Returning message with status code: %d", status_code) + return f"Hello from Durable Lambda! (status: {status_code})" + @durable_execution -def handler(_event: Any, _context: DurableContext) -> str: - """Simple hello world durable function.""" - return "Hello World!" +def handler(event: Any, context: DurableContext) -> dict[str, Any]: + """Durable Lambda handler with steps, waits, and logging. + + Args: + event: Lambda event input + context: Durable execution context + + Returns: + Response dictionary with statusCode and body + """ + # Execute Step #1 - logs a message + context.step(step_1()) + + # Pause for 10 seconds without consuming CPU cycles or incurring usage charges + # The execution will suspend here and resume after 10 seconds + context.wait(Duration.from_seconds(10)) + + context.logger.info("Waited for 10 seconds") + + # Execute Step #2 - returns a message with status code + message = context.step(step_2(status_code=200)) + + # Return response + return { + "statusCode": 200, + "body": message, + } diff --git a/examples/test/conftest.py b/examples/test/conftest.py index 5339c0c..679ba48 100644 --- a/examples/test/conftest.py +++ b/examples/test/conftest.py @@ -10,7 +10,10 @@ from typing import Any import pytest -from aws_durable_execution_sdk_python.lambda_service import OperationPayload +from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationPayload, +) from aws_durable_execution_sdk_python.serdes import ExtendedTypeSerDes from aws_durable_execution_sdk_python_testing.runner import ( @@ -112,11 +115,15 @@ def run_async( ) -> str: return self._runner.run_async(input=input, timeout=timeout) - def send_callback_success(self, callback_id: str) -> None: - self._runner.send_callback_success(callback_id=callback_id) + def send_callback_success( + self, callback_id: str, result: bytes | None = None + ) -> None: + self._runner.send_callback_success(callback_id=callback_id, result=result) - def send_callback_failure(self, callback_id: str) -> None: - self._runner.send_callback_failure(callback_id=callback_id) + def send_callback_failure( + self, callback_id: str, error: ErrorObject | None = None + ) -> None: + self._runner.send_callback_failure(callback_id=callback_id, error=error) def send_callback_heartbeat(self, callback_id: str) -> None: self._runner.send_callback_heartbeat(callback_id=callback_id) diff --git a/examples/test/test_hello_world.py b/examples/test/test_hello_world.py index c87b8cc..f0a5446 100644 --- a/examples/test/test_hello_world.py +++ b/examples/test/test_hello_world.py @@ -15,7 +15,10 @@ def test_hello_world(durable_runner): """Test hello world example.""" with durable_runner: - result = durable_runner.run(input="test", timeout=10) + result = durable_runner.run(input="test", timeout=30) assert result.status is InvocationStatus.SUCCEEDED - assert deserialize_operation_payload(result.result) == "Hello World!" + assert deserialize_operation_payload(result.result) == { + "statusCode": 200, + "body": "Hello from Durable Lambda! (status: 200)", + } diff --git a/examples/test/wait_for_callback/test_wait_for_callback_failure.py b/examples/test/wait_for_callback/test_wait_for_callback_failure.py new file mode 100644 index 0000000..ac8d52f --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_failure.py @@ -0,0 +1,27 @@ +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import ErrorObject + +from src.wait_for_callback import wait_for_callback + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback.handler, + lambda_function_name="Wait For Callback Failure", +) +def test_wait_for_callback_failure(durable_runner): + with durable_runner: + execution_arn = durable_runner.run_async(input="test", timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + durable_runner.send_callback_failure( + callback_id=callback_id, error=ErrorObject.from_message("my callback error") + ) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.FAILED + assert isinstance(result.error, ErrorObject) + assert result.error.to_dict() == { + "ErrorMessage": "my callback error", + "ErrorType": "CallableRuntimeError", + } diff --git a/examples/test/wait_for_callback/test_wait_for_callback_success.py b/examples/test/wait_for_callback/test_wait_for_callback_success.py new file mode 100644 index 0000000..67217a2 --- /dev/null +++ b/examples/test/wait_for_callback/test_wait_for_callback_success.py @@ -0,0 +1,25 @@ +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait_for_callback import wait_for_callback +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_callback.handler, + lambda_function_name="Wait For Callback Success", +) +def test_wait_for_callback_success(durable_runner): + with durable_runner: + execution_arn = durable_runner.run_async(input="test", timeout=30) + callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn) + durable_runner.send_callback_success( + callback_id=callback_id, result="callback success".encode() + ) + result = durable_runner.wait_for_result(execution_arn=execution_arn) + assert result.status is InvocationStatus.SUCCEEDED + assert ( + deserialize_operation_payload(result.result) + == "External system result: callback success" + ) diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index 866f50d..9dfc8e2 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -40,6 +40,7 @@ DurableFunctionsLocalRunnerError, DurableFunctionsTestError, InvalidParameterValueException, + ResourceNotFoundException, ) from aws_durable_execution_sdk_python_testing.executor import Executor from aws_durable_execution_sdk_python_testing.invoker import ( @@ -395,6 +396,69 @@ def create_operation( return operation_class.from_svc_operation(svc_operation, all_operations) +def _get_callback_id_from_events( + events: list[Event], name: str | None = None +) -> str | None: + """ + Get callback ID from execution history for callbacks that haven't completed. + + Args: + execution_arn: The ARN of the execution to query. + name: Optional callback name to search for. If not provided, returns the latest callback. + + Returns: + The callback ID string for a non-completed callback, or None if not found. + + Raises: + DurableFunctionsTestError: If the named callback has already succeeded/failed/timed out. + """ + callback_started_events = [ + event for event in events if event.event_type == "CallbackStarted" + ] + + if not callback_started_events: + return None + + completed_callback_ids = { + event.event_id + for event in events + if event.event_type + in ["CallbackSucceeded", "CallbackFailed", "CallbackTimedOut"] + } + + if name is not None: + for event in callback_started_events: + if event.name == name: + callback_id = event.event_id + if callback_id in completed_callback_ids: + raise DurableFunctionsTestError( + f"Callback {name} has already completed (succeeded/failed/timed out)" + ) + return ( + event.callback_started_details.callback_id + if event.callback_started_details + else None + ) + return None + + # If name is not provided, find the latest non-completed callback event + active_callbacks = [ + event + for event in callback_started_events + if event.event_id not in completed_callback_ids + ] + + if not active_callbacks: + return None + + latest_event = active_callbacks[-1] + return ( + latest_event.callback_started_details.callback_id + if latest_event.callback_started_details + else None + ) + + @dataclass(frozen=True) class DurableFunctionTestResult: status: InvocationStatus @@ -493,10 +557,11 @@ def get_execution(self, name: str) -> ExecutionOperation: class DurableFunctionTestRunner: - def __init__(self, handler: Callable): + def __init__(self, handler: Callable, poll_interval: float = 1.0): self._scheduler: Scheduler = Scheduler() self._scheduler.start() self._store = InMemoryExecutionStore() + self.poll_interval = poll_interval self._checkpoint_processor = CheckpointProcessor( store=self._store, scheduler=self._scheduler ) @@ -529,6 +594,37 @@ def run( execution_name: str = "execution-name", account_id: str = "123456789012", ) -> DurableFunctionTestResult: + execution_arn = self.run_async( + input=input, + timeout=timeout, + function_name=function_name, + execution_name=execution_name, + account_id=account_id, + ) + + return self.wait_for_result(execution_arn=execution_arn, timeout=timeout) + + def send_callback_success( + self, callback_id: str, result: bytes | None = None + ) -> None: + self._executor.send_callback_success(callback_id=callback_id, result=result) + + def send_callback_failure( + self, callback_id: str, error: ErrorObject | None = None + ) -> None: + self._executor.send_callback_failure(callback_id=callback_id, error=error) + + def send_callback_heartbeat(self, callback_id: str) -> None: + self._executor.send_callback_heartbeat(callback_id=callback_id) + + def run_async( + self, + input: str | None = None, # noqa: A002 + timeout: int = 900, + function_name: str = "test-function", + execution_name: str = "execution-name", + account_id: str = "123456789012", + ) -> str: start_input = StartDurableExecutionInput( account_id=account_id, function_name=function_name, @@ -549,18 +645,52 @@ def run( if output.execution_arn is None: msg_arn: str = "Execution ARN must exist to run test." raise DurableFunctionsTestError(msg_arn) + return output.execution_arn + def wait_for_result( + self, execution_arn: str, timeout: int = 60 + ) -> DurableFunctionTestResult: # Block until completion - completed = self._executor.wait_until_complete(output.execution_arn, timeout) + completed = self._executor.wait_until_complete(execution_arn, timeout) if not completed: msg_timeout: str = "Execution did not complete within timeout" raise TimeoutError(msg_timeout) - execution: Execution = self._store.load(output.execution_arn) + execution: Execution = self._store.load(execution_arn) return DurableFunctionTestResult.create(execution=execution) + def wait_for_callback( + self, execution_arn: str, name: str | None = None, timeout: int = 60 + ) -> str: + start_time = time.time() + + while time.time() - start_time < timeout: + try: + history_response = self._executor.get_execution_history(execution_arn) + callback_id = _get_callback_id_from_events( + events=history_response.events, name=name + ) + if callback_id: + return callback_id + except ResourceNotFoundException as e: + pass + except Exception as e: + msg = f"Failed to fetch execution history: {e}" + raise DurableFunctionsTestError(msg) from e + + # Wait before next poll + time.sleep(self.poll_interval) + + # Timeout reached + elapsed = time.time() - start_time + msg = ( + f"Callback did not available within {timeout}s " + f"(elapsed: {elapsed:.1f}s." + ) + raise TimeoutError(msg) + class DurableChildContextTestRunner(DurableFunctionTestRunner): """Test a durable block, annotated with @durable_with_child_context, in isolation.""" @@ -853,81 +983,23 @@ def run_async( return response.get("DurableExecutionArn") - def _get_callback_id_from_events( - self, events: list[Event], name: str | None = None - ) -> str | None: - """ - Get callback ID from execution history for callbacks that haven't completed. - - Args: - execution_arn: The ARN of the execution to query. - name: Optional callback name to search for. If not provided, returns the latest callback. - - Returns: - The callback ID string for a non-completed callback, or None if not found. - - Raises: - DurableFunctionsTestError: If the named callback has already succeeded/failed/timed out. - """ - callback_started_events = [ - event for event in events if event.event_type == "CallbackStarted" - ] - - if not callback_started_events: - return None - - completed_callback_ids = { - event.event_id - for event in events - if event.event_type - in ["CallbackSucceeded", "CallbackFailed", "CallbackTimedOut"] - } - - if name is not None: - for event in callback_started_events: - if event.name == name: - callback_id = event.event_id - if callback_id in completed_callback_ids: - raise DurableFunctionsTestError( - f"Callback {name} has already completed (succeeded/failed/timed out)" - ) - return ( - event.callback_started_details.callback_id - if event.callback_started_details - else None - ) - return None - - # If name is not provided, find the latest non-completed callback event - active_callbacks = [ - event - for event in callback_started_events - if event.event_id not in completed_callback_ids - ] - - if not active_callbacks: - return None - - latest_event = active_callbacks[-1] - return ( - latest_event.callback_started_details.callback_id - if latest_event.callback_started_details - else None - ) - - def send_callback_success(self, callback_id: str) -> None: + def send_callback_success( + self, callback_id: str, result: bytes | None = None + ) -> None: try: self.lambda_client.send_durable_execution_callback_success( - CallbackId=callback_id + CallbackId=callback_id, Result=result ) except Exception as e: msg = f"Failed to send callback success for {self.function_name}, callback_id {callback_id}: {e}" raise DurableFunctionsTestError(msg) from e - def send_callback_failure(self, callback_id: str) -> None: + def send_callback_failure( + self, callback_id: str, error: ErrorObject | None = None + ) -> None: try: self.lambda_client.send_durable_execution_callback_failure( - CallbackId=callback_id + CallbackId=callback_id, Error=error.to_dict() if error else None ) except Exception as e: msg = f"Failed to send callback failure for {self.function_name}, callback_id {callback_id}: {e}" @@ -1042,7 +1114,7 @@ def wait_for_callback( while time.time() - start_time < timeout: try: history_response = self._fetch_execution_history(execution_arn) - callback_id = self._get_callback_id_from_events( + callback_id = _get_callback_id_from_events( events=history_response.events, name=name ) if callback_id: diff --git a/tests/runner_test.py b/tests/runner_test.py index f34a76b..3b81269 100644 --- a/tests/runner_test.py +++ b/tests/runner_test.py @@ -21,11 +21,13 @@ from aws_durable_execution_sdk_python_testing.exceptions import ( DurableFunctionsTestError, InvalidParameterValueException, + ResourceNotFoundException, ) from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.model import ( StartDurableExecutionInput, StartDurableExecutionOutput, + GetDurableExecutionHistoryResponse, ) from aws_durable_execution_sdk_python_testing.runner import ( OPERATION_FACTORIES, @@ -1708,7 +1710,7 @@ def test_cloud_runner_send_callback_success(mock_boto3): runner.send_callback_success("callback-123") mock_client.send_durable_execution_callback_success.assert_called_once_with( - CallbackId="callback-123" + CallbackId="callback-123", Result=None ) @@ -1726,7 +1728,7 @@ def test_cloud_runner_send_callback_failure(mock_boto3): runner.send_callback_failure("callback-123") mock_client.send_durable_execution_callback_failure.assert_called_once_with( - CallbackId="callback-123" + CallbackId="callback-123", Error=None ) @@ -1897,6 +1899,69 @@ def test_cloud_runner_wait_for_callback_all_done_without_name(mock_boto3): runner.wait_for_callback("test-arn", timeout=2) +@patch("aws_durable_execution_sdk_python_testing.runner.Executor") +def test_local_runner_wait_for_callback_all_done_without_name(mock_executor_class): + """Test DurableFunctionCloudTestRunner.wait_for_callback all_done_without_name.""" + handler = Mock() + mock_executor = Mock() + mock_executor_class.return_value = mock_executor + mock_executor.get_execution_history.return_value = ( + GetDurableExecutionHistoryResponse.from_dict( + { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + }, + { + "EventType": "CallbackSucceeded", + "EventTimestamp": "2023-01-01T00:05:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + }, + ] + } + ) + ) + + runner = DurableFunctionTestRunner(handler) + with pytest.raises(TimeoutError, match="Callback did not available within"): + runner.wait_for_callback("test-arn", timeout=2) + + +@patch("aws_durable_execution_sdk_python_testing.runner.Executor") +def test_local_runner_wait_for_callback_with_exception(mock_executor_class): + """Test DurableFunctionCloudTestRunner.wait_for_callback with exception""" + handler = Mock() + mock_executor = Mock() + mock_executor_class.return_value = mock_executor + mock_executor.get_execution_history.side_effect = Exception("error") + + runner = DurableFunctionTestRunner(handler) + with pytest.raises( + DurableFunctionsTestError, match="Failed to fetch execution history" + ): + runner.wait_for_callback("test-arn", timeout=10) + + +@patch("aws_durable_execution_sdk_python_testing.runner.Executor") +def test_local_runner_wait_for_callback_with_resource_not_found_exception( + mock_executor_class, +): + """Test DurableFunctionCloudTestRunner.wait_for_callback with resource_not_found exception""" + handler = Mock() + mock_executor = Mock() + mock_executor_class.return_value = mock_executor + mock_executor.get_execution_history.side_effect = ResourceNotFoundException("error") + + runner = DurableFunctionTestRunner(handler) + with pytest.raises(TimeoutError, match="Callback did not available within"): + runner.wait_for_callback("test-arn", timeout=2) + + @patch("aws_durable_execution_sdk_python_testing.runner.boto3") @patch("aws_durable_execution_sdk_python_testing.runner.time") def test_cloud_runner_wait_for_callback_timeout(mock_time, mock_boto3):