Skip to content

Commit 30d72cb

Browse files
author
Rares Polenciuc
committed
feat: complete sqlite store and function handler implementation
- Add SQLiteExecutionStore with database persistence and indexing
- Implement query system with pagination support
- Add BaseExecutionStore with shared query processing logic
- Update Executor to use new query system for efficient operations
- Complete ListDurableExecutionsByFunctionHandler with proper filtering
- Add function name validation and error handling
- Add comprehensive test coverage for all implementations
- Support concurrent access patterns with proper database handling
1 parent 4e8ecb8 commit 30d72cb

File tree

19 files changed

+2579
-303
lines changed

19 files changed

+2579
-303
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def process(
4545
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4646
)
4747
)
48+
# All EXECUTION failures go through normal fail path
49+
# Timeout/Stop status is set by executor based on the operation that caused it
4850
notifier.notify_failed(execution_arn=execution_arn, error=error)
4951
# TODO: Svc doesn't actually create checkpoint for EXECUTION. might have to for localrunner though.
5052
return None

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6+
from enum import Enum
67
from threading import Lock
78
from typing import Any
89
from uuid import uuid4
@@ -20,11 +21,12 @@
2021
OperationUpdate,
2122
)
2223

23-
# Import AWS exceptions
2424
from aws_durable_execution_sdk_python_testing.exceptions import (
2525
IllegalStateException,
2626
InvalidParameterValueException,
2727
)
28+
29+
# Import AWS exceptions
2830
from aws_durable_execution_sdk_python_testing.model import (
2931
StartDurableExecutionInput,
3032
)
@@ -34,6 +36,25 @@
3436
)
3537

3638

39+
class CloseStatus(Enum):
    """Terminal outcome recorded on an execution once it has closed.

    Mimics the backend SWF CloseStatus. Each member's value is the same
    string as its name.
    """

    # Execution finished and produced a successful result.
    COMPLETED = "COMPLETED"
    # Execution finished with an error.
    FAILED = "FAILED"
    # Execution was stopped before it finished on its own.
    TERMINATED = "TERMINATED"
    # Execution ran out of time.
    TIMED_OUT = "TIMED_OUT"
46+
47+
48+
class ExecutionStatus(Enum):
    """Status string reported for an execution in API responses.

    Mimics the backend ExecutionStatus. Each member's value is the same
    string as its name.
    """

    # Execution has not completed yet.
    RUNNING = "RUNNING"
    # Execution completed successfully.
    SUCCEEDED = "SUCCEEDED"
    # Execution completed with an error.
    FAILED = "FAILED"
    # Execution was stopped.
    STOPPED = "STOPPED"
    # Execution timed out.
    TIMED_OUT = "TIMED_OUT"
56+
57+
3758
class Execution:
3859
"""Execution state."""
3960

@@ -55,12 +76,38 @@ def __init__(
5576
self.is_complete: bool = False
5677
self.result: DurableExecutionInvocationOutput | None = None
5778
self.consecutive_failed_invocation_attempts: int = 0
79+
self.close_status: CloseStatus | None = (
80+
None # Track close status like backend SWF
81+
)
5882

5983
@property
def token_sequence(self) -> int:
    """Return the current token sequence value held by this execution."""
    current_sequence: int = self._token_sequence
    return current_sequence
6387

88+
def current_status(self) -> ExecutionStatus:
    """Translate this execution's state into an API-facing status.

    Returns:
        ExecutionStatus.RUNNING while the execution is not complete;
        otherwise the ExecutionStatus that corresponds to the recorded
        close status.

    Raises:
        IllegalStateException: The execution is complete but no close
            status has been recorded.
        InvalidParameterValueException: The recorded close status is not
            one of the known CloseStatus members.
    """
    if not self.is_complete:
        return ExecutionStatus.RUNNING

    closed_as = self.close_status
    if not closed_as:
        msg: str = "close_status must be set when execution is complete"
        raise IllegalStateException(msg)

    # CloseStatus -> ExecutionStatus (like backend ExecutionStatusConverter)
    status_by_close_status = {
        CloseStatus.COMPLETED: ExecutionStatus.SUCCEEDED,
        CloseStatus.FAILED: ExecutionStatus.FAILED,
        CloseStatus.TERMINATED: ExecutionStatus.STOPPED,
        CloseStatus.TIMED_OUT: ExecutionStatus.TIMED_OUT,
    }
    resolved = status_by_close_status.get(closed_as)
    if resolved is None:
        error_msg: str = f"Unexpected close status: {closed_as}"
        raise InvalidParameterValueException(error_msg)
    return resolved
110+
64111
@staticmethod
65112
def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
66113
# make a nicer arn
@@ -82,6 +129,7 @@ def to_dict(self) -> dict[str, Any]:
82129
"IsComplete": self.is_complete,
83130
"Result": self.result.to_dict() if self.result else None,
84131
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
132+
"CloseStatus": self.close_status.value if self.close_status else None,
85133
}
86134

