33import json
44from dataclasses import replace
55from datetime import UTC , datetime
6+ from enum import Enum
67from threading import Lock
78from typing import Any
89from uuid import uuid4
2021 OperationUpdate ,
2122)
2223
23- # Import AWS exceptions
2424from aws_durable_execution_sdk_python_testing .exceptions import (
2525 IllegalStateException ,
2626 InvalidParameterValueException ,
2727)
28+
29+ # Import AWS exceptions
2830from aws_durable_execution_sdk_python_testing .model import (
2931 StartDurableExecutionInput ,
3032)
3436)
3537
3638
39+ class CloseStatus (Enum ):
40+ """Close status for completed executions (mimics backend SWF CloseStatus)."""
41+
42+ COMPLETED = "COMPLETED"
43+ FAILED = "FAILED"
44+ TERMINATED = "TERMINATED"
45+ TIMED_OUT = "TIMED_OUT"
46+
47+
48+ class ExecutionStatus (Enum ):
49+ """Execution status for API responses (mimics backend ExecutionStatus)."""
50+
51+ RUNNING = "RUNNING"
52+ SUCCEEDED = "SUCCEEDED"
53+ FAILED = "FAILED"
54+ STOPPED = "STOPPED"
55+ TIMED_OUT = "TIMED_OUT"
56+
57+ @classmethod
58+ def from_close_status (cls , close_status : CloseStatus ) -> ExecutionStatus :
59+ """Convert CloseStatus to ExecutionStatus."""
60+ match close_status :
61+ case CloseStatus .COMPLETED :
62+ return cls .SUCCEEDED
63+ case CloseStatus .FAILED :
64+ return cls .FAILED
65+ case CloseStatus .TERMINATED :
66+ return cls .STOPPED
67+ case CloseStatus .TIMED_OUT :
68+ return cls .TIMED_OUT
69+ case _:
70+ error_msg : str = f"Unexpected close status: { close_status } "
71+ raise InvalidParameterValueException (error_msg )
72+
73+
3774class Execution :
3875 """Execution state."""
3976
@@ -55,12 +92,27 @@ def __init__(
5592 self .is_complete : bool = False
5693 self .result : DurableExecutionInvocationOutput | None = None
5794 self .consecutive_failed_invocation_attempts : int = 0
95+ self .close_status : CloseStatus | None = (
96+ None # Track close status like backend SWF
97+ )
5898
5999 @property
60100 def token_sequence (self ) -> int :
61101 """Get current token sequence value."""
62102 return self ._token_sequence
63103
104+ def current_status (self ) -> ExecutionStatus :
105+ """Get execution status."""
106+ if not self .is_complete :
107+ return ExecutionStatus .RUNNING
108+
109+ if not self .close_status :
110+ msg : str = "close_status must be set when execution is complete"
111+ raise IllegalStateException (msg )
112+
113+ # Convert CloseStatus to ExecutionStatus
114+ return ExecutionStatus .from_close_status (self .close_status )
115+
64116 @staticmethod
65117 def new (input : StartDurableExecutionInput ) -> Execution : # noqa: A002
66118 # make a nicer arn
@@ -82,6 +134,7 @@ def to_dict(self) -> dict[str, Any]:
82134 "IsComplete" : self .is_complete ,
83135 "Result" : self .result .to_dict () if self .result else None ,
84136 "ConsecutiveFailedInvocationAttempts" : self .consecutive_failed_invocation_attempts ,
137+ "CloseStatus" : self .close_status .value if self .close_status else None ,
85138 }
86139
87140 @classmethod
@@ -115,6 +168,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
115168 execution .consecutive_failed_invocation_attempts = data [
116169 "ConsecutiveFailedInvocationAttempts"
117170 ]
171+ close_status_str = data .get ("CloseStatus" )
172+ execution .close_status = (
173+ CloseStatus (close_status_str ) if close_status_str else None
174+ )
118175
119176 return execution
120177
@@ -187,16 +244,40 @@ def has_pending_operations(self, execution: Execution) -> bool:
187244 return False
188245
189246 def complete_success (self , result : str | None ) -> None :
247+ """Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION)."""
190248 self .result = DurableExecutionInvocationOutput (
191249 status = InvocationStatus .SUCCEEDED , result = result
192250 )
193251 self .is_complete = True
252+ self .close_status = CloseStatus .COMPLETED
253+ self ._end_execution (OperationStatus .SUCCEEDED )
194254
195255 def complete_fail (self , error : ErrorObject ) -> None :
256+ """Complete execution with failure (DecisionType.FAIL_WORKFLOW_EXECUTION)."""
257+ self .result = DurableExecutionInvocationOutput (
258+ status = InvocationStatus .FAILED , error = error
259+ )
260+ self .is_complete = True
261+ self .close_status = CloseStatus .FAILED
262+ self ._end_execution (OperationStatus .FAILED )
263+
264+ def complete_timeout (self , error : ErrorObject ) -> None :
265+ """Complete execution with timeout (SWF workflow timeout)."""
196266 self .result = DurableExecutionInvocationOutput (
197267 status = InvocationStatus .FAILED , error = error
198268 )
199269 self .is_complete = True
270+ self .close_status = CloseStatus .TIMED_OUT
271+ self ._end_execution (OperationStatus .TIMED_OUT )
272+
273+ def complete_stopped (self , error : ErrorObject ) -> None :
274+ """Complete execution as terminated (TerminateWorkflowExecutionV2Request)."""
275+ self .result = DurableExecutionInvocationOutput (
276+ status = InvocationStatus .FAILED , error = error
277+ )
278+ self .is_complete = True
279+ self .close_status = CloseStatus .TERMINATED
280+ self ._end_execution (OperationStatus .STOPPED )
200281
201282 def find_operation (self , operation_id : str ) -> tuple [int , Operation ]:
202283 """Find operation by ID, return index and operation."""
@@ -327,3 +408,14 @@ def complete_callback_failure(
327408 callback_details = updated_callback_details ,
328409 )
329410 return self .operations [index ]
411+
412+ def _end_execution (self , status : OperationStatus ) -> None :
413+ """Set the end_timestamp on the main EXECUTION operation when execution completes."""
414+ execution_op : Operation = self .get_operation_execution_started ()
415+ if execution_op .operation_type == OperationType .EXECUTION :
416+ with self ._state_lock :
417+ self .operations [0 ] = replace (
418+ execution_op ,
419+ status = status ,
420+ end_timestamp = datetime .now (UTC ),
421+ )
0 commit comments