@@ -115,18 +115,7 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
115115
116116 # Extract execution details from the first operation (EXECUTION type)
117117 execution_op = execution .get_operation_execution_started ()
118-
119- # Determine status based on execution state
120- if execution .is_complete :
121- if (
122- execution .result
123- and execution .result .status == InvocationStatus .SUCCEEDED
124- ):
125- status = "SUCCEEDED"
126- else :
127- status = "FAILED"
128- else :
129- status = "RUNNING"
118+ status = execution .current_status ().value
130119
131120 # Extract result and error from execution result
132121 result = None
@@ -162,8 +151,8 @@ def list_executions(
162151 function_version : str | None = None , # noqa: ARG002
163152 execution_name : str | None = None ,
164153 status_filter : str | None = None ,
165- time_after : str | None = None , # noqa: ARG002
166- time_before : str | None = None , # noqa: ARG002
154+ started_after : str | None = None ,
155+ started_before : str | None = None ,
167156 marker : str | None = None ,
168157 max_items : int | None = None ,
169158 reverse_order : bool = False , # noqa: FBT001, FBT002
@@ -175,86 +164,43 @@ def list_executions(
175164 function_version: Filter by function version
176165 execution_name: Filter by execution name
177166 status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
178- time_after : Filter executions started after this time
179- time_before : Filter executions started before this time
167+ started_after : Filter executions started after this time
168+ started_before : Filter executions started before this time
180169 marker: Pagination marker
181170 max_items: Maximum items to return (default 50)
182171 reverse_order: Return results in reverse chronological order
183172
184173 Returns:
185174 ListDurableExecutionsResponse: List of executions with pagination
186175 """
187- # Get all executions from store
188- all_executions = self ._store .list_all ()
189-
190- # Apply filters
191- filtered_executions = []
192- for execution in all_executions :
193- # Filter by function name
194- if function_name and execution .start_input .function_name != function_name :
195- continue
196-
197- # Filter by execution name
198- if (
199- execution_name
200- and execution .start_input .execution_name != execution_name
201- ):
202- continue
203-
204- # Determine execution status
205- execution_status = "RUNNING"
206- if execution .is_complete :
207- if (
208- execution .result
209- and execution .result .status == InvocationStatus .SUCCEEDED
210- ):
211- execution_status = "SUCCEEDED"
212- else :
213- execution_status = "FAILED"
214-
215- # Filter by status
216- if status_filter and execution_status != status_filter :
217- continue
218-
219- # Convert to ExecutionSummary
220- execution_op = execution .get_operation_execution_started ()
221- execution_summary = ExecutionSummary (
222- durable_execution_arn = execution .durable_execution_arn ,
223- durable_execution_name = execution .start_input .execution_name ,
224- function_arn = f"arn:aws:lambda:us-east-1:123456789012:function:{ execution .start_input .function_name } " ,
225- status = execution_status ,
226- start_timestamp = execution_op .start_timestamp
227- if execution_op .start_timestamp
228- else datetime .now (UTC ),
229- end_timestamp = execution_op .end_timestamp
230- if execution_op .end_timestamp
231- else None ,
232- )
233- filtered_executions .append (execution_summary )
234-
235- # Sort by start date
236- filtered_executions .sort (key = lambda e : e .start_timestamp , reverse = reverse_order )
237-
238- # Apply pagination
239- if max_items is None :
240- max_items = 50
241-
242- start_index = 0
176+ # Convert marker to offset
177+ offset : int = 0
243178 if marker :
244179 try :
245- start_index = int (marker )
180+ offset = int (marker )
246181 except ValueError :
247- start_index = 0
182+ offset = 0
248183
249- end_index = start_index + max_items
250- paginated_executions = filtered_executions [start_index :end_index ]
184+ # Query store directly with parameters
185+ executions , next_marker = self ._store .query (
186+ function_name = function_name ,
187+ execution_name = execution_name ,
188+ status_filter = status_filter ,
189+ started_after = started_after ,
190+ started_before = started_before ,
191+ limit = max_items or 50 ,
192+ offset = offset ,
193+ reverse_order = reverse_order ,
194+ )
251195
252- next_marker = None
253- if end_index < len (filtered_executions ):
254- next_marker = str (end_index )
196+ # Convert to ExecutionSummary objects
197+ execution_summaries : list [ExecutionSummary ] = [
198+ ExecutionSummary .from_execution (execution , execution .current_status ().value )
199+ for execution in executions
200+ ]
255201
256202 return ListDurableExecutionsResponse (
257- durable_executions = paginated_executions , next_marker = next_marker
203+ durable_executions = execution_summaries , next_marker = next_marker
258204 )
259205
260206 def list_executions_by_function (
@@ -263,8 +209,8 @@ def list_executions_by_function(
263209 qualifier : str | None = None , # noqa: ARG002
264210 execution_name : str | None = None ,
265211 status_filter : str | None = None ,
266- time_after : str | None = None ,
267- time_before : str | None = None ,
212+ started_after : str | None = None ,
213+ started_before : str | None = None ,
268214 marker : str | None = None ,
269215 max_items : int | None = None ,
270216 reverse_order : bool = False , # noqa: FBT001, FBT002
@@ -276,8 +222,8 @@ def list_executions_by_function(
276222 qualifier: Function qualifier/version
277223 execution_name: Filter by execution name
278224 status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
279- time_after : Filter executions started after this time
280- time_before : Filter executions started before this time
225+ started_after : Filter executions started after this time
226+ started_before : Filter executions started before this time
281227 marker: Pagination marker
282228 max_items: Maximum items to return (default 50)
283229 reverse_order: Return results in reverse chronological order
@@ -290,8 +236,8 @@ def list_executions_by_function(
290236 function_name = function_name ,
291237 execution_name = execution_name ,
292238 status_filter = status_filter ,
293- time_after = time_after ,
294- time_before = time_before ,
239+ started_after = started_after ,
240+ started_before = started_before ,
295241 marker = marker ,
296242 max_items = max_items ,
297243 reverse_order = reverse_order ,
@@ -330,8 +276,11 @@ def stop_execution(
330276 "Execution stopped by user request"
331277 )
332278
333- # Stop the execution
334- self .fail_execution (execution_arn , stop_error )
279+ # Stop sets TERMINATED close status (different from fail)
280+ logger .exception ("[%s] Stopping execution." , execution_arn )
281+ execution .complete_stopped (error = stop_error ) # Sets CloseStatus.TERMINATED
282+ self ._store .update (execution )
283+ self ._complete_events (execution_arn = execution_arn )
335284
336285 return StopDurableExecutionResponse (stop_timestamp = datetime .now (UTC ))
337286
@@ -772,27 +721,24 @@ def wait_until_complete(
772721 raise ResourceNotFoundException (msg )
773722
774723 def complete_execution (self , execution_arn : str , result : str | None = None ) -> None :
775- """Complete execution successfully."""
724+ """Complete execution successfully (COMPLETE_WORKFLOW_EXECUTION decision) ."""
776725 logger .debug ("[%s] Completing execution with result: %s" , execution_arn , result )
777726 execution : Execution = self ._store .load (execution_arn = execution_arn )
778- execution .complete_success (result = result )
727+ execution .complete_success (result = result ) # Sets CloseStatus.COMPLETED
779728 self ._store .update (execution )
780729 if execution .result is None :
781730 msg : str = "Execution result is required"
782-
783731 raise IllegalStateException (msg )
784732 self ._complete_events (execution_arn = execution_arn )
785733
786734 def fail_execution (self , execution_arn : str , error : ErrorObject ) -> None :
787- """Fail execution with error."""
735+ """Fail execution with error (FAIL_WORKFLOW_EXECUTION decision) ."""
788736 logger .exception ("[%s] Completing execution with error." , execution_arn )
789737 execution : Execution = self ._store .load (execution_arn = execution_arn )
790- execution .complete_fail (error = error )
738+ execution .complete_fail (error = error ) # Sets CloseStatus.FAILED
791739 self ._store .update (execution )
792- # set by complete_fail
793740 if execution .result is None :
794741 msg : str = "Execution result is required"
795-
796742 raise IllegalStateException (msg )
797743 self ._complete_events (execution_arn = execution_arn )
798744
@@ -844,6 +790,19 @@ def on_failed(self, execution_arn: str, error: ErrorObject) -> None:
844790 """Fail execution. Observer method triggered by notifier."""
845791 self .fail_execution (execution_arn , error )
846792
793+ def on_timed_out (self , execution_arn : str , error : ErrorObject ) -> None :
794+ """Handle execution timeout (workflow timeout). Observer method triggered by notifier."""
795+ logger .exception ("[%s] Execution timed out." , execution_arn )
796+ execution : Execution = self ._store .load (execution_arn = execution_arn )
797+ execution .complete_timeout (error = error ) # Sets CloseStatus.TIMED_OUT
798+ self ._store .update (execution )
799+ self ._complete_events (execution_arn = execution_arn )
800+
801+ def on_stopped (self , execution_arn : str , error : ErrorObject ) -> None :
802+ """Handle execution stop. Observer method triggered by notifier."""
803+ # This should not be called directly - stop_execution handles termination
804+ self .fail_execution (execution_arn , error )
805+
847806 def on_wait_timer_scheduled (
848807 self , execution_arn : str , operation_id : str , delay : float
849808 ) -> None :
0 commit comments