@@ -128,18 +128,7 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
128128
129129 # Extract execution details from the first operation (EXECUTION type)
130130 execution_op = execution .get_operation_execution_started ()
131-
132- # Determine status based on execution state
133- if execution .is_complete :
134- if (
135- execution .result
136- and execution .result .status == InvocationStatus .SUCCEEDED
137- ):
138- status = "SUCCEEDED"
139- else :
140- status = "FAILED"
141- else :
142- status = "RUNNING"
131+ status = execution .status
143132
144133 # Extract result and error from execution result
145134 result = None
@@ -175,8 +164,8 @@ def list_executions(
175164 function_version : str | None = None , # noqa: ARG002
176165 execution_name : str | None = None ,
177166 status_filter : str | None = None ,
178- time_after : str | None = None , # noqa: ARG002
179- time_before : str | None = None , # noqa: ARG002
167+ started_after : str | None = None ,
168+ started_before : str | None = None ,
180169 marker : str | None = None ,
181170 max_items : int | None = None ,
182171 reverse_order : bool = False , # noqa: FBT001, FBT002
@@ -188,86 +177,43 @@ def list_executions(
188177 function_version: Filter by function version
189178 execution_name: Filter by execution name
190179 status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
191- time_after : Filter executions started after this time
192- time_before : Filter executions started before this time
180+ started_after : Filter executions started after this time
181+ started_before : Filter executions started before this time
193182 marker: Pagination marker
194183 max_items: Maximum items to return (default 50)
195184 reverse_order: Return results in reverse chronological order
196185
197186 Returns:
198187 ListDurableExecutionsResponse: List of executions with pagination
199188 """
200- # Get all executions from store
201- all_executions = self ._store .list_all ()
202-
203- # Apply filters
204- filtered_executions = []
205- for execution in all_executions :
206- # Filter by function name
207- if function_name and execution .start_input .function_name != function_name :
208- continue
209-
210- # Filter by execution name
211- if (
212- execution_name
213- and execution .start_input .execution_name != execution_name
214- ):
215- continue
216-
217- # Determine execution status
218- execution_status = "RUNNING"
219- if execution .is_complete :
220- if (
221- execution .result
222- and execution .result .status == InvocationStatus .SUCCEEDED
223- ):
224- execution_status = "SUCCEEDED"
225- else :
226- execution_status = "FAILED"
227-
228- # Filter by status
229- if status_filter and execution_status != status_filter :
230- continue
231-
232- # Convert to ExecutionSummary
233- execution_op = execution .get_operation_execution_started ()
234- execution_summary = ExecutionSummary (
235- durable_execution_arn = execution .durable_execution_arn ,
236- durable_execution_name = execution .start_input .execution_name ,
237- function_arn = f"arn:aws:lambda:us-east-1:123456789012:function:{ execution .start_input .function_name } " ,
238- status = execution_status ,
239- start_timestamp = execution_op .start_timestamp
240- if execution_op .start_timestamp
241- else datetime .now (UTC ),
242- end_timestamp = execution_op .end_timestamp
243- if execution_op .end_timestamp
244- else None ,
245- )
246- filtered_executions .append (execution_summary )
247-
248- # Sort by start date
249- filtered_executions .sort (key = lambda e : e .start_timestamp , reverse = reverse_order )
250-
251- # Apply pagination
252- if max_items is None :
253- max_items = 50
254-
255- start_index = 0
189+ # Convert marker to offset
190+ offset : int = 0
256191 if marker :
257192 try :
258- start_index = int (marker )
193+ offset = int (marker )
259194 except ValueError :
260- start_index = 0
195+ offset = 0
261196
262- end_index = start_index + max_items
263- paginated_executions = filtered_executions [start_index :end_index ]
197+ # Query store directly with parameters
198+ executions , next_marker = self ._store .query (
199+ function_name = function_name ,
200+ execution_name = execution_name ,
201+ status_filter = status_filter ,
202+ started_after = started_after ,
203+ started_before = started_before ,
204+ limit = max_items or 50 ,
205+ offset = offset ,
206+ reverse_order = reverse_order ,
207+ )
264208
265- next_marker = None
266- if end_index < len (filtered_executions ):
267- next_marker = str (end_index )
209+ # Convert to ExecutionSummary objects
210+ execution_summaries : list [ExecutionSummary ] = [
211+ ExecutionSummary .from_execution (execution , execution .status )
212+ for execution in executions
213+ ]
268214
269215 return ListDurableExecutionsResponse (
270- durable_executions = paginated_executions , next_marker = next_marker
216+ durable_executions = execution_summaries , next_marker = next_marker
271217 )
272218
273219 def list_executions_by_function (
@@ -276,8 +222,8 @@ def list_executions_by_function(
276222 qualifier : str | None = None , # noqa: ARG002
277223 execution_name : str | None = None ,
278224 status_filter : str | None = None ,
279- time_after : str | None = None ,
280- time_before : str | None = None ,
225+ started_after : str | None = None ,
226+ started_before : str | None = None ,
281227 marker : str | None = None ,
282228 max_items : int | None = None ,
283229 reverse_order : bool = False , # noqa: FBT001, FBT002
@@ -289,8 +235,8 @@ def list_executions_by_function(
289235 qualifier: Function qualifier/version
290236 execution_name: Filter by execution name
291237 status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
292- time_after : Filter executions started after this time
293- time_before : Filter executions started before this time
238+ started_after : Filter executions started after this time
239+ started_before : Filter executions started before this time
294240 marker: Pagination marker
295241 max_items: Maximum items to return (default 50)
296242 reverse_order: Return results in reverse chronological order
@@ -303,8 +249,8 @@ def list_executions_by_function(
303249 function_name = function_name ,
304250 execution_name = execution_name ,
305251 status_filter = status_filter ,
306- time_after = time_after ,
307- time_before = time_before ,
252+ started_after = started_after ,
253+ started_before = started_before ,
308254 marker = marker ,
309255 max_items = max_items ,
310256 reverse_order = reverse_order ,
@@ -343,8 +289,11 @@ def stop_execution(
343289 "Execution stopped by user request"
344290 )
345291
346- # Stop the execution
347- self .fail_execution (execution_arn , stop_error )
292+ # Stop sets TERMINATED close status (different from fail)
293+ logger .exception ("[%s] Stopping execution." , execution_arn )
294+ execution .complete_stopped (error = stop_error ) # Sets CloseStatus.TERMINATED
295+ self ._store .update (execution )
296+ self ._complete_events (execution_arn = execution_arn )
348297
349298 return StopDurableExecutionResponse (stop_timestamp = datetime .now (UTC ))
350299
@@ -836,27 +785,24 @@ def wait_until_complete(
836785 raise ResourceNotFoundException (msg )
837786
838787 def complete_execution (self , execution_arn : str , result : str | None = None ) -> None :
839- """Complete execution successfully."""
788+ """Complete execution successfully (COMPLETE_WORKFLOW_EXECUTION decision) ."""
840789 logger .debug ("[%s] Completing execution with result: %s" , execution_arn , result )
841790 execution : Execution = self ._store .load (execution_arn = execution_arn )
842- execution .complete_success (result = result )
791+ execution .complete_success (result = result ) # Sets CloseStatus.COMPLETED
843792 self ._store .update (execution )
844793 if execution .result is None :
845794 msg : str = "Execution result is required"
846-
847795 raise IllegalStateException (msg )
848796 self ._complete_events (execution_arn = execution_arn )
849797
850798 def fail_execution (self , execution_arn : str , error : ErrorObject ) -> None :
851- """Fail execution with error."""
799+ """Fail execution with error (FAIL_WORKFLOW_EXECUTION decision) ."""
852800 logger .exception ("[%s] Completing execution with error." , execution_arn )
853801 execution : Execution = self ._store .load (execution_arn = execution_arn )
854- execution .complete_fail (error = error )
802+ execution .complete_fail (error = error ) # Sets CloseStatus.FAILED
855803 self ._store .update (execution )
856- # set by complete_fail
857804 if execution .result is None :
858805 msg : str = "Execution result is required"
859-
860806 raise IllegalStateException (msg )
861807 self ._complete_events (execution_arn = execution_arn )
862808
@@ -908,6 +854,19 @@ def on_failed(self, execution_arn: str, error: ErrorObject) -> None:
908854 """Fail execution. Observer method triggered by notifier."""
909855 self .fail_execution (execution_arn , error )
910856
857+ def on_timed_out (self , execution_arn : str , error : ErrorObject ) -> None :
858+ """Handle execution timeout (workflow timeout). Observer method triggered by notifier."""
859+ logger .exception ("[%s] Execution timed out." , execution_arn )
860+ execution : Execution = self ._store .load (execution_arn = execution_arn )
861+ execution .complete_timeout (error = error ) # Sets CloseStatus.TIMED_OUT
862+ self ._store .update (execution )
863+ self ._complete_events (execution_arn = execution_arn )
864+
865+ def on_stopped (self , execution_arn : str , error : ErrorObject ) -> None :
866+ """Handle execution stop. Observer method triggered by notifier."""
867+ # This should not be called directly - stop_execution handles termination
868+ self .fail_execution (execution_arn , error )
869+
911870 def on_wait_timer_scheduled (
912871 self , execution_arn : str , operation_id : str , delay : float
913872 ) -> None :
0 commit comments