33import json
44from dataclasses import replace
55from datetime import UTC , datetime
6+ from enum import Enum
67from threading import Lock
78from typing import Any
89from uuid import uuid4
2021 OperationUpdate ,
2122)
2223
23- # Import AWS exceptions
2424from aws_durable_execution_sdk_python_testing .exceptions import (
2525 IllegalStateException ,
2626 InvalidParameterValueException ,
2727)
28+
29+ # Import AWS exceptions
2830from aws_durable_execution_sdk_python_testing .model import (
2931 StartDurableExecutionInput ,
3032)
3436)
3537
3638
39+ class ExecutionStatus (Enum ):
40+ """Execution status for API responses."""
41+
42+ RUNNING = "RUNNING"
43+ SUCCEEDED = "SUCCEEDED"
44+ FAILED = "FAILED"
45+ STOPPED = "STOPPED"
46+ TIMED_OUT = "TIMED_OUT"
47+
48+
3749class Execution :
3850 """Execution state."""
3951
@@ -55,12 +67,24 @@ def __init__(
5567 self .is_complete : bool = False
5668 self .result : DurableExecutionInvocationOutput | None = None
5769 self .consecutive_failed_invocation_attempts : int = 0
70+ self .close_status : ExecutionStatus | None = None
5871
5972 @property
6073 def token_sequence (self ) -> int :
6174 """Get current token sequence value."""
6275 return self ._token_sequence
6376
77+ def current_status (self ) -> ExecutionStatus :
78+ """Get execution status."""
79+ if not self .is_complete :
80+ return ExecutionStatus .RUNNING
81+
82+ if not self .close_status :
83+ msg : str = "close_status must be set when execution is complete"
84+ raise IllegalStateException (msg )
85+
86+ return self .close_status
87+
6488 @staticmethod
6589 def new (input : StartDurableExecutionInput ) -> Execution : # noqa: A002
6690 # make a nicer arn
@@ -82,6 +106,7 @@ def to_dict(self) -> dict[str, Any]:
82106 "IsComplete" : self .is_complete ,
83107 "Result" : self .result .to_dict () if self .result else None ,
84108 "ConsecutiveFailedInvocationAttempts" : self .consecutive_failed_invocation_attempts ,
109+ "CloseStatus" : self .close_status .value if self .close_status else None ,
85110 }
86111
87112 @classmethod
@@ -115,6 +140,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
115140 execution .consecutive_failed_invocation_attempts = data [
116141 "ConsecutiveFailedInvocationAttempts"
117142 ]
143+ close_status_str = data .get ("CloseStatus" )
144+ execution .close_status = (
145+ ExecutionStatus (close_status_str ) if close_status_str else None
146+ )
118147
119148 return execution
120149
@@ -187,16 +216,40 @@ def has_pending_operations(self, execution: Execution) -> bool:
187216 return False
188217
189218 def complete_success (self , result : str | None ) -> None :
219+ """Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION)."""
190220 self .result = DurableExecutionInvocationOutput (
191221 status = InvocationStatus .SUCCEEDED , result = result
192222 )
193223 self .is_complete = True
224+ self .close_status = ExecutionStatus .SUCCEEDED
225+ self ._end_execution (OperationStatus .SUCCEEDED )
194226
195227 def complete_fail (self , error : ErrorObject ) -> None :
228+ """Complete execution with failure (DecisionType.FAIL_WORKFLOW_EXECUTION)."""
196229 self .result = DurableExecutionInvocationOutput (
197230 status = InvocationStatus .FAILED , error = error
198231 )
199232 self .is_complete = True
233+ self .close_status = ExecutionStatus .FAILED
234+ self ._end_execution (OperationStatus .FAILED )
235+
236+ def complete_timeout (self , error : ErrorObject ) -> None :
237+ """Complete execution with timeout."""
238+ self .result = DurableExecutionInvocationOutput (
239+ status = InvocationStatus .FAILED , error = error
240+ )
241+ self .is_complete = True
242+ self .close_status = ExecutionStatus .TIMED_OUT
243+ self ._end_execution (OperationStatus .TIMED_OUT )
244+
245+ def complete_stopped (self , error : ErrorObject ) -> None :
246+ """Complete execution as terminated (TerminateWorkflowExecutionV2Request)."""
247+ self .result = DurableExecutionInvocationOutput (
248+ status = InvocationStatus .FAILED , error = error
249+ )
250+ self .is_complete = True
251+ self .close_status = ExecutionStatus .STOPPED
252+ self ._end_execution (OperationStatus .STOPPED )
200253
201254 def find_operation (self , operation_id : str ) -> tuple [int , Operation ]:
202255 """Find operation by ID, return index and operation."""
@@ -327,3 +380,14 @@ def complete_callback_failure(
327380 callback_details = updated_callback_details ,
328381 )
329382 return self .operations [index ]
383+
384+ def _end_execution (self , status : OperationStatus ) -> None :
385+ """Set the end_timestamp on the main EXECUTION operation when execution completes."""
386+ execution_op : Operation = self .get_operation_execution_started ()
387+ if execution_op .operation_type == OperationType .EXECUTION :
388+ with self ._state_lock :
389+ self .operations [0 ] = replace (
390+ execution_op ,
391+ status = status ,
392+ end_timestamp = datetime .now (UTC ),
393+ )
0 commit comments