87135
@classmethod
@@ -115,6 +163,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
115163
execution.consecutive_failed_invocation_attempts = data[
116164
"ConsecutiveFailedInvocationAttempts"
117165
]
166+
close_status_str = data.get("CloseStatus")
167+
execution.close_status = (
168+
CloseStatus(close_status_str) if close_status_str else None
169+
)
118170

119171
return execution
120172

@@ -187,16 +239,36 @@ def has_pending_operations(self, execution: Execution) -> bool:
187239
return False
188240

189241
def complete_success(self, result: str | None) -> None:
    """Mark the execution as successfully completed.

    Corresponds to DecisionType.COMPLETE_WORKFLOW_EXECUTION.

    Args:
        result: Result payload stored on the SUCCEEDED invocation output;
            may be None.
    """
    outcome = DurableExecutionInvocationOutput(
        status=InvocationStatus.SUCCEEDED, result=result
    )
    self.result = outcome
    self.is_complete = True
    self.close_status = CloseStatus.COMPLETED
194248

195249
def complete_fail(self, error: ErrorObject) -> None:
    """Mark the execution as completed with a failure.

    Corresponds to DecisionType.FAIL_WORKFLOW_EXECUTION.

    Args:
        error: Error stored on the FAILED invocation output.
    """
    outcome = DurableExecutionInvocationOutput(
        status=InvocationStatus.FAILED, error=error
    )
    self.result = outcome
    self.is_complete = True
    self.close_status = CloseStatus.FAILED
256+
257+
def complete_timeout(self, error: ErrorObject) -> None:
    """Mark the execution as completed because it timed out.

    Corresponds to an SWF workflow timeout. The invocation output is
    recorded with FAILED status; only the close status distinguishes a
    timeout from an ordinary failure.

    Args:
        error: Error stored on the FAILED invocation output.
    """
    outcome = DurableExecutionInvocationOutput(
        status=InvocationStatus.FAILED, error=error
    )
    self.result = outcome
    self.is_complete = True
    self.close_status = CloseStatus.TIMED_OUT
264+
265+
def complete_stopped(self, error: ErrorObject) -> None:
    """Mark the execution as completed because it was terminated.

    Corresponds to TerminateWorkflowExecutionV2Request. The invocation
    output is recorded with FAILED status; only the close status
    distinguishes a stop from an ordinary failure.

    Args:
        error: Error stored on the FAILED invocation output.
    """
    outcome = DurableExecutionInvocationOutput(
        status=InvocationStatus.FAILED, error=error
    )
    self.result = outcome
    self.is_complete = True
    self.close_status = CloseStatus.TERMINATED
200272

201273
def find_operation(self, operation_id: str) -> tuple[int, Operation]:
202274
"""Find operation by ID, return index and operation."""

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 54 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -156,18 +156,7 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
156156

157157
# Extract execution details from the first operation (EXECUTION type)
158158
execution_op = execution.get_operation_execution_started()
159-
160-
# Determine status based on execution state
161-
if execution.is_complete:
162-
if (
163-
execution.result
164-
and execution.result.status == InvocationStatus.SUCCEEDED
165-
):
166-
status = "SUCCEEDED"
167-
else:
168-
status = "FAILED"
169-
else:
170-
status = "RUNNING"
159+
status = execution.current_status().value
171160

172161
# Extract result and error from execution result
173162
result = None
@@ -203,8 +192,8 @@ def list_executions(
203192
function_version: str | None = None, # noqa: ARG002
204193
execution_name: str | None = None,
205194
status_filter: str | None = None,
206-
time_after: str | None = None, # noqa: ARG002
207-
time_before: str | None = None, # noqa: ARG002
195+
started_after: str | None = None,
196+
started_before: str | None = None,
208197
marker: str | None = None,
209198
max_items: int | None = None,
210199
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -216,86 +205,43 @@ def list_executions(
216205
function_version: Filter by function version
217206
execution_name: Filter by execution name
218207
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
219-
time_after: Filter executions started after this time
220-
time_before: Filter executions started before this time
208+
started_after: Filter executions started after this time
209+
started_before: Filter executions started before this time
221210
marker: Pagination marker
222211
max_items: Maximum items to return (default 50)
223212
reverse_order: Return results in reverse chronological order
224213
225214
Returns:
226215
ListDurableExecutionsResponse: List of executions with pagination
227216
"""
228-
# Get all executions from store
229-
all_executions = self._store.list_all()
230-
231-
# Apply filters
232-
filtered_executions = []
233-
for execution in all_executions:
234-
# Filter by function name
235-
if function_name and execution.start_input.function_name != function_name:
236-
continue
237-
238-
# Filter by execution name
239-
if (
240-
execution_name
241-
and execution.start_input.execution_name != execution_name
242-
):
243-
continue
244-
245-
# Determine execution status
246-
execution_status = "RUNNING"
247-
if execution.is_complete:
248-
if (
249-
execution.result
250-
and execution.result.status == InvocationStatus.SUCCEEDED
251-
):
252-
execution_status = "SUCCEEDED"
253-
else:
254-
execution_status = "FAILED"
255-
256-
# Filter by status
257-
if status_filter and execution_status != status_filter:
258-
continue
259-
260-
# Convert to ExecutionSummary
261-
execution_op = execution.get_operation_execution_started()
262-
execution_summary = ExecutionSummary(
263-
durable_execution_arn=execution.durable_execution_arn,
264-
durable_execution_name=execution.start_input.execution_name,
265-
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
266-
status=execution_status,
267-
start_timestamp=execution_op.start_timestamp
268-
if execution_op.start_timestamp
269-
else datetime.now(UTC),
270-
end_timestamp=execution_op.end_timestamp
271-
if execution_op.end_timestamp
272-
else None,
273-
)
274-
filtered_executions.append(execution_summary)
275-
276-
# Sort by start date
277-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
278-
279-
# Apply pagination
280-
if max_items is None:
281-
max_items = 50
282-
283-
start_index = 0
217+
# Convert marker to offset
218+
offset: int = 0
284219
if marker:
285220
try:
286-
start_index = int(marker)
221+
offset = int(marker)
287222
except ValueError:
288-
start_index = 0
223+
offset = 0
289224

290-
end_index = start_index + max_items
291-
paginated_executions = filtered_executions[start_index:end_index]
225+
# Query store directly with parameters
226+
executions, next_marker = self._store.query(
227+
function_name=function_name,
228+
execution_name=execution_name,
229+
status_filter=status_filter,
230+
started_after=started_after,
231+
started_before=started_before,
232+
limit=max_items or 50,
233+
offset=offset,
234+
reverse_order=reverse_order,
235+
)
292236

293-
next_marker = None
294-
if end_index < len(filtered_executions):
295-
next_marker = str(end_index)
237+
# Convert to ExecutionSummary objects
238+
execution_summaries: list[ExecutionSummary] = [
239+
ExecutionSummary.from_execution(execution, execution.current_status().value)
240+
for execution in executions
241+
]
296242

297243
return ListDurableExecutionsResponse(
298-
durable_executions=paginated_executions, next_marker=next_marker
244+
durable_executions=execution_summaries, next_marker=next_marker
299245
)
300246

301247
def list_executions_by_function(
@@ -304,8 +250,8 @@ def list_executions_by_function(
304250
qualifier: str | None = None, # noqa: ARG002
305251
execution_name: str | None = None,
306252
status_filter: str | None = None,
307-
time_after: str | None = None,
308-
time_before: str | None = None,
253+
started_after: str | None = None,
254+
started_before: str | None = None,
309255
marker: str | None = None,
310256
max_items: int | None = None,
311257
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -317,8 +263,8 @@ def list_executions_by_function(
317263
qualifier: Function qualifier/version
318264
execution_name: Filter by execution name
319265
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
320-
time_after: Filter executions started after this time
321-
time_before: Filter executions started before this time
266+
started_after: Filter executions started after this time
267+
started_before: Filter executions started before this time
322268
marker: Pagination marker
323269
max_items: Maximum items to return (default 50)
324270
reverse_order: Return results in reverse chronological order
@@ -331,8 +277,8 @@ def list_executions_by_function(
331277
function_name=function_name,
332278
execution_name=execution_name,
333279
status_filter=status_filter,
334-
time_after=time_after,
335-
time_before=time_before,
280+
started_after=started_after,
281+
started_before=started_before,
336282
marker=marker,
337283
max_items=max_items,
338284
reverse_order=reverse_order,
@@ -371,8 +317,11 @@ def stop_execution(
371317
"Execution stopped by user request"
372318
)
373319

374-
# Stop the execution
375-
self.fail_execution(execution_arn, stop_error)
320+
# Stop sets TERMINATED close status (different from fail)
321+
logger.exception("[%s] Stopping execution.", execution_arn)
322+
execution.complete_stopped(error=stop_error) # Sets CloseStatus.TERMINATED
323+
self._store.update(execution)
324+
self._complete_events(execution_arn=execution_arn)
376325

377326
return StopDurableExecutionResponse(stop_timestamp=datetime.now(UTC))
378327

@@ -931,27 +880,24 @@ def wait_until_complete(
931880
raise ResourceNotFoundException(msg)
932881

933882
def complete_execution(self, execution_arn: str, result: str | None = None) -> None:
    """Complete execution successfully (COMPLETE_WORKFLOW_EXECUTION decision).

    Loads the execution from the store, marks it as successfully
    completed, persists it, and then signals completion for the ARN.

    Args:
        execution_arn: ARN of the execution to complete.
        result: Optional result payload recorded on the execution.

    Raises:
        IllegalStateException: The execution has no result after being
            marked complete.
    """
    logger.debug("[%s] Completing execution with result: %s", execution_arn, result)
    loaded: Execution = self._store.load(execution_arn=execution_arn)
    loaded.complete_success(result=result)  # Sets CloseStatus.COMPLETED
    self._store.update(loaded)

    if loaded.result is not None:
        self._complete_events(execution_arn=execution_arn)
        return

    msg: str = "Execution result is required"
    raise IllegalStateException(msg)
944892

945893
def fail_execution(self, execution_arn: str, error: ErrorObject) -> None:
    """Fail execution with error (FAIL_WORKFLOW_EXECUTION decision).

    Loads the execution from the store, marks it as failed with ``error``,
    persists it, and then signals completion for the ARN.

    Args:
        execution_arn: ARN of the execution to fail.
        error: Error recorded on the execution.

    Raises:
        IllegalStateException: The execution has no result after being
            marked failed.
    """
    import sys  # local import: only needed for the exc_info check below

    # logger.exception() is only meaningful inside an exception handler;
    # elsewhere it appends a spurious "NoneType: None" traceback. Log at
    # ERROR level and attach a traceback only when one is being handled.
    logger.error(
        "[%s] Completing execution with error.",
        execution_arn,
        exc_info=sys.exc_info()[0] is not None,
    )
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_fail(error=error)  # Sets CloseStatus.FAILED
    self._store.update(execution)
    if execution.result is None:
        msg: str = "Execution result is required"
        raise IllegalStateException(msg)
    self._complete_events(execution_arn=execution_arn)
957903

@@ -1003,6 +949,19 @@ def on_failed(self, execution_arn: str, error: ErrorObject) -> None:
1003949
"""Fail execution. Observer method triggered by notifier."""
1004950
self.fail_execution(execution_arn, error)
1005951

952+
def on_timed_out(self, execution_arn: str, error: ErrorObject) -> None:
    """Handle execution timeout (workflow timeout).

    Observer method triggered by notifier. Loads the execution, marks it
    as timed out with ``error``, persists it, and then signals completion
    for the ARN.

    Args:
        execution_arn: ARN of the execution that timed out.
        error: Error recorded on the execution.
    """
    import sys  # local import: only needed for the exc_info check below

    # logger.exception() is only meaningful inside an exception handler;
    # elsewhere it appends a spurious "NoneType: None" traceback. Log at
    # ERROR level and attach a traceback only when one is being handled.
    logger.error(
        "[%s] Execution timed out.",
        execution_arn,
        exc_info=sys.exc_info()[0] is not None,
    )
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_timeout(error=error)  # Sets CloseStatus.TIMED_OUT
    self._store.update(execution)
    self._complete_events(execution_arn=execution_arn)
959+
960+
def on_stopped(self, execution_arn: str, error: ErrorObject) -> None:
    """Handle execution stop. Observer method triggered by notifier.

    Loads the execution, marks it as stopped with ``error``, persists it,
    and then signals completion for the ARN.

    Args:
        execution_arn: ARN of the execution that was stopped.
        error: Error recorded on the execution.
    """
    # This should not normally be called directly - stop_execution handles
    # termination. If it is called, record the stop as TERMINATED rather
    # than routing through fail_execution, which would report FAILED.
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_stopped(error=error)  # Sets CloseStatus.TERMINATED
    self._store.update(execution)
    self._complete_events(execution_arn=execution_arn)
964+
1006965
def on_wait_timer_scheduled(
1007966
self, execution_arn: str, operation_id: str, delay: float
1008967
) -> None:

0 commit comments

Comments
 (0)