From cd377f792d174d8da8746ace655b8a78625078db Mon Sep 17 00:00:00 2001 From: Rares Polenciuc Date: Thu, 16 Oct 2025 16:28:43 +0100 Subject: [PATCH] feat: add execution history event generation and pagination - Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination --- .../executor.py | 130 +- .../model.py | 958 +++++++- tests/checkpoint/processors/base_test.py | 8 +- tests/event_factory_test.py | 2002 +++++++++++++++++ tests/executor_test.py | 134 ++ tests/model_test.py | 139 +- 6 files changed, 3213 insertions(+), 158 deletions(-) create mode 100644 tests/event_factory_test.py diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 76d614d..6f07ebe 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -12,7 +12,13 @@ DurableExecutionInvocationOutput, InvocationStatus, ) -from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate +from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + Operation, + OperationUpdate, + OperationStatus, + OperationType, +) from aws_durable_execution_sdk_python_testing.exceptions import ( ExecutionAlreadyStartedException, @@ -35,6 +41,8 @@ StartDurableExecutionInput, StartDurableExecutionOutput, StopDurableExecutionResponse, + TERMINAL_STATUSES, + EventCreationContext, ) from aws_durable_execution_sdk_python_testing.model import ( Event as HistoryEvent, @@ -59,8 +67,8 @@ class Executor(ExecutionObserver): - MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5 - RETRY_BACKOFF_SECONDS = 5 + MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5 + RETRY_BACKOFF_SECONDS: int = 5 def __init__( self, @@ -420,20 +428,18 @@ def get_execution_state( def get_execution_history( self, execution_arn: str, - include_execution_data: bool = False, # noqa: FBT001, FBT002, ARG002 - reverse_order: bool = False, # noqa: FBT001, FBT002, ARG002 + include_execution_data: bool = False, # noqa: FBT001, FBT002 + reverse_order: bool = False, # noqa: FBT001, FBT002 marker: str | None = None, max_items: int | None = None, ) -> GetDurableExecutionHistoryResponse: """Get execution history with events. - TODO: incomplete - Args: execution_arn: The execution ARN include_execution_data: Whether to include execution data in events reverse_order: Return events in reverse chronological order - marker: Pagination marker + marker: Pagination marker (event_id) max_items: Maximum items to return Returns: @@ -442,30 +448,110 @@ def get_execution_history( Raises: ResourceNotFoundException: If execution does not exist """ - execution = self.get_execution(execution_arn) # noqa: F841 + execution: Execution = self.get_execution(execution_arn) + + # Generate events + all_events: list[HistoryEvent] = [] + event_id: int = 1 + ops: list[Operation] = execution.operations + updates: list[OperationUpdate] = execution.updates + updates_dict: dict[str, OperationUpdate] = {u.operation_id: u for u in updates} + durable_execution_arn: str = execution.durable_execution_arn + for op in ops: + # Step Operation can have PENDING status -> not included in History + operation_update: OperationUpdate | None = updates_dict.get( + op.operation_id, None + ) - # Convert operations to events - # This is a simplified implementation - real implementation would need - # to generate proper event history from operations - events: list[HistoryEvent] = [] + if op.status is OperationStatus.PENDING: + if ( + op.operation_type is not OperationType.CHAINED_INVOKE + or op.start_timestamp is None + ): + continue + context: EventCreationContext = EventCreationContext( + op, + event_id, + durable_execution_arn, + execution.start_input, + execution.result, + operation_update, + include_execution_data, + ) + pending = HistoryEvent.create_chained_invoke_event_pending(context) + all_events.append(pending) + event_id += 1 + if op.start_timestamp is not None: + context = EventCreationContext( + op, + event_id, + durable_execution_arn, + execution.start_input, + execution.result, + operation_update, + include_execution_data, + ) + started = HistoryEvent.create_event_started(context) + all_events.append(started) + event_id += 1 + if op.end_timestamp is not None and op.status in TERMINAL_STATUSES: + context = EventCreationContext( + op, + event_id, + durable_execution_arn, + execution.start_input, + execution.result, + operation_update, + include_execution_data, + ) + finished = HistoryEvent.create_event_terminated(context) + all_events.append(finished) + event_id += 1 - # Apply pagination + # Apply cursor-based pagination if max_items is None: max_items = 100 - start_index = 0 + # Handle pagination marker + if reverse_order: + all_events.reverse() + start_index: int = 0 if marker: try: - start_index = int(marker) + marker_event_id: int = int(marker) + # Find the index of the first event with event_id >= marker + start_index = len(all_events) + for i, e in enumerate(all_events): + is_valid_page_start: bool = ( + e.event_id < marker_event_id + if reverse_order + else e.event_id >= marker_event_id + ) + if is_valid_page_start: + start_index = i + break except ValueError: start_index = 0 - end_index = start_index + max_items - paginated_events = events[start_index:end_index] - - next_marker = None - if end_index < len(events): - next_marker = str(end_index) + # Get paginated events + end_index: int = start_index + max_items + paginated_events: list[HistoryEvent] = all_events[start_index:end_index] + + # Generate next marker + next_marker: str | None = None + if end_index < len(all_events): + if reverse_order: + # Next marker is the event_id of the last returned event + next_marker = ( + str(paginated_events[-1].event_id) if paginated_events else None + ) + else: + # Next marker is the event_id of the next event after the last returned + next_marker = ( + str(all_events[end_index].event_id) + if end_index < len(all_events) + else None + ) return GetDurableExecutionHistoryResponse( events=paginated_events, next_marker=next_marker diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index 2abb5d2..469a0d3 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -4,8 +4,13 @@ import datetime from dataclasses import dataclass, replace +from enum import Enum from typing import Any +from dateutil.tz import UTC + +from aws_durable_execution_sdk_python.execution import DurableExecutionInvocationOutput + # Import existing types from the main SDK - REUSE EVERYTHING POSSIBLE from aws_durable_execution_sdk_python.lambda_service import ( CallbackDetails, @@ -36,6 +41,43 @@ ) +class EventType(Enum): + """Event types for durable execution events.""" + + EXECUTION_STARTED = "ExecutionStarted" + EXECUTION_SUCCEEDED = "ExecutionSucceeded" + EXECUTION_FAILED = "ExecutionFailed" + EXECUTION_TIMED_OUT = "ExecutionTimedOut" + EXECUTION_STOPPED = "ExecutionStopped" + CONTEXT_STARTED = "ContextStarted" + CONTEXT_SUCCEEDED = "ContextSucceeded" + CONTEXT_FAILED = "ContextFailed" + WAIT_STARTED = "WaitStarted" + WAIT_SUCCEEDED = "WaitSucceeded" + WAIT_CANCELLED = "WaitCancelled" + STEP_STARTED = "StepStarted" + STEP_SUCCEEDED = "StepSucceeded" + STEP_FAILED = "StepFailed" + CHAINED_INVOKE_STARTED = "ChainedInvokeStarted" + CHAINED_INVOKE_SUCCEEDED = "ChainedInvokeSucceeded" + CHAINED_INVOKE_FAILED = "ChainedInvokeFailed" + CHAINED_INVOKE_TIMED_OUT = "ChainedInvokeTimedOut" + CHAINED_INVOKE_STOPPED = "ChainedInvokeStopped" + CALLBACK_STARTED = "CallbackStarted" + CALLBACK_SUCCEEDED = "CallbackSucceeded" + CALLBACK_FAILED = "CallbackFailed" + CALLBACK_TIMED_OUT = "CallbackTimedOut" + + +TERMINAL_STATUSES: set[OperationStatus] = { + OperationStatus.SUCCEEDED, + OperationStatus.FAILED, + OperationStatus.TIMED_OUT, + OperationStatus.STOPPED, + OperationStatus.CANCELLED, +} + + @dataclass(frozen=True) class LambdaContext(LambdaContextProtocol): """Lambda context for testing.""" @@ -58,6 +100,7 @@ def log(self, msg) -> None: pass # No-op for testing +# region web_api_models # Web API specific models (not in Smithy but needed for web interface) @dataclass(frozen=True) class StartDurableExecutionInput: @@ -141,6 +184,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion web_api_models + + +# region smithy_api_models # Smithy-based API models @dataclass(frozen=True) class GetDurableExecutionRequest: @@ -424,6 +471,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion smithy_api_models + + +# region event_structures # Event-related structures from Smithy model @dataclass(frozen=True) class EventInput: @@ -445,6 +496,27 @@ def to_dict(self) -> dict[str, Any]: result["Payload"] = self.payload return result + @classmethod + def from_details( + cls, + details: ExecutionDetails, + include: bool = False, # noqa: FBT001, FBT002 + ) -> EventInput: + details_input: str | None = details.input_payload if details else None + payload: str | None = details_input if include else None + truncated: bool = not include + return cls(payload=payload, truncated=truncated) + + @classmethod + def from_start_durable_execution_input( + cls, + start_durable_execution_input: StartDurableExecutionInput, + include: bool = False, # noqa: FBT001, FBT002 + ) -> EventInput: + input: str | None = start_durable_execution_input.input + truncated: bool = not include + return cls(input, truncated) + @dataclass(frozen=True) class EventResult: @@ -466,6 +538,26 @@ def to_dict(self) -> dict[str, Any]: result["Payload"] = self.payload return result + @classmethod + def from_details( + cls, + details: CallbackDetails | StepDetails | ChainedInvokeDetails | ContextDetails, + include: bool = False, # noqa: FBT001, FBT002 + ) -> EventResult: + details_result: str | None = details.result if details else None + payload: str | None = details_result if include else None + truncated: bool = not include + return cls(payload=payload, truncated=truncated) + + @classmethod + def from_durable_execution_invocation_output( + cls, + durable_execution_invocation_output: DurableExecutionInvocationOutput, + include: bool = False, # noqa: FBT001, FBT002 + ) -> EventResult: + truncated: bool = not include + return cls(durable_execution_invocation_output.result, truncated) + @dataclass(frozen=True) class EventError: @@ -491,6 +583,25 @@ def to_dict(self) -> dict[str, Any]: result["Payload"] = self.payload.to_dict() return result + @classmethod + def from_details( + cls, + details: CallbackDetails | StepDetails | ChainedInvokeDetails | ContextDetails, + include: bool = False, # noqa: FBT001, FBT002 + ) -> EventError: + error_object: ErrorObject | None = details.error if details else None + truncated: bool = not include + return cls(error_object, truncated) + + @classmethod + def from_durable_execution_invocation_output( + cls, + durable_execution_invocation_output: DurableExecutionInvocationOutput, + include: bool = False, # noqa: FBT001, FBT002 + ) -> EventError: + truncated: bool = not include + return cls(durable_execution_invocation_output.error, truncated) + @dataclass(frozen=True) class RetryDetails: @@ -809,31 +920,46 @@ def to_dict(self) -> dict[str, Any]: @dataclass(frozen=True) -class ChainedInvokeStartedDetails: - """Invoke started event details.""" +class ChainedInvokePendingDetails: + """Chained Invoke Pending event details.""" input: EventInput | None = None - function_arn: str | None = None - durable_execution_arn: str | None = None + function_name: str | None = None @classmethod - def from_dict(cls, data: dict) -> ChainedInvokeStartedDetails: + def from_dict(cls, data: dict) -> ChainedInvokePendingDetails: input_data = None if input_dict := data.get("Input"): input_data = EventInput.from_dict(input_dict) return cls( input=input_data, - function_arn=data.get("FunctionArn"), - durable_execution_arn=data.get("DurableExecutionArn"), + function_name=data.get("FunctionName"), ) def to_dict(self) -> dict[str, Any]: result: dict[str, Any] = {} if self.input is not None: result["Input"] = self.input.to_dict() - if self.function_arn is not None: - result["FunctionArn"] = self.function_arn + if self.function_name is not None: + result["FunctionName"] = self.function_name + return result + + +@dataclass(frozen=True) +class ChainedInvokeStartedDetails: + """Chained invoke started event details.""" + + durable_execution_arn: str | None = None + + @classmethod + def from_dict(cls, data: dict) -> ChainedInvokeStartedDetails: + return cls( + durable_execution_arn=data.get("DurableExecutionArn"), + ) + + def to_dict(self) -> dict[str, Any]: + result: dict[str, Any] = {} if self.durable_execution_arn is not None: result["DurableExecutionArn"] = self.durable_execution_arn return result @@ -841,7 +967,7 @@ def to_dict(self) -> dict[str, Any]: @dataclass(frozen=True) class ChainedInvokeSucceededDetails: - """Invoke succeeded event details.""" + """Chained invoke succeeded event details.""" result: EventResult | None = None @@ -862,7 +988,7 @@ def to_dict(self) -> dict[str, Any]: @dataclass(frozen=True) class ChainedInvokeFailedDetails: - """Invoke failed event details.""" + """Chained invoke failed event details.""" error: EventError | None = None @@ -883,7 +1009,7 @@ def to_dict(self) -> dict[str, Any]: @dataclass(frozen=True) class ChainedInvokeTimedOutDetails: - """Invoke timed out event details.""" + """Chained invoke timed out event details.""" error: EventError | None = None @@ -904,7 +1030,7 @@ def to_dict(self) -> dict[str, Any]: @dataclass(frozen=True) class ChainedInvokeStoppedDetails: - """Invoke stopped event details.""" + """Chained invoke stopped event details.""" error: EventError | None = None @@ -1013,6 +1139,78 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion event_structures + + +@dataclass(frozen=True) +class EventCreationContext: + operation: Operation + event_id: int + durable_execution_arn: str + start_durable_execution_input: StartDurableExecutionInput + durable_execution_invocation_output: DurableExecutionInvocationOutput | None = None + operation_update: OperationUpdate | None = None + include_execution_data: bool = False # noqa: FBT001, FBT002 + + @classmethod + def create( + cls, + operation: Operation, + event_id: int, + durable_execution_arn: str, + start_input: StartDurableExecutionInput, + result: DurableExecutionInvocationOutput | None = None, + operation_update: OperationUpdate | None = None, + include_execution_data: bool = False, # noqa: FBT001, FBT002 + ) -> EventCreationContext: + return cls( + operation=operation, + event_id=event_id, + durable_execution_arn=durable_execution_arn, + start_durable_execution_input=start_input, + durable_execution_invocation_output=result, + operation_update=operation_update, + include_execution_data=include_execution_data, + ) + + @property + def sub_type(self) -> str | None: + return self.operation.sub_type.value if self.operation.sub_type else None + + def get_retry_details(self) -> RetryDetails | None: + if not self.operation.step_details or not self.operation_update: + return None + + delay = 0 + if ( + self.operation_update.operation_type == OperationType.STEP + and self.operation_update.step_options + ): + delay = self.operation_update.step_options.next_attempt_delay_seconds + + return RetryDetails( + current_attempt=self.operation.step_details.attempt, + next_attempt_delay_seconds=delay, + ) + + @property + def start_timestamp(self) -> datetime.datetime: + return ( + self.operation.start_timestamp + if self.operation.start_timestamp is not None + else datetime.datetime.now(UTC) + ) + + @property + def end_timestamp(self) -> datetime.datetime: + return ( + self.operation.end_timestamp + if self.operation.end_timestamp is not None + else datetime.datetime.now(UTC) + ) + + +# region event_class @dataclass(frozen=True) class Event: """Event structure from Smithy model.""" @@ -1038,6 +1236,7 @@ class Event: step_started_details: StepStartedDetails | None = None step_succeeded_details: StepSucceededDetails | None = None step_failed_details: StepFailedDetails | None = None + chained_invoke_pending_details: ChainedInvokePendingDetails | None = None chained_invoke_started_details: ChainedInvokeStartedDetails | None = None chained_invoke_succeeded_details: ChainedInvokeSucceededDetails | None = None chained_invoke_failed_details: ChainedInvokeFailedDetails | None = None @@ -1111,6 +1310,12 @@ def from_dict(cls, data: dict) -> Event: if details_data := data.get("StepFailedDetails"): step_failed_details = StepFailedDetails.from_dict(details_data) + chained_invoke_pending_details = None + if details_data := data.get("ChainedInvokePendingDetails"): + chained_invoke_pending_details = ChainedInvokePendingDetails.from_dict( + details_data + ) + chained_invoke_started_details = None if details_data := data.get("ChainedInvokeStartedDetails"): chained_invoke_started_details = ChainedInvokeStartedDetails.from_dict( @@ -1181,6 +1386,7 @@ def from_dict(cls, data: dict) -> Event: step_started_details=step_started_details, step_succeeded_details=step_succeeded_details, step_failed_details=step_failed_details, + chained_invoke_pending_details=chained_invoke_pending_details, chained_invoke_started_details=chained_invoke_started_details, chained_invoke_succeeded_details=chained_invoke_succeeded_details, chained_invoke_failed_details=chained_invoke_failed_details, @@ -1238,6 +1444,10 @@ def to_dict(self) -> dict[str, Any]: result["StepSucceededDetails"] = self.step_succeeded_details.to_dict() if self.step_failed_details is not None: result["StepFailedDetails"] = self.step_failed_details.to_dict() + if self.chained_invoke_pending_details is not None: + result["ChainedInvokePendingDetails"] = ( + self.chained_invoke_pending_details.to_dict() + ) if self.chained_invoke_started_details is not None: result["ChainedInvokeStartedDetails"] = ( self.chained_invoke_started_details.to_dict() @@ -1272,7 +1482,702 @@ def to_dict(self) -> dict[str, Any]: ) return result + # region execution + @classmethod + def create_execution_event_started(cls, context: EventCreationContext) -> Event: + execution_details: ExecutionDetails | None = context.operation.execution_details + event_input: EventInput | None = ( + EventInput.from_details(execution_details, context.include_execution_data) + if execution_details + else None + ) + execution_timeout: int | None = ( + context.start_durable_execution_input.execution_timeout_seconds + ) + + return cls( + event_type=EventType.EXECUTION_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + execution_started_details=ExecutionStartedDetails( + input=event_input, + execution_timeout=execution_timeout, + ), + ) + + @classmethod + def create_execution_event_succeeded(cls, context: EventCreationContext) -> Event: + result: EventResult | None = ( + EventResult.from_durable_execution_invocation_output( + context.durable_execution_invocation_output, + context.include_execution_data, + ) + if context.durable_execution_invocation_output + else None + ) + return cls( + event_type=EventType.EXECUTION_SUCCEEDED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + execution_succeeded_details=ExecutionSucceededDetails(result=result), + ) + + @classmethod + def create_execution_event_failed(cls, context: EventCreationContext) -> Event: + error: EventError | None = ( + EventError.from_durable_execution_invocation_output( + context.durable_execution_invocation_output, + include=context.include_execution_data, + ) + if context.durable_execution_invocation_output + else None + ) + return cls( + event_type=EventType.EXECUTION_FAILED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + execution_failed_details=ExecutionFailedDetails(error=error), + ) + + @classmethod + def create_execution_event_timed_out(cls, context: EventCreationContext) -> Event: + error: EventError | None = ( + EventError.from_durable_execution_invocation_output( + context.durable_execution_invocation_output, + include=context.include_execution_data, + ) + if context.durable_execution_invocation_output + else None + ) + return cls( + event_type=EventType.EXECUTION_TIMED_OUT.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + execution_timed_out_details=ExecutionTimedOutDetails(error=error), + ) + + @classmethod + def create_execution_event_stopped(cls, context: EventCreationContext) -> Event: + error: EventError | None = ( + EventError.from_durable_execution_invocation_output( + context.durable_execution_invocation_output, + include=context.include_execution_data, + ) + if context.durable_execution_invocation_output + else None + ) + return cls( + event_type=EventType.EXECUTION_STOPPED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + execution_stopped_details=ExecutionStoppedDetails(error=error), + ) + + @classmethod + def create_execution_event(cls, context: EventCreationContext) -> Event: + """Create execution event based on action.""" + match context.operation.status: + case OperationStatus.STARTED: + return cls.create_execution_event_started(context) + case OperationStatus.SUCCEEDED: + return cls.create_execution_event_succeeded(context) + case OperationStatus.FAILED: + return cls.create_execution_event_failed(context) + case OperationStatus.TIMED_OUT: + return cls.create_execution_event_timed_out(context) + case OperationStatus.STOPPED: + return cls.create_execution_event_stopped(context) + case _: + msg = f"Operation status {context.operation.status} is not valid for execution operations. Valid statuses are: STARTED, SUCCEEDED, FAILED, TIMED_OUT, STOPPED" + raise InvalidParameterValueException(msg) + + # endregion execution + + # region context + @classmethod + def create_context_event_started(cls, context: EventCreationContext) -> Event: + return cls( + event_type=EventType.CONTEXT_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + context_started_details=ContextStartedDetails(), + ) + + @classmethod + def create_context_event_succeeded(cls, context: EventCreationContext) -> Event: + context_details: ContextDetails | None = context.operation.context_details + event_result: EventResult | None = ( + EventResult.from_details(context_details, context.include_execution_data) + if context_details + else None + ) + return cls( + event_type=EventType.CONTEXT_SUCCEEDED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + context_succeeded_details=ContextSucceededDetails(result=event_result), + ) + + @classmethod + def create_context_event_failed(cls, context: EventCreationContext) -> Event: + context_details: ContextDetails | None = context.operation.context_details + event_error: EventError | None = ( + EventError.from_details(context_details) if context_details else None + ) + return cls( + event_type=EventType.CONTEXT_FAILED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + context_failed_details=ContextFailedDetails(error=event_error), + ) + + @classmethod + def create_context_event(cls, context: EventCreationContext) -> Event: + """Create context event based on action.""" + match context.operation.status: + case OperationStatus.STARTED: + return cls.create_context_event_started(context) + case OperationStatus.SUCCEEDED: + return cls.create_context_event_succeeded(context) + case OperationStatus.FAILED: + return cls.create_context_event_failed(context) + case _: + msg = ( + f"Operation status {context.operation.status} is not valid for context operations. " + f"Valid statuses are: STARTED, SUCCEEDED, FAILED" + ) + raise InvalidParameterValueException(msg) + + # endregion context + + # region wait + @classmethod + def create_wait_event_started(cls, context: EventCreationContext) -> Event: + wait_details: WaitDetails | None = context.operation.wait_details + scheduled_end_timestamp: datetime.datetime | None = ( + wait_details.scheduled_end_timestamp if wait_details else None + ) + duration: int | None = None + if ( + wait_details + and wait_details.scheduled_end_timestamp + and context.operation.start_timestamp + ): + duration = int( + ( + wait_details.scheduled_end_timestamp + - context.operation.start_timestamp + ).total_seconds() + ) + return cls( + event_type=EventType.WAIT_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + wait_started_details=WaitStartedDetails( + duration=duration, + scheduled_end_timestamp=scheduled_end_timestamp, + ), + ) + + @classmethod + def create_wait_event_succeeded(cls, context: EventCreationContext) -> Event: + wait_details: WaitDetails | None = context.operation.wait_details + duration: int | None = None + if ( + wait_details + and wait_details.scheduled_end_timestamp + and context.operation.start_timestamp + ): + duration = int( + ( + wait_details.scheduled_end_timestamp - context.start_timestamp + ).total_seconds() + ) + return cls( + event_type=EventType.WAIT_SUCCEEDED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + wait_succeeded_details=WaitSucceededDetails(duration=duration), + ) + + @classmethod + def create_wait_event_cancelled(cls, context: EventCreationContext) -> Event: + error: EventError | None = None + if ( + context.operation_update + and context.operation_update.operation_type == OperationType.WAIT + and context.operation_update.action == OperationAction.CANCEL + ): + error = EventError( + context.operation_update.error, not context.include_execution_data + ) + return cls( + event_type=EventType.WAIT_CANCELLED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + wait_cancelled_details=WaitCancelledDetails(error=error), + ) + + @classmethod + def create_wait_event(cls, context: EventCreationContext) -> Event: + """Create wait event based on action.""" + match context.operation.status: + case OperationStatus.STARTED: + return cls.create_wait_event_started(context) + case OperationStatus.SUCCEEDED: + return cls.create_wait_event_succeeded(context) + case OperationStatus.CANCELLED: + return cls.create_wait_event_cancelled(context) + case _: + msg = ( + f"Operation status {context.operation.status} is not valid for wait operations. " + f"Valid statuses are: STARTED, SUCCEEDED, CANCELLED" + ) + raise InvalidParameterValueException(msg) + + # endregion wait + + # region step + @classmethod + def create_step_event_started(cls, context: EventCreationContext) -> Event: + return cls( + event_type=EventType.STEP_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + step_started_details=StepStartedDetails(), + ) + + @classmethod + def create_step_event_succeeded(cls, context: EventCreationContext) -> Event: + step_details: StepDetails | None = context.operation.step_details + event_result: EventResult | None = ( + EventResult.from_details(step_details, context.include_execution_data) + if step_details + else None + ) + return cls( + event_type=EventType.STEP_SUCCEEDED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + step_succeeded_details=StepSucceededDetails( + result=event_result, + retry_details=context.get_retry_details(), + ), + ) + + @classmethod + def create_step_event_failed(cls, context: EventCreationContext) -> Event: + step_details: StepDetails | None = context.operation.step_details + event_error: EventError | None = ( + EventError.from_details( + step_details, include=context.include_execution_data + ) + if step_details + else None + ) + return cls( + event_type=EventType.STEP_FAILED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + step_failed_details=StepFailedDetails( + error=event_error, + retry_details=context.get_retry_details(), + ), + ) + + @classmethod + def create_step_event(cls, context: EventCreationContext) -> Event: + """Create step event based on action.""" + match context.operation.status: + case OperationStatus.STARTED: + return cls.create_step_event_started(context) + case OperationStatus.SUCCEEDED: + return cls.create_step_event_succeeded(context) + case OperationStatus.FAILED: + return cls.create_step_event_failed(context) + case _: + msg = ( + f"Operation status {context.operation.status} is not valid for step operations. " + f"Valid statuses are: STARTED, SUCCEEDED, FAILED" + ) + raise InvalidParameterValueException(msg) + + # endregion step + + # region chained_invoke + @classmethod + def create_chained_invoke_event_pending( + cls, context: EventCreationContext + ) -> Event: + input: EventInput = EventInput.from_start_durable_execution_input( + context.start_durable_execution_input, context.include_execution_data + ) + return cls( + event_type=EventType.CHAINED_INVOKE_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + chained_invoke_pending_details=ChainedInvokePendingDetails( + input=input, + function_name=context.start_durable_execution_input.function_name, + ), + ) + + @classmethod + def create_chained_invoke_event_started( + cls, context: EventCreationContext + ) -> Event: + return cls( + event_type=EventType.CHAINED_INVOKE_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + chained_invoke_started_details=ChainedInvokeStartedDetails( + durable_execution_arn=context.durable_execution_arn + ), + ) + + @classmethod + def create_chained_invoke_event_succeeded( + cls, context: EventCreationContext + ) -> Event: + chained_invoke_details: ChainedInvokeDetails | None = ( + context.operation.chained_invoke_details + ) + event_result: EventResult | None = ( + EventResult.from_details( + chained_invoke_details, context.include_execution_data + ) + if chained_invoke_details + else None + ) + return cls( + event_type=EventType.CHAINED_INVOKE_SUCCEEDED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + chained_invoke_succeeded_details=ChainedInvokeSucceededDetails( + result=event_result + ), + ) + + @classmethod + def create_chained_invoke_event_failed(cls, context: EventCreationContext) -> Event: + chained_invoke_details: ChainedInvokeDetails | None = ( + context.operation.chained_invoke_details + ) + event_error: EventError | None = ( + EventError.from_details( + chained_invoke_details, include=context.include_execution_data + ) + if chained_invoke_details + else None + ) + return cls( + event_type=EventType.CHAINED_INVOKE_FAILED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + chained_invoke_failed_details=ChainedInvokeFailedDetails(error=event_error), + ) + + @classmethod + def create_chained_invoke_event_timed_out( + cls, context: EventCreationContext + ) -> Event: + chained_invoke_details: ChainedInvokeDetails | None = ( + context.operation.chained_invoke_details + ) + event_error: EventError | None = ( + EventError.from_details( + chained_invoke_details, include=context.include_execution_data + ) + if chained_invoke_details + else None + ) + return cls( + event_type=EventType.CHAINED_INVOKE_TIMED_OUT.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + chained_invoke_timed_out_details=ChainedInvokeTimedOutDetails( + error=event_error + ), + ) + + @classmethod + def create_chained_invoke_event_stopped( + cls, context: EventCreationContext + ) -> Event: + chained_invoke_details: ChainedInvokeDetails | None = ( + context.operation.chained_invoke_details + ) + event_error: EventError | None = ( + EventError.from_details( + chained_invoke_details, include=context.include_execution_data + ) + if chained_invoke_details + else None + ) + return cls( + event_type=EventType.CHAINED_INVOKE_STOPPED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + chained_invoke_stopped_details=ChainedInvokeStoppedDetails( + error=event_error + ), + ) + + @classmethod + def create_chained_invoke_event(cls, context: EventCreationContext) -> Event: + """Create chained invoke event based on action.""" + match context.operation.status: + case OperationStatus.PENDING: + return cls.create_chained_invoke_event_pending(context) + case OperationStatus.STARTED: + return cls.create_chained_invoke_event_started(context) + case OperationStatus.SUCCEEDED: + return cls.create_chained_invoke_event_succeeded(context) + case OperationStatus.FAILED: + return cls.create_chained_invoke_event_failed(context) + case OperationStatus.TIMED_OUT: + return cls.create_chained_invoke_event_timed_out(context) + case OperationStatus.STOPPED: + return cls.create_chained_invoke_event_stopped(context) + case _: + msg = ( + f"Operation status {context.operation.status} is not valid for chained invoke operations. Valid statuses are: " + f"STARTED, SUCCEEDED, FAILED, TIMED_OUT, STOPPED" + ) + raise InvalidParameterValueException(msg) + + # endregion chained_invoke + + # region callback + @classmethod + def create_callback_event_started(cls, context: EventCreationContext) -> Event: + callback_details: CallbackDetails | None = context.operation.callback_details + callback_id: str | None = ( + callback_details.callback_id if callback_details else None + ) + return cls( + event_type=EventType.CALLBACK_STARTED.value, + event_timestamp=context.start_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + callback_started_details=CallbackStartedDetails(callback_id=callback_id), + ) + + @classmethod + def create_callback_event_succeeded(cls, context: EventCreationContext) -> Event: + callback_details: CallbackDetails | None = context.operation.callback_details + event_result: EventResult | None = ( + EventResult.from_details(callback_details, context.include_execution_data) + if callback_details + else None + ) + return cls( + event_type=EventType.CALLBACK_SUCCEEDED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + callback_succeeded_details=CallbackSucceededDetails(result=event_result), + ) + + @classmethod + def create_callback_event_failed(cls, context: EventCreationContext) -> Event: + callback_details: CallbackDetails | None = context.operation.callback_details + event_error: EventError | None = ( + EventError.from_details(callback_details) if callback_details else None + ) + return cls( + event_type=EventType.CALLBACK_FAILED.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + callback_failed_details=CallbackFailedDetails(error=event_error), + ) + + @classmethod + def create_callback_event_timed_out(cls, context: EventCreationContext) -> Event: + callback_details: CallbackDetails | None = context.operation.callback_details + event_error: EventError | None = ( + EventError.from_details(callback_details) if callback_details else None + ) + return cls( + event_type=EventType.CALLBACK_TIMED_OUT.value, + event_timestamp=context.end_timestamp, + sub_type=context.sub_type, + event_id=context.event_id, + operation_id=context.operation.operation_id, + name=context.operation.name, + parent_id=context.operation.parent_id, + callback_timed_out_details=CallbackTimedOutDetails(error=event_error), + ) + + @classmethod + def create_callback_event(cls, context: EventCreationContext) -> Event: + """Create callback event based on action.""" + match context.operation.status: + case OperationStatus.STARTED: + return cls.create_callback_event_started(context) + case OperationStatus.SUCCEEDED: + return cls.create_callback_event_succeeded(context) + case OperationStatus.FAILED: + return cls.create_callback_event_failed(context) + case OperationStatus.TIMED_OUT: + return cls.create_callback_event_timed_out(context) + case _: + msg = ( + f"Operation status {context.operation.status} is not valid for callback operations. " + f"Valid statuses are: STARTED, SUCCEEDED, FAILED, TIMED_OUT" + ) + raise InvalidParameterValueException(msg) + + # endregion callback + + @classmethod + def create_event_started(cls, context: EventCreationContext) -> Event: + """Convert operation to started event.""" + if context.operation.start_timestamp is None: + msg: str = "Operation start timestamp cannot be None when converting to started event" + raise InvalidParameterValueException(msg) + + match context.operation.operation_type: + case OperationType.EXECUTION: + return cls.create_execution_event_started(context) + case OperationType.CONTEXT: + return cls.create_context_event_started(context) + case OperationType.WAIT: + return cls.create_wait_event_started(context) + case OperationType.STEP: + return cls.create_step_event_started(context) + case OperationType.CHAINED_INVOKE: + return cls.create_chained_invoke_event_started(context) + case OperationType.CALLBACK: + return cls.create_callback_event_started(context) + case _: + msg = f"Unknown operation type: {context.operation.operation_type}" + raise InvalidParameterValueException(msg) + + @classmethod + def create_event_terminated(cls, context: EventCreationContext) -> Event: + """Convert operation to finished event.""" + operation: Operation = context.operation + if operation.end_timestamp is None: + msg: str = "Operation end timestamp cannot be None when converting to finished event" + raise InvalidParameterValueException(msg) + + if operation.status not in TERMINAL_STATUSES: + msg = f"Operation status must be one of SUCCEEDED, FAILED, TIMED_OUT, STOPPED, or CANCELLED. Got: {operation.status}" + raise InvalidParameterValueException(msg) + + match operation.operation_type: + case OperationType.EXECUTION: + return cls.create_execution_event(context) + case OperationType.CONTEXT: + return cls.create_context_event(context) + case OperationType.WAIT: + return cls.create_wait_event(context) + case OperationType.STEP: + return cls.create_step_event(context) + case OperationType.CHAINED_INVOKE: + return cls.create_chained_invoke_event(context) + case OperationType.CALLBACK: + return cls.create_callback_event(context) + case _: + msg = f"Unknown operation type: {operation.operation_type}" + raise InvalidParameterValueException(msg) + + +# endregion event_class + +# region history_models @dataclass(frozen=True) class HistoryEventTypeConfig: """Configuration for how to process a specific event type.""" @@ -1477,7 +2382,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: List of operations, one per unique operation ID Raises: - ValueError: When required fields are missing from an event + InvalidParameterValueException: When required fields are missing from an event Note: InvocationCompleted events are currently skipped as they don't represent @@ -1489,7 +2394,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: for event in events: if not event.event_type: msg = "Missing required 'event_type' field in event" - raise ValueError(msg) + raise InvalidParameterValueException(msg) # Get event type configuration event_config: HistoryEventTypeConfig | None = HISTORY_EVENT_TYPES.get( @@ -1497,7 +2402,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: ) if not event_config: msg = f"Unknown event type: {event.event_type}" - raise ValueError(msg) + raise InvalidParameterValueException(msg) # TODO: add support for populating invocation information from InvocationCompleted event if event.event_type == "InvocationCompleted": @@ -1505,7 +2410,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: if not event.operation_id: msg = f"Missing required 'operation_id' field in event {event.event_id}" - raise ValueError(msg) + raise InvalidParameterValueException(msg) # Get previous operation if it exists previous_operation: Operation | None = operations_map.get(event.operation_id) @@ -1523,8 +2428,8 @@ def events_to_operations(events: list[Event]) -> list[Operation]: if event.sub_type: try: sub_type = OperationSubType(event.sub_type) - except ValueError: - pass + except ValueError as e: + raise InvalidParameterValueException(str(e)) from e # Create base operation operation = Operation( @@ -1851,6 +2756,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion history_models + + +# region callback_models # Callback-related models @dataclass(frozen=True) class SendDurableExecutionCallbackSuccessRequest: @@ -1927,6 +2836,10 @@ class SendDurableExecutionCallbackHeartbeatResponse: """Response from sending callback heartbeat.""" +# endregion callback_models + + +# region checkpoint_models # Checkpoint-related models @dataclass(frozen=True) class CheckpointUpdatedExecutionState: @@ -2053,6 +2966,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion checkpoint_models + + +# region error_models # Error response structure for consistent error handling @dataclass(frozen=True) class ErrorResponse: @@ -2098,3 +3015,6 @@ def to_dict(self) -> dict[str, Any]: error_data["requestId"] = self.request_id return {"error": error_data} + + +# endregion error_models diff --git a/tests/checkpoint/processors/base_test.py b/tests/checkpoint/processors/base_test.py index d058944..ee588c4 100644 --- a/tests/checkpoint/processors/base_test.py +++ b/tests/checkpoint/processors/base_test.py @@ -318,9 +318,11 @@ def test_create_invoke_details_no_options(): def test_create_wait_details_with_current_operation(): processor = MockProcessor() - scheduled_time = datetime.datetime.now(tz=datetime.UTC) + scheduled_end_timestamp = datetime.datetime.now(tz=datetime.UTC) current_op = Mock() - current_op.wait_details = WaitDetails(scheduled_end_timestamp=scheduled_time) + current_op.wait_details = WaitDetails( + scheduled_end_timestamp=scheduled_end_timestamp + ) wait_options = WaitOptions(wait_seconds=30) update = OperationUpdate( @@ -333,7 +335,7 @@ def test_create_wait_details_with_current_operation(): result = processor.create_wait_details(update, current_op) assert isinstance(result, WaitDetails) - assert result.scheduled_end_timestamp == scheduled_time + assert result.scheduled_end_timestamp == scheduled_end_timestamp def test_create_wait_details_without_current_operation(): diff --git a/tests/event_factory_test.py b/tests/event_factory_test.py new file mode 100644 index 0000000..1f4238e --- /dev/null +++ b/tests/event_factory_test.py @@ -0,0 +1,2002 @@ +"""Tests for Event factory methods. + +This module tests all the event creation factory methods in the Event class. +""" + +from datetime import UTC, datetime +from unittest.mock import Mock + +import pytest +from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + OperationType, + StepDetails, + OperationUpdate, + OperationSubType, + OperationAction, + StepOptions, +) + +from aws_durable_execution_sdk_python_testing.exceptions import ( + InvalidParameterValueException, +) +from aws_durable_execution_sdk_python_testing.model import ( + CheckpointDurableExecutionRequest, + ErrorResponse, + Event, + EventCreationContext, + EventError, + EventInput, + EventResult, + Execution, + ExecutionStartedDetails, + LambdaContext, + StartDurableExecutionInput, +) + + +# Helper function to create mock operations +def create_mock_operation( + operation_id: str = "op-1", + name: str = "test_op", + parent_id=None, + status: OperationStatus = OperationStatus.STARTED, +): + from unittest.mock import Mock + + op = Mock() + op.operation_id = operation_id + op.name = name + op.parent_id = parent_id + op.status = status + return op + + +# region execution-tests +def test_create_execution_started(): + from unittest.mock import Mock + from aws_durable_execution_sdk_python.lambda_service import ExecutionDetails + + operation = Mock() + operation.operation_id = "op-1" + operation.name = "test_execution" + operation.parent_id = None + operation.status = OperationStatus.STARTED + operation.start_timestamp = datetime.now(UTC) + operation.operation_type = OperationType.EXECUTION + operation.sub_type = None + operation.execution_details = ExecutionDetails(input_payload='{"test": "data"}') + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_execution_event(context) + + assert event.event_type == "ExecutionStarted" + assert event.operation_id == "op-1" + assert event.name == "test_execution" + assert event.execution_started_details.input.payload == '{"test": "data"}' + assert event.execution_started_details.execution_timeout == 300 + + +def test_create_execution_succeeded(): + from aws_durable_execution_sdk_python.execution import ( + DurableExecutionInvocationOutput, + InvocationStatus, + ) + + operation = create_mock_operation("op-1", status=OperationStatus.SUCCEEDED) + operation.end_timestamp = datetime.now(UTC) + + result = DurableExecutionInvocationOutput( + status=InvocationStatus.SUCCEEDED, result='{"result": "success"}' + ) + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + result=result, + include_execution_data=True, + ) + event = Event.create_execution_event(context) + + assert event.event_type == "ExecutionSucceeded" + assert event.execution_succeeded_details.result.payload == '{"result": "success"}' + + +def test_create_execution_failed(): + from aws_durable_execution_sdk_python.execution import ( + DurableExecutionInvocationOutput, + InvocationStatus, + ) + + operation = create_mock_operation("op-1", status=OperationStatus.FAILED) + operation.end_timestamp = datetime.now(UTC) + + error_result = DurableExecutionInvocationOutput( + status=InvocationStatus.FAILED, + error=ErrorObject.from_message("Execution failed"), + ) + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + result=error_result, + include_execution_data=True, + ) + event = Event.create_execution_event(context) + + assert event.event_type == "ExecutionFailed" + assert event.execution_failed_details.error.payload.message == "Execution failed" + + +def test_create_execution_timed_out(): + from aws_durable_execution_sdk_python.execution import ( + DurableExecutionInvocationOutput, + InvocationStatus, + ) + + operation = create_mock_operation("op-1", status=OperationStatus.TIMED_OUT) + operation.end_timestamp = datetime.now(UTC) + + error_result = DurableExecutionInvocationOutput( + status=InvocationStatus.FAILED, + error=ErrorObject.from_message("Execution timed out"), + ) + context = EventCreationContext.create( + operation=operation, + event_id=4, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + result=error_result, + include_execution_data=True, + ) + event = Event.create_execution_event(context) + + assert event.event_type == "ExecutionTimedOut" + assert ( + event.execution_timed_out_details.error.payload.message == "Execution timed out" + ) + + +def test_create_execution_stopped(): + from aws_durable_execution_sdk_python.execution import ( + DurableExecutionInvocationOutput, + InvocationStatus, + ) + + operation = create_mock_operation("op-1", status=OperationStatus.STOPPED) + operation.end_timestamp = datetime.now(UTC) + + error_result = DurableExecutionInvocationOutput( + status=InvocationStatus.FAILED, + error=ErrorObject.from_message("Execution stopped"), + ) + context = EventCreationContext.create( + operation=operation, + event_id=5, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + result=error_result, + include_execution_data=True, + ) + event = Event.create_execution_event(context) + + assert event.event_type == "ExecutionStopped" + assert event.execution_stopped_details.error.payload.message == "Execution stopped" + + +def test_create_execution_invalid_status(): + operation = create_mock_operation("op-1", status=OperationStatus.CANCELLED) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for execution operations", + ): + Event.create_execution_event(context) + + +# endregion execution-tests + + +# region context-tests +def test_create_context_started(): + operation = create_mock_operation( + "ctx-1", "test_context", status=OperationStatus.STARTED + ) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_context_event(context) + + assert event.event_type == "ContextStarted" + assert event.operation_id == "ctx-1" + assert event.name == "test_context" + assert event.context_started_details is not None + + +def test_create_context_succeeded(): + operation = create_mock_operation("ctx-1", status=OperationStatus.SUCCEEDED) + operation.context_details = type( + "MockDetails", (), {"result": '{"context": "result"}', "error": None} + )() + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_context_event(context) + + assert event.event_type == "ContextSucceeded" + assert event.context_succeeded_details.result.payload == '{"context": "result"}' + + +def test_create_context_failed(): + operation = create_mock_operation("ctx-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Context failed") + operation.context_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_context_event(context) + + assert event.event_type == "ContextFailed" + assert event.context_failed_details.error.payload.message == "Context failed" + + +def test_create_context_invalid_status(): + operation = create_mock_operation("ctx-1", status=OperationStatus.TIMED_OUT) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for context operations", + ): + Event.create_context_event(context) + + +# endregion context-tests + + +# region wait-tests +def test_create_wait_started(): + operation = create_mock_operation("wait-1", status=OperationStatus.STARTED) + operation.start_timestamp = datetime.fromisoformat("2024-01-01T12:00:00Z") + operation.wait_details = type( + "MockDetails", + (), + {"scheduled_end_timestamp": datetime.fromisoformat("2024-01-01T12:05:00Z")}, + )() + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_wait_event(context) + + assert event.event_type == "WaitStarted" + assert event.wait_started_details.duration == 300 + assert event.wait_started_details.scheduled_end_timestamp == datetime.fromisoformat( + "2024-01-01T12:05:00Z" + ) + + +def test_create_wait_succeeded(): + operation = create_mock_operation("wait-1", status=OperationStatus.SUCCEEDED) + operation.start_timestamp = datetime.fromisoformat("2024-01-01T12:00:00Z") + operation.wait_details = type( + "MockDetails", + (), + {"scheduled_end_timestamp": datetime.fromisoformat("2024-01-01T12:05:00Z")}, + )() + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_wait_event(context) + + assert event.event_type == "WaitSucceeded" + assert event.wait_succeeded_details.duration == 300 + + +def test_create_wait_cancelled(): + operation = create_mock_operation("wait-1", status=OperationStatus.CANCELLED) + operation.wait_details = None + mock_operation_update = Mock() + mock_operation_update.operation_type = OperationType.WAIT + mock_operation_update.operation_update.action = OperationAction.CANCEL + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + operation_update=mock_operation_update, + ) + event = Event.create_wait_event(context) + + assert event.event_type == "WaitCancelled" + assert event.wait_cancelled_details is not None + + +def test_create_wait_invalid_status(): + operation = create_mock_operation("wait-1", status=OperationStatus.FAILED) + operation.wait_details.scheduled_end_timestamp = operation.start_timestamp = ( + datetime.fromisoformat("2024-01-01T12:00:00Z") + ) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for wait operations", + ): + Event.create_wait_event(context) + + +# endregion wait-tests + + +# region step-tests +def test_create_step_started(): + operation = create_mock_operation( + "step-1", "test_step", status=OperationStatus.STARTED + ) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_step_event(context) + + assert event.event_type == "StepStarted" + assert event.operation_id == "step-1" + assert event.name == "test_step" + assert event.step_started_details is not None + + +def test_create_step_succeeded(): + operation = create_mock_operation("step-1", status=OperationStatus.SUCCEEDED) + operation.step_details = type( + "MockDetails", (), {"result": '{"step": "result"}', "error": None} + )() + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_step_event(context) + + assert event.event_type == "StepSucceeded" + assert event.step_succeeded_details.result.payload == '{"step": "result"}' + + +def test_create_step_failed(): + operation = create_mock_operation("step-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Step failed") + operation.step_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_step_event(context) + + assert event.event_type == "StepFailed" + assert event.step_failed_details.error.payload.message == "Step failed" + + +def test_create_step_invalid_status(): + operation = create_mock_operation("step-1", status=OperationStatus.TIMED_OUT) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for step operations", + ): + Event.create_step_event(context) + + +# endregion step-tests + + +# region chained_invoke +def test_create_chained_invoke_started(): + operation = create_mock_operation( + "invoke-1", "test_invoke", status=OperationStatus.STARTED + ) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_chained_invoke_event(context) + + assert event.event_type == "ChainedInvokeStarted" + assert event.operation_id == "invoke-1" + assert event.name == "test_invoke" + assert event.chained_invoke_started_details is not None + + +# endregion callback + + +# endregion helpers-test + + +def test_create_chained_invoke_succeeded(): + operation = create_mock_operation("invoke-1", status=OperationStatus.SUCCEEDED) + operation.chained_invoke_details = type( + "MockDetails", (), {"result": '{"invoke": "result"}', "error": None} + )() + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_chained_invoke_event(context) + + assert event.event_type == "ChainedInvokeSucceeded" + assert ( + event.chained_invoke_succeeded_details.result.payload == '{"invoke": "result"}' + ) + + +def test_create_chained_invoke_failed(): + operation = create_mock_operation("invoke-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Invoke failed") + operation.chained_invoke_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_chained_invoke_event(context) + + assert event.event_type == "ChainedInvokeFailed" + assert event.chained_invoke_failed_details.error.payload.message == "Invoke failed" + + +def test_create_chained_invoke_timed_out(): + operation = create_mock_operation("invoke-1", status=OperationStatus.TIMED_OUT) + error_obj = ErrorObject.from_message("Invoke timed out") + operation.chained_invoke_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=4, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_chained_invoke_event(context) + + assert event.event_type == "ChainedInvokeTimedOut" + assert ( + event.chained_invoke_timed_out_details.error.payload.message + == "Invoke timed out" + ) + + +def test_create_chained_invoke_stopped(): + operation = create_mock_operation("invoke-1", status=OperationStatus.STOPPED) + error_obj = ErrorObject.from_message("Invoke stopped") + operation.chained_invoke_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=5, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_chained_invoke_event(context) + + assert event.event_type == "ChainedInvokeStopped" + assert ( + event.chained_invoke_stopped_details.error.payload.message == "Invoke stopped" + ) + + +def test_create_chained_invoke_invalid_status(): + operation = create_mock_operation("invoke-1", status=OperationStatus.CANCELLED) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for chained invoke operations", + ): + Event.create_chained_invoke_event(context) + + +# endregion chained_invoke + + +# region callback-tests +def test_create_callback_started(): + operation = create_mock_operation( + "callback-1", "test_callback", status=OperationStatus.STARTED + ) + operation.callback_details = type( + "MockDetails", (), {"callback_id": "cb-123", "result": None, "error": None} + )() + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_callback_event(context) + + assert event.event_type == "CallbackStarted" + assert event.operation_id == "callback-1" + assert event.name == "test_callback" + assert event.callback_started_details.callback_id == "cb-123" + + +def test_create_callback_succeeded(): + operation = create_mock_operation("callback-1", status=OperationStatus.SUCCEEDED) + operation.callback_details = type( + "MockDetails", + (), + {"callback_id": None, "result": '{"callback": "result"}', "error": None}, + )() + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_callback_event(context) + + assert event.event_type == "CallbackSucceeded" + assert event.callback_succeeded_details.result.payload == '{"callback": "result"}' + + +def test_create_callback_failed(): + operation = create_mock_operation("callback-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Callback failed") + operation.callback_details = type( + "MockDetails", (), {"callback_id": None, "result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_callback_event(context) + + assert event.event_type == "CallbackFailed" + assert event.callback_failed_details.error.payload.message == "Callback failed" + + +def test_create_callback_timed_out(): + operation = create_mock_operation("callback-1", status=OperationStatus.TIMED_OUT) + error_obj = ErrorObject.from_message("Callback timed out") + operation.callback_details = type( + "MockDetails", (), {"callback_id": None, "result": None, "error": error_obj} + )() + context = EventCreationContext.create( + operation=operation, + event_id=4, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_callback_event(context) + + assert event.event_type == "CallbackTimedOut" + assert ( + event.callback_timed_out_details.error.payload.message == "Callback timed out" + ) + + +def test_create_callback_invalid_status(): + operation = create_mock_operation("callback-1", status=OperationStatus.STOPPED) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for callback operations", + ): + Event.create_callback_event(context) + + +# endregion callback-tests + + +# region model-tests +def test_lambda_context(): + context = LambdaContext(aws_request_id="test-123") + assert context.aws_request_id == "test-123" + assert context.get_remaining_time_in_millis() == 900000 + context.log("test message") # Should not raise + + +def test_start_durable_execution_input_missing_field(): + with pytest.raises( + InvalidParameterValueException, match="Missing required field: AccountId" + ): + StartDurableExecutionInput.from_dict({}) + + +def test_start_durable_execution_input_to_dict_with_optionals(): + input_obj = StartDurableExecutionInput( + account_id="123456789", + function_name="test-func", + function_qualifier="$LATEST", + execution_name="test-exec", + execution_timeout_seconds=300, + execution_retention_period_days=7, + invocation_id="inv-123", + trace_fields={"key": "value"}, + tenant_id="tenant-123", + input='{"test": "data"}', + ) + result = input_obj.to_dict() + assert result["InvocationId"] == "inv-123" + assert result["TraceFields"] == {"key": "value"} + assert result["TenantId"] == "tenant-123" + assert result["Input"] == '{"test": "data"}' + + +def test_execution_from_dict_empty_function_arn(): + data = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789:function:test", + "DurableExecutionName": "test-exec", + "Status": "SUCCEEDED", + "StartTimestamp": 1640995200.0, + } + execution = Execution.from_dict(data) + assert execution.function_arn == "" + + +def test_execution_to_dict_with_function_arn(): + execution = Execution( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789:function:test", + durable_execution_name="test-exec", + function_arn="arn:aws:lambda:us-east-1:123456789:function:test", + status="SUCCEEDED", + start_timestamp=1640995200.0, + ) + result = execution.to_dict() + assert "FunctionArn" in result + + +def test_event_input_from_details(): + from aws_durable_execution_sdk_python.lambda_service import ExecutionDetails + + details = ExecutionDetails(input_payload='{"test": "data"}') + event_input = EventInput.from_details(details, include=True) + assert event_input.payload == '{"test": "data"}' + assert not event_input.truncated + + event_input_truncated = EventInput.from_details(details, include=False) + assert event_input_truncated.payload is None + assert event_input_truncated.truncated + + +def test_event_result_from_details(): + from aws_durable_execution_sdk_python.lambda_service import StepDetails + + details = StepDetails(result='{"result": "success"}') + event_result = EventResult.from_details(details, include=True) + assert event_result.payload == '{"result": "success"}' + assert not event_result.truncated + + +def test_event_error_from_details(): + from aws_durable_execution_sdk_python.lambda_service import StepDetails + + error_obj = ErrorObject.from_message("Test error") + details = StepDetails(error=error_obj) + event_error = EventError.from_details(details) + assert event_error.payload.message == "Test error" + + +def test_event_from_dict_with_all_details(): + data = { + "EventType": "ExecutionStarted", + "EventTimestamp": datetime.fromisoformat("2024-01-01T12:00:00Z"), + "EventId": 1, + "Id": "op-1", + "Name": "test", + "ParentId": "parent-1", + "SubType": "test-subtype", + "ExecutionStartedDetails": { + "Input": {"Payload": '{"test": "data"}', "Truncated": False}, + "ExecutionTimeout": 300, + }, + } + event = Event.from_dict(data) + assert event.sub_type == "test-subtype" + assert event.parent_id == "parent-1" + + +def test_event_to_dict_with_all_details(): + event = Event( + event_type="ExecutionStarted", + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_id="op-1", + name="test", + parent_id="parent-1", + sub_type="test-subtype", + execution_started_details=ExecutionStartedDetails( + input=EventInput(payload='{"test": "data"}', truncated=False), + execution_timeout=300, + ), + ) + result = event.to_dict() + assert result["SubType"] == "test-subtype" + assert result["ParentId"] == "parent-1" + assert result["ExecutionStartedDetails"]["ExecutionTimeout"] == 300 + + +def test_error_response_from_dict_nested(): + data = { + "error": { + "type": "ValidationError", + "message": "Invalid input", + "code": "400", + "requestId": "req-123", + } + } + error_response = ErrorResponse.from_dict(data) + assert error_response.error_type == "ValidationError" + assert error_response.error_message == "Invalid input" + assert error_response.error_code == "400" + assert error_response.request_id == "req-123" + + +def test_error_response_from_dict_flat(): + data = {"type": "ValidationError", "message": "Invalid input"} + error_response = ErrorResponse.from_dict(data) + assert error_response.error_type == "ValidationError" + assert error_response.error_message == "Invalid input" + + +def test_checkpoint_durable_execution_request_from_dict(): + token: str = "token-123" + data = { + "CheckpointToken": token, + "Updates": [ + {"Id": "op-1", "Type": "STEP", "Action": "START", "SubType": "Step"} + ], + } + request = CheckpointDurableExecutionRequest.from_dict(data, "arn:test") + assert request.checkpoint_token == token + assert len(request.updates) == 1 + assert request.updates[0].operation_id == "op-1" + + +# endregion model-tests + + +# region from_operation_started_tests +class TestFromOperationStarted: + """Tests for Event.from_operation_started method.""" + + def test_from_operation_started_execution(self): + """Test converting execution operation to started event.""" + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + execution_details = Mock() + execution_details.input_payload = '{"test": "data"}' + operation.execution_details = execution_details + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_event_started(context) + + assert event.event_type == "ExecutionStarted" + assert event.operation_id == "exec-123" + assert event.name == "test_execution" + assert event.parent_id == "parent-123" + assert event.execution_started_details.input.payload == '{"test": "data"}' + assert not event.execution_started_details.input.truncated + + def test_from_operation_started_execution_no_data(self): + """Test execution operation with include_execution_data=False.""" + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + execution_details = Mock() + execution_details.input_payload = '{"test": "data"}' + operation.execution_details = execution_details + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=False, + ) + event = Event.create_event_started(context) + + assert event.event_type == "ExecutionStarted" + assert event.execution_started_details.input.payload is None + assert event.execution_started_details.input.truncated + + def test_from_operation_started_step(self): + """Test converting step operation to started event.""" + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_started(context) + + assert event.event_type == "StepStarted" + assert event.operation_id == "step-123" + assert event.name == "test_step" + assert event.parent_id == "parent-123" + assert event.step_started_details is not None + + def test_from_operation_started_wait(self): + """Test converting wait operation to started event.""" + operation = Mock() + operation.operation_id = "wait-123" + operation.name = "test_wait" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.WAIT + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + wait_details = Mock() + wait_details.scheduled_end_timestamp = datetime( + 2024, 1, 1, 12, 5, 0, tzinfo=UTC + ) + operation.wait_details = wait_details + + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_started(context) + + assert event.event_type == "WaitStarted" + assert event.operation_id == "wait-123" + assert event.name == "test_wait" + assert event.parent_id == "parent-123" + assert event.wait_started_details.duration == 300 + assert ( + event.wait_started_details.scheduled_end_timestamp + == datetime.fromisoformat("2024-01-01T12:05:00+00:00") + ) + + def test_from_operation_started_callback(self): + """Test converting callback operation to started event.""" + operation = Mock() + operation.operation_id = "callback-123" + operation.name = "test_callback" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CALLBACK + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + callback_details = Mock() + callback_details.callback_id = "cb-456" + operation.callback_details = callback_details + + context = EventCreationContext.create( + operation=operation, + event_id=4, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_started(context) + + assert event.event_type == "CallbackStarted" + assert event.operation_id == "callback-123" + assert event.name == "test_callback" + assert event.parent_id == "parent-123" + assert event.callback_started_details.callback_id == "cb-456" + + def test_from_operation_started_chained_invoke(self): + """Test converting chained invoke operation to started event.""" + operation = Mock() + operation.operation_id = "invoke-123" + operation.name = "test_invoke" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CHAINED_INVOKE + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=5, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_started(context) + + assert event.event_type == "ChainedInvokeStarted" + assert event.operation_id == "invoke-123" + assert event.name == "test_invoke" + assert event.parent_id == "parent-123" + assert event.chained_invoke_started_details is not None + + def test_from_operation_started_context(self): + """Test converting context operation to started event.""" + operation = Mock() + operation.operation_id = "context-123" + operation.name = "test_context" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CONTEXT + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=6, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_started(context) + + assert event.event_type == "ContextStarted" + assert event.operation_id == "context-123" + assert event.name == "test_context" + assert event.parent_id == "parent-123" + assert event.context_started_details is not None + + def test_from_operation_started_no_timestamp(self): + """Test error when operation has no start timestamp.""" + operation = Mock() + operation.start_timestamp = None + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation start timestamp cannot be None", + ): + Event.create_event_started(context) + + def test_from_operation_started_unknown_type(self): + """Test error with unknown operation type.""" + operation = Mock() + operation.operation_type = "UNKNOWN_TYPE" + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, match="Unknown operation type: UNKNOWN_TYPE" + ): + Event.create_event_started(context) + + +# endregion from_operation_started_tests + + +# region from_operation_finished_tests +class TestFromOperationFinished: + """Tests for Event.from_operation_finished method.""" + + def test_from_operation_finished_execution_succeeded(self): + """Test converting succeeded execution operation to finished event.""" + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "ExecutionSucceeded" + assert event.operation_id == "exec-123" + assert event.name == "test_execution" + assert event.parent_id == "parent-123" + + def test_from_operation_finished_execution_failed(self): + """Test converting failed execution operation to finished event.""" + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.status = OperationStatus.FAILED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "ExecutionFailed" + assert event.operation_id == "exec-123" + + def test_from_operation_finished_step_with_result(self): + """Test converting succeeded step operation with result.""" + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + step_details = Mock() + step_details.result = '{"result": "success"}' + step_details.error = None + operation.step_details = step_details + + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "StepSucceeded" + assert event.operation_id == "step-123" + assert event.step_succeeded_details.result.payload == '{"result": "success"}' + + def test_from_operation_finished_step_with_error(self): + """Test converting failed step operation with error.""" + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.status = OperationStatus.FAILED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + step_details = Mock() + step_details.result = None + step_details.error = ErrorObject.from_message("Step failed") + operation.step_details = step_details + + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "StepFailed" + assert event.step_failed_details.error.payload.message == "Step failed" + + def test_from_operation_finished_wait_succeeded(self): + """Test converting succeeded wait operation.""" + operation = Mock() + operation.operation_id = "wait-123" + operation.name = "test_wait" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.WAIT + operation.status = OperationStatus.SUCCEEDED + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + wait_details = Mock() + wait_details.scheduled_end_timestamp = datetime( + 2024, 1, 1, 12, 5, 0, tzinfo=UTC + ) + operation.wait_details = wait_details + + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "WaitSucceeded" + assert event.wait_succeeded_details.duration == 300 + + def test_from_operation_finished_wait_cancelled(self): + """Test converting cancelled wait operation.""" + operation = Mock() + operation.operation_id = "wait-123" + operation.name = "test_wait" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.WAIT + operation.status = OperationStatus.CANCELLED + operation.end_timestamp = datetime(2024, 1, 1, 12, 3, 0, tzinfo=UTC) + operation.wait_details = None + + context = EventCreationContext.create( + operation=operation, + event_id=3, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "WaitCancelled" + assert event.wait_cancelled_details is not None + + def test_from_operation_finished_callback_succeeded(self): + """Test converting succeeded callback operation.""" + operation = Mock() + operation.operation_id = "callback-123" + operation.name = "test_callback" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CALLBACK + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + callback_details = Mock() + callback_details.result = '{"callback": "result"}' + callback_details.error = None + operation.callback_details = callback_details + + context = EventCreationContext.create( + operation=operation, + event_id=4, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "CallbackSucceeded" + assert ( + event.callback_succeeded_details.result.payload == '{"callback": "result"}' + ) + + def test_from_operation_finished_callback_timed_out(self): + """Test converting timed out callback operation.""" + operation = Mock() + operation.operation_id = "callback-123" + operation.name = "test_callback" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CALLBACK + operation.status = OperationStatus.TIMED_OUT + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + callback_details = Mock() + callback_details.result = None + callback_details.error = ErrorObject.from_message("Callback timed out") + operation.callback_details = callback_details + + context = EventCreationContext.create( + operation=operation, + event_id=4, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "CallbackTimedOut" + assert ( + event.callback_timed_out_details.error.payload.message + == "Callback timed out" + ) + + def test_from_operation_finished_chained_invoke_succeeded(self): + """Test converting succeeded chained invoke operation.""" + operation = Mock() + operation.operation_id = "invoke-123" + operation.name = "test_invoke" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CHAINED_INVOKE + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + chained_invoke_details = Mock() + chained_invoke_details.result = '{"invoke": "result"}' + chained_invoke_details.error = None + operation.chained_invoke_details = chained_invoke_details + + context = EventCreationContext.create( + operation=operation, + event_id=5, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "ChainedInvokeSucceeded" + assert ( + event.chained_invoke_succeeded_details.result.payload + == '{"invoke": "result"}' + ) + + def test_from_operation_finished_chained_invoke_stopped(self): + """Test converting stopped chained invoke operation.""" + operation = Mock() + operation.operation_id = "invoke-123" + operation.name = "test_invoke" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CHAINED_INVOKE + operation.status = OperationStatus.STOPPED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + chained_invoke_details = Mock() + chained_invoke_details.result = None + chained_invoke_details.error = ErrorObject.from_message("Invoke stopped") + operation.chained_invoke_details = chained_invoke_details + + context = EventCreationContext.create( + operation=operation, + event_id=5, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "ChainedInvokeStopped" + assert ( + event.chained_invoke_stopped_details.error.payload.message + == "Invoke stopped" + ) + + def test_from_operation_finished_context_succeeded(self): + """Test converting succeeded context operation.""" + operation = Mock() + operation.operation_id = "context-123" + operation.name = "test_context" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CONTEXT + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context_details = Mock() + context_details.result = '{"context": "result"}' + context_details.error = None + operation.context_details = context_details + operation.result = None + operation.error = None + + context = EventCreationContext.create( + operation=operation, + event_id=6, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "ContextSucceeded" + assert event.context_succeeded_details.result.payload == '{"context": "result"}' + + def test_from_operation_finished_context_failed(self): + """Test converting failed context operation.""" + operation = Mock() + operation.operation_id = "context-123" + operation.name = "test_context" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CONTEXT + operation.status = OperationStatus.FAILED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context_details = Mock() + context_details.result = None + context_details.error = ErrorObject.from_message("Context failed") + operation.context_details = context_details + operation.result = None + operation.error = None + + context = EventCreationContext.create( + operation=operation, + event_id=6, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "ContextFailed" + assert event.context_failed_details.error.payload.message == "Context failed" + + def test_from_operation_finished_no_end_timestamp(self): + """Test error when operation has no end timestamp.""" + operation = Mock() + operation.end_timestamp = None + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation end timestamp cannot be None", + ): + Event.create_event_terminated(context) + + def test_from_operation_finished_invalid_status(self): + """Test error with invalid operation status.""" + operation = Mock() + operation.status = OperationStatus.STARTED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status must be one of SUCCEEDED, FAILED, TIMED_OUT, STOPPED, or CANCELLED", + ): + Event.create_event_terminated(context) + + def test_from_operation_finished_unknown_type(self): + """Test error with unknown operation type.""" + operation = Mock() + operation.operation_type = "UNKNOWN_TYPE" + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + with pytest.raises( + InvalidParameterValueException, match="Unknown operation type: UNKNOWN_TYPE" + ): + Event.create_event_terminated(context) + + def test_from_operation_finished_no_details(self): + """Test operations with no detail objects.""" + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + operation.step_details = None + + context = EventCreationContext.create( + operation=operation, + event_id=2, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + event = Event.create_event_terminated(context) + + assert event.event_type == "StepSucceeded" + assert event.step_succeeded_details.result is None + + +# endregion from_operation_finished_tests + + +def test_chained_invoke_pending_details_from_dict(): + """Test ChainedInvokePendingDetails parsing in Event.from_dict.""" + data = { + "EventType": "ChainedInvokeStarted", + "EventTimestamp": datetime.now(UTC), + "ChainedInvokePendingDetails": { + "Input": {"Payload": "test-input", "Truncated": False}, + "FunctionName": "test-function", + }, + } + + event = Event.from_dict(data) + assert event.chained_invoke_pending_details is not None + assert event.chained_invoke_pending_details.input.payload == "test-input" + assert event.chained_invoke_pending_details.function_name == "test-function" + + +def test_event_creation_context_sub_type_property(): + """Test EventCreationContext.sub_type property with and without sub_type.""" + # Test with sub_type + operation = Mock() + operation.sub_type = OperationSubType.STEP + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + + assert context.sub_type == "Step" + + # Test without sub_type + operation.sub_type = None + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + + assert context.sub_type is None + + +def test_event_creation_context_get_retry_details(): + """Test EventCreationContext.get_retry_details method.""" + operation = Mock() + operation.step_details = StepDetails(attempt=2) + + operation_update = OperationUpdate( + operation_id="step-1", + operation_type=OperationType.STEP, + action=OperationAction.SUCCEED, + step_options=StepOptions(next_attempt_delay_seconds=30), + ) + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + operation_update=operation_update, + ) + + retry_details = context.get_retry_details() + assert retry_details is not None + assert retry_details.current_attempt == 2 + assert retry_details.next_attempt_delay_seconds == 30 + + # Test with no step_details + operation.step_details = None + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + operation_update=operation_update, + ) + + retry_details = context.get_retry_details() + assert retry_details is None + + # Test with no operation_update + operation.step_details = StepDetails(attempt=2) + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + ) + + retry_details = context.get_retry_details() + assert retry_details is None + + +def test_create_chained_invoke_event_pending(): + """Test Event.create_chained_invoke_event_pending method.""" + operation = Mock() + operation.operation_id = "invoke-1" + operation.name = "test_invoke" + operation.parent_id = None + operation.status = OperationStatus.PENDING + operation.start_timestamp = datetime.now(UTC) + operation.sub_type = None + + context = EventCreationContext.create( + operation=operation, + event_id=1, + durable_execution_arn="arn:test", + start_input=StartDurableExecutionInput( + account_id="123", + function_name="test", + function_qualifier="$LATEST", + execution_name="test", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ), + include_execution_data=True, + ) + + event = Event.create_chained_invoke_event_pending(context) + + assert event.event_type == "ChainedInvokeStarted" + assert event.operation_id == "invoke-1" + assert event.name == "test_invoke" + assert event.chained_invoke_pending_details is not None + assert event.chained_invoke_pending_details.function_name == "test" diff --git a/tests/executor_test.py b/tests/executor_test.py index 75d6de1..2d8ab06 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -1997,6 +1997,12 @@ def test_get_execution_state_invalid_token(executor, mock_store): def test_get_execution_history(executor, mock_store): """Test get_execution_history method.""" mock_execution = Mock() + mock_execution.operations = [] # Empty operations list + mock_execution.updates = [] + mock_execution.durable_execution_arn = "" + mock_execution.start_input = Mock() + mock_execution.result = Mock() + mock_store.load.return_value = mock_execution result = executor.get_execution_history("test-arn") @@ -2006,6 +2012,134 @@ def test_get_execution_history(executor, mock_store): mock_store.load.assert_called_once_with("test-arn") +def test_get_execution_history_with_events(executor, mock_store): + """Test get_execution_history with actual events.""" + from aws_durable_execution_sdk_python.lambda_service import StepDetails + + # Create operations that will generate events + op1 = Operation( + operation_id="op-1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + step_details=StepDetails(result="test_result"), + ) + mock_execution = Mock() + mock_execution.operations = [op1] + mock_execution.updates = [] + mock_execution.durable_execution_arn = "" + mock_execution.start_input = Mock() + mock_execution.result = Mock() + mock_store.load.return_value = mock_execution + + result = executor.get_execution_history("test-arn", include_execution_data=True) + + assert len(result.events) == 2 # Started + Succeeded events + assert result.events[0].event_type == "StepStarted" + assert result.events[1].event_type == "StepSucceeded" + + +def test_get_execution_history_reverse_order(executor, mock_store): + """Test get_execution_history with reverse order.""" + op1 = Operation( + operation_id="op-1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + ) + + mock_execution = Mock() + mock_execution.operations = [op1] + mock_execution.updates = [] + mock_execution.durable_execution_arn = "" + mock_execution.start_input = Mock() + mock_execution.result = Mock() + mock_store.load.return_value = mock_execution + + result = executor.get_execution_history("test-arn", reverse_order=True) + + assert len(result.events) == 2 + # In reverse order, succeeded event should come first + assert result.events[0].event_type == "StepSucceeded" + assert result.events[1].event_type == "StepStarted" + + +def test_get_execution_history_pagination(executor, mock_store): + """Test get_execution_history with pagination.""" + # Create multiple operations to generate many events + operations = [] + for i in range(3): + op = Operation( + operation_id=f"op-{i}", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + ) + operations.append(op) + + mock_execution = Mock() + mock_execution.operations = operations + mock_execution.updates = [] + mock_execution.durable_execution_arn = "" + mock_execution.start_input = Mock() + mock_execution.result = Mock() + mock_store.load.return_value = mock_execution + + # Test with max_items=2 + result = executor.get_execution_history("test-arn", max_items=2) + + assert len(result.events) == 2 + assert result.next_marker == "3" # Next event_id + + +def test_get_execution_history_pagination_with_marker(executor, mock_store): + """Test get_execution_history pagination with marker.""" + operations = [] + for i in range(3): + op = Operation( + operation_id=f"op-{i}", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + ) + operations.append(op) + + mock_execution = Mock() + mock_execution.operations = operations + mock_execution.updates = [] + mock_execution.durable_execution_arn = "" + mock_execution.start_input = Mock() + mock_execution.result = Mock() + mock_store.load.return_value = mock_execution + + # Test with marker (start from event_id 3) + result = executor.get_execution_history("test-arn", marker="3", max_items=2) + + assert len(result.events) == 2 + # Should get events with event_id >= 3 + + +def test_get_execution_history_invalid_marker(executor, mock_store): + """Test get_execution_history with invalid marker.""" + mock_execution = Mock() + mock_execution.operations = [] + mock_execution.updates = [] + mock_execution.durable_execution_arn = "" + mock_execution.start_input = Mock() + mock_execution.result = Mock() + mock_store.load.return_value = mock_execution + + # Invalid marker should default to 1 + result = executor.get_execution_history("test-arn", marker="invalid") + + assert result.events == [] + assert result.next_marker is None + + def test_checkpoint_execution(executor, mock_store): """Test checkpoint_execution method.""" mock_execution = Mock() diff --git a/tests/model_test.py b/tests/model_test.py index 2cd3fde..015ac9d 100644 --- a/tests/model_test.py +++ b/tests/model_test.py @@ -6,6 +6,10 @@ import pytest +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) from aws_durable_execution_sdk_python_testing.exceptions import ( InvalidParameterValueException, ) @@ -63,6 +67,7 @@ WaitCancelledDetails, WaitStartedDetails, WaitSucceededDetails, + events_to_operations, ) @@ -1848,17 +1853,10 @@ def test_step_failed_details_with_error_only(): def test_invoke_started_details_serialization(): """Test ChainedInvokeStartedDetails from_dict/to_dict round-trip.""" data = { - "Input": {"Payload": "invoke-input", "Truncated": False}, - "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:target-function", "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:execution:test", } details = ChainedInvokeStartedDetails.from_dict(data) - assert details.input.payload == "invoke-input" - assert ( - details.function_arn - == "arn:aws:lambda:us-east-1:123456789012:function:target-function" - ) assert ( details.durable_execution_arn == "arn:aws:lambda:us-east-1:123456789012:function:my-function:execution:test" @@ -1873,8 +1871,6 @@ def test_invoke_started_details_minimal(): data = {} details = ChainedInvokeStartedDetails.from_dict(data) - assert details.input is None - assert details.function_arn is None assert details.durable_execution_arn is None result_data = details.to_dict() @@ -1883,22 +1879,10 @@ def test_invoke_started_details_minimal(): def test_invoke_started_details_partial(): """Test ChainedInvokeStartedDetails with partial data.""" - data = { - "Input": {"Payload": "invoke-input", "Truncated": False}, - "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:target-function", - } - + data = {} details = ChainedInvokeStartedDetails.from_dict(data) - assert details.input.payload == "invoke-input" - assert ( - details.function_arn - == "arn:aws:lambda:us-east-1:123456789012:function:target-function" - ) assert details.durable_execution_arn is None - result_data = details.to_dict() - assert result_data == data - # Tests for ChainedInvokeSucceededDetails def test_invoke_succeeded_details_serialization(): @@ -2504,15 +2488,13 @@ def test_event_with_invoke_started_details(): "EventType": "ChainedInvokeStarted", "EventTimestamp": TIMESTAMP_2023_01_01_00_01, "ChainedInvokeStartedDetails": { - "Input": {"Payload": "invoke input", "Truncated": False}, - "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:target", + "DurableExecutionArn": "arn:aws:durable-execution:us-east-1:123456789012:execution:my-execution:1234567890", }, } event_obj = Event.from_dict(data) assert event_obj.event_type == "ChainedInvokeStarted" assert event_obj.chained_invoke_started_details is not None - assert event_obj.chained_invoke_started_details.input.payload == "invoke input" result_data = event_obj.to_dict() expected_data = { @@ -2520,8 +2502,7 @@ def test_event_with_invoke_started_details(): "EventTimestamp": TIMESTAMP_2023_01_01_00_01, "EventId": 1, "ChainedInvokeStartedDetails": { - "Input": {"Payload": "invoke input", "Truncated": False}, - "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:target", + "DurableExecutionArn": "arn:aws:durable-execution:us-east-1:123456789012:execution:my-execution:1234567890", }, } assert result_data == expected_data @@ -2955,20 +2936,6 @@ def test_events_to_operations_empty_list(): def test_events_to_operations_execution_started(): """Test events_to_operations with ExecutionStarted event.""" - import datetime - - from aws_durable_execution_sdk_python.lambda_service import ( - OperationStatus, - OperationType, - ) - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - EventInput, - ExecutionStartedDetails, - events_to_operations, - ) - event = Event( event_type="ExecutionStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -2990,20 +2957,6 @@ def test_events_to_operations_execution_started(): def test_events_to_operations_callback_lifecycle(): """Test events_to_operations with complete callback lifecycle.""" - import datetime - - from aws_durable_execution_sdk_python.lambda_service import ( - OperationStatus, - OperationType, - ) - - from aws_durable_execution_sdk_python_testing.model import ( - CallbackStartedDetails, - CallbackSucceededDetails, - Event, - EventResult, - events_to_operations, - ) started_event = Event( event_type="CallbackStarted", @@ -3036,57 +2989,42 @@ def test_events_to_operations_callback_lifecycle(): def test_events_to_operations_missing_event_type(): """Test events_to_operations raises error for missing event_type.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type=None, event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), ) - with pytest.raises(ValueError, match="Missing required 'event_type' field"): + with pytest.raises( + InvalidParameterValueException, match="Missing required 'event_type' field" + ): events_to_operations([event]) def test_events_to_operations_unknown_event_type(): """Test events_to_operations raises error for unknown event type.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="UnknownEventType", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), operation_id="op-1", ) - with pytest.raises(ValueError, match="Unknown event type: UnknownEventType"): + with pytest.raises( + InvalidParameterValueException, match="Unknown event type: UnknownEventType" + ): events_to_operations([event]) def test_events_to_operations_missing_operation_id(): """Test events_to_operations raises error for missing operation_id.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), operation_id=None, ) - with pytest.raises(ValueError, match="Missing required 'operation_id' field"): + with pytest.raises( + InvalidParameterValueException, match="Missing required 'operation_id' field" + ): events_to_operations([event]) @@ -3243,13 +3181,6 @@ def test_events_to_operations_chained_invoke_succeeded(): def test_events_to_operations_skips_invocation_completed(): """Test events_to_operations skips InvocationCompleted events.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - invocation_event = Event( event_type="InvocationCompleted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -3550,13 +3481,6 @@ def test_events_to_operations_merges_timestamps(): def test_events_to_operations_preserves_parent_id(): """Test events_to_operations preserves parent_id from events.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -3573,13 +3497,6 @@ def test_events_to_operations_preserves_parent_id(): def test_events_to_operations_preserves_sub_type(): """Test events_to_operations preserves sub_type from events.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -3595,23 +3512,17 @@ def test_events_to_operations_preserves_sub_type(): def test_events_to_operations_invalid_sub_type(): - """Test events_to_operations handles invalid sub_type gracefully.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - + """Test events_to_operations raises InvalidParameterValueException when sub_type is invalid.""" + invalid_sub_type: str = "INVALID_SUB_TYPE" event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), operation_id="step-1", - sub_type="INVALID_SUB_TYPE", + sub_type=invalid_sub_type, ) - operations = events_to_operations([event]) - - assert len(operations) == 1 - # Invalid sub_type should be ignored (set to None) - assert operations[0].sub_type is None + with pytest.raises( + InvalidParameterValueException, + match=f"'{invalid_sub_type}' is not a valid OperationSubType", + ): + events_to_operations([event])