Skip to content

Commit 9a697be

Browse files
author
Rares Polenciuc
committed
feat: complete sqlite store and function handler implementation
- Add SQLiteExecutionStore with database persistence and indexing
- Implement query system with pagination support
- Add BaseExecutionStore with shared query processing logic
- Update Executor to use new query system for efficient operations
- Complete ListDurableExecutionsByFunctionHandler with proper filtering
- Add function name validation and error handling
- Add comprehensive test coverage for all implementations
- Support concurrent access patterns with proper database handling
1 parent a6bdb18 commit 9a697be

File tree

19 files changed

+2579
-303
lines changed

19 files changed

+2579
-303
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def process(
4545
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4646
)
4747
)
48+
# All EXECUTION failures go through normal fail path
49+
# Timeout/Stop status is set by executor based on the operation that caused it
4850
notifier.notify_failed(execution_arn=execution_arn, error=error)
4951
# TODO: Svc doesn't actually create checkpoint for EXECUTION. might have to for localrunner though.
5052
return None

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6+
from enum import Enum
67
from threading import Lock
78
from typing import Any
89
from uuid import uuid4
@@ -20,17 +21,37 @@
2021
OperationUpdate,
2122
)
2223

23-
# Import AWS exceptions
2424
from aws_durable_execution_sdk_python_testing.exceptions import (
2525
IllegalStateException,
2626
InvalidParameterValueException,
2727
)
28+
29+
# Import AWS exceptions
2830
from aws_durable_execution_sdk_python_testing.model import (
2931
StartDurableExecutionInput,
3032
)
3133
from aws_durable_execution_sdk_python_testing.token import CheckpointToken
3234

3335

36+
class CloseStatus(Enum):
    """Terminal reason recorded on an execution once it has closed.

    Mirrors the backend SWF CloseStatus; each member's value is its own name.
    """

    # Closed normally with a successful result.
    COMPLETED = "COMPLETED"
    # Closed with an error.
    FAILED = "FAILED"
    # Closed because the execution was stopped.
    TERMINATED = "TERMINATED"
    # Closed because the execution timed out.
    TIMED_OUT = "TIMED_OUT"
43+
44+
45+
class ExecutionStatus(Enum):
    """Status string reported for an execution in API responses.

    Mirrors the backend ExecutionStatus; each member's value is its own name.
    """

    # Still in progress.
    RUNNING = "RUNNING"
    # Terminal states.
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    STOPPED = "STOPPED"
    TIMED_OUT = "TIMED_OUT"
53+
54+
3455
class Execution:
3556
"""Execution state."""
3657

@@ -52,12 +73,38 @@ def __init__(
5273
self.is_complete: bool = False
5374
self.result: DurableExecutionInvocationOutput | None = None
5475
self.consecutive_failed_invocation_attempts: int = 0
76+
self.close_status: CloseStatus | None = (
77+
None # Track close status like backend SWF
78+
)
5579

5680
@property
def token_sequence(self) -> int:
    """Return the value currently held in the private token sequence counter."""
    current: int = self._token_sequence
    return current
6084

85+
def current_status(self) -> ExecutionStatus:
    """Translate the execution's completion state into an ExecutionStatus.

    Returns:
        ExecutionStatus.RUNNING while ``is_complete`` is false; otherwise the
        status that corresponds to ``close_status``.

    Raises:
        IllegalStateException: The execution is complete but has no close status.
        InvalidParameterValueException: The close status matches no known member.
    """
    if not self.is_complete:
        return ExecutionStatus.RUNNING

    closed_as = self.close_status
    if not closed_as:
        msg: str = "close_status must be set when execution is complete"
        raise IllegalStateException(msg)

    # CloseStatus -> ExecutionStatus conversion (like backend ExecutionStatusConverter).
    if closed_as == CloseStatus.COMPLETED:
        return ExecutionStatus.SUCCEEDED
    if closed_as == CloseStatus.FAILED:
        return ExecutionStatus.FAILED
    if closed_as == CloseStatus.TERMINATED:
        return ExecutionStatus.STOPPED
    if closed_as == CloseStatus.TIMED_OUT:
        return ExecutionStatus.TIMED_OUT

    error_msg: str = f"Unexpected close status: {self.close_status}"
    raise InvalidParameterValueException(error_msg)
107+
61108
@staticmethod
62109
def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
63110
# make a nicer arn
@@ -79,6 +126,7 @@ def to_dict(self) -> dict[str, Any]:
79126
"IsComplete": self.is_complete,
80127
"Result": self.result.to_dict() if self.result else None,
81128
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
129+
"CloseStatus": self.close_status.value if self.close_status else None,
82130
}
83131

84132
@classmethod
@@ -112,6 +160,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
112160
execution.consecutive_failed_invocation_attempts = data[
113161
"ConsecutiveFailedInvocationAttempts"
114162
]
163+
close_status_str = data.get("CloseStatus")
164+
execution.close_status = (
165+
CloseStatus(close_status_str) if close_status_str else None
166+
)
115167

116168
return execution
117169

@@ -184,16 +236,36 @@ def has_pending_operations(self, execution: Execution) -> bool:
184236
return False
185237

186238
def complete_success(self, result: str | None) -> None:
    """Mark the execution as successfully closed (DecisionType.COMPLETE_WORKFLOW_EXECUTION).

    Stores a SUCCEEDED invocation output carrying ``result``, flags the
    execution as complete and records CloseStatus.COMPLETED.
    """
    outcome = DurableExecutionInvocationOutput(
        result=result,
        status=InvocationStatus.SUCCEEDED,
    )
    self.result = outcome
    self.is_complete = True
    self.close_status = CloseStatus.COMPLETED
191245

192246
def complete_fail(self, error: ErrorObject) -> None:
    """Mark the execution as closed with a failure (DecisionType.FAIL_WORKFLOW_EXECUTION).

    Stores a FAILED invocation output carrying ``error``, flags the execution
    as complete and records CloseStatus.FAILED.
    """
    failure = DurableExecutionInvocationOutput(
        error=error,
        status=InvocationStatus.FAILED,
    )
    self.result = failure
    self.is_complete = True
    self.close_status = CloseStatus.FAILED
253+
254+
def complete_timeout(self, error: ErrorObject) -> None:
    """Mark the execution as closed by a timeout (SWF workflow timeout).

    The invocation output is recorded as FAILED with ``error``; the timeout
    itself is captured in the close status (CloseStatus.TIMED_OUT).
    """
    timed_out_output = DurableExecutionInvocationOutput(
        error=error,
        status=InvocationStatus.FAILED,
    )
    self.result = timed_out_output
    self.is_complete = True
    self.close_status = CloseStatus.TIMED_OUT
261+
262+
def complete_stopped(self, error: ErrorObject) -> None:
    """Mark the execution as terminated (TerminateWorkflowExecutionV2Request).

    The invocation output is recorded as FAILED with ``error``; the stop
    itself is captured in the close status (CloseStatus.TERMINATED).
    """
    stopped_output = DurableExecutionInvocationOutput(
        error=error,
        status=InvocationStatus.FAILED,
    )
    self.result = stopped_output
    self.is_complete = True
    self.close_status = CloseStatus.TERMINATED
197269

198270
def find_operation(self, operation_id: str) -> tuple[int, Operation]:
199271
"""Find operation by ID, return index and operation."""

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 54 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,7 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
142142

143143
# Extract execution details from the first operation (EXECUTION type)
144144
execution_op = execution.get_operation_execution_started()
145-
146-
# Determine status based on execution state
147-
if execution.is_complete:
148-
if (
149-
execution.result
150-
and execution.result.status == InvocationStatus.SUCCEEDED
151-
):
152-
status = "SUCCEEDED"
153-
else:
154-
status = "FAILED"
155-
else:
156-
status = "RUNNING"
145+
status = execution.current_status().value
157146

158147
# Extract result and error from execution result
159148
result = None
@@ -189,8 +178,8 @@ def list_executions(
189178
function_version: str | None = None, # noqa: ARG002
190179
execution_name: str | None = None,
191180
status_filter: str | None = None,
192-
time_after: str | None = None, # noqa: ARG002
193-
time_before: str | None = None, # noqa: ARG002
181+
started_after: str | None = None,
182+
started_before: str | None = None,
194183
marker: str | None = None,
195184
max_items: int | None = None,
196185
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -202,86 +191,43 @@ def list_executions(
202191
function_version: Filter by function version
203192
execution_name: Filter by execution name
204193
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
205-
time_after: Filter executions started after this time
206-
time_before: Filter executions started before this time
194+
started_after: Filter executions started after this time
195+
started_before: Filter executions started before this time
207196
marker: Pagination marker
208197
max_items: Maximum items to return (default 50)
209198
reverse_order: Return results in reverse chronological order
210199
211200
Returns:
212201
ListDurableExecutionsResponse: List of executions with pagination
213202
"""
214-
# Get all executions from store
215-
all_executions = self._store.list_all()
216-
217-
# Apply filters
218-
filtered_executions = []
219-
for execution in all_executions:
220-
# Filter by function name
221-
if function_name and execution.start_input.function_name != function_name:
222-
continue
223-
224-
# Filter by execution name
225-
if (
226-
execution_name
227-
and execution.start_input.execution_name != execution_name
228-
):
229-
continue
230-
231-
# Determine execution status
232-
execution_status = "RUNNING"
233-
if execution.is_complete:
234-
if (
235-
execution.result
236-
and execution.result.status == InvocationStatus.SUCCEEDED
237-
):
238-
execution_status = "SUCCEEDED"
239-
else:
240-
execution_status = "FAILED"
241-
242-
# Filter by status
243-
if status_filter and execution_status != status_filter:
244-
continue
245-
246-
# Convert to ExecutionSummary
247-
execution_op = execution.get_operation_execution_started()
248-
execution_summary = ExecutionSummary(
249-
durable_execution_arn=execution.durable_execution_arn,
250-
durable_execution_name=execution.start_input.execution_name,
251-
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
252-
status=execution_status,
253-
start_timestamp=execution_op.start_timestamp
254-
if execution_op.start_timestamp
255-
else datetime.now(UTC),
256-
end_timestamp=execution_op.end_timestamp
257-
if execution_op.end_timestamp
258-
else None,
259-
)
260-
filtered_executions.append(execution_summary)
261-
262-
# Sort by start date
263-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
264-
265-
# Apply pagination
266-
if max_items is None:
267-
max_items = 50
268-
269-
start_index = 0
203+
# Convert marker to offset
204+
offset: int = 0
270205
if marker:
271206
try:
272-
start_index = int(marker)
207+
offset = int(marker)
273208
except ValueError:
274-
start_index = 0
209+
offset = 0
275210

276-
end_index = start_index + max_items
277-
paginated_executions = filtered_executions[start_index:end_index]
211+
# Query store directly with parameters
212+
executions, next_marker = self._store.query(
213+
function_name=function_name,
214+
execution_name=execution_name,
215+
status_filter=status_filter,
216+
started_after=started_after,
217+
started_before=started_before,
218+
limit=max_items or 50,
219+
offset=offset,
220+
reverse_order=reverse_order,
221+
)
278222

279-
next_marker = None
280-
if end_index < len(filtered_executions):
281-
next_marker = str(end_index)
223+
# Convert to ExecutionSummary objects
224+
execution_summaries: list[ExecutionSummary] = [
225+
ExecutionSummary.from_execution(execution, execution.current_status().value)
226+
for execution in executions
227+
]
282228

283229
return ListDurableExecutionsResponse(
284-
durable_executions=paginated_executions, next_marker=next_marker
230+
durable_executions=execution_summaries, next_marker=next_marker
285231
)
286232

287233
def list_executions_by_function(
@@ -290,8 +236,8 @@ def list_executions_by_function(
290236
qualifier: str | None = None, # noqa: ARG002
291237
execution_name: str | None = None,
292238
status_filter: str | None = None,
293-
time_after: str | None = None,
294-
time_before: str | None = None,
239+
started_after: str | None = None,
240+
started_before: str | None = None,
295241
marker: str | None = None,
296242
max_items: int | None = None,
297243
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -303,8 +249,8 @@ def list_executions_by_function(
303249
qualifier: Function qualifier/version
304250
execution_name: Filter by execution name
305251
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
306-
time_after: Filter executions started after this time
307-
time_before: Filter executions started before this time
252+
started_after: Filter executions started after this time
253+
started_before: Filter executions started before this time
308254
marker: Pagination marker
309255
max_items: Maximum items to return (default 50)
310256
reverse_order: Return results in reverse chronological order
@@ -317,8 +263,8 @@ def list_executions_by_function(
317263
function_name=function_name,
318264
execution_name=execution_name,
319265
status_filter=status_filter,
320-
time_after=time_after,
321-
time_before=time_before,
266+
started_after=started_after,
267+
started_before=started_before,
322268
marker=marker,
323269
max_items=max_items,
324270
reverse_order=reverse_order,
@@ -357,8 +303,11 @@ def stop_execution(
357303
"Execution stopped by user request"
358304
)
359305

360-
# Stop the execution
361-
self.fail_execution(execution_arn, stop_error)
306+
# Stop sets TERMINATED close status (different from fail)
307+
logger.exception("[%s] Stopping execution.", execution_arn)
308+
execution.complete_stopped(error=stop_error) # Sets CloseStatus.TERMINATED
309+
self._store.update(execution)
310+
self._complete_events(execution_arn=execution_arn)
362311

363312
return StopDurableExecutionResponse(stop_timestamp=datetime.now(UTC))
364313

@@ -808,27 +757,24 @@ def wait_until_complete(
808757
raise ResourceNotFoundException(msg)
809758

810759
def complete_execution(self, execution_arn: str, result: str | None = None) -> None:
    """Complete execution successfully (COMPLETE_WORKFLOW_EXECUTION decision).

    Loads the execution from the store, marks it as successfully completed,
    persists it and then runs the completion-event step.

    Args:
        execution_arn: ARN of the execution to complete.
        result: Optional result payload recorded on the execution.

    Raises:
        IllegalStateException: The execution has no result after being
            marked complete.
    """
    logger.debug("[%s] Completing execution with result: %s", execution_arn, result)
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_success(result=result)  # Sets CloseStatus.COMPLETED
    # Validate before persisting so an inconsistent execution is never stored.
    if execution.result is None:
        msg: str = "Execution result is required"
        raise IllegalStateException(msg)
    self._store.update(execution)
    self._complete_events(execution_arn=execution_arn)
821769

822770
def fail_execution(self, execution_arn: str, error: ErrorObject) -> None:
    """Fail execution with error (FAIL_WORKFLOW_EXECUTION decision).

    Loads the execution from the store, marks it as failed with ``error``,
    persists it and then runs the completion-event step.

    Args:
        execution_arn: ARN of the execution to fail.
        error: Error recorded as the reason for the failure.

    Raises:
        IllegalStateException: The execution has no result after being
            marked failed.
    """
    # NOTE(review): logger.exception only carries a useful traceback when this
    # runs inside an `except` block - confirm against callers.
    logger.exception("[%s] Completing execution with error.", execution_arn)
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_fail(error=error)  # Sets CloseStatus.FAILED
    # Validate before persisting so an inconsistent execution is never stored.
    if execution.result is None:
        msg: str = "Execution result is required"
        raise IllegalStateException(msg)
    self._store.update(execution)
    self._complete_events(execution_arn=execution_arn)
834780

@@ -880,6 +826,19 @@ def on_failed(self, execution_arn: str, error: ErrorObject) -> None:
880826
"""Fail execution. Observer method triggered by notifier."""
881827
self.fail_execution(execution_arn, error)
882828

829+
def on_timed_out(self, execution_arn: str, error: ErrorObject) -> None:
    """Handle execution timeout (workflow timeout). Observer method triggered by notifier.

    Loads the execution, marks it as timed out with ``error``, persists it and
    then runs the completion-event step.

    Args:
        execution_arn: ARN of the execution that timed out.
        error: Error recorded as the reason for the timeout.
    """
    # logger.error rather than logger.exception: no exception is being handled
    # in this method, so logger.exception would append a bogus
    # "NoneType: None" traceback.
    logger.error("[%s] Execution timed out.", execution_arn)
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_timeout(error=error)  # Sets CloseStatus.TIMED_OUT
    self._store.update(execution)
    self._complete_events(execution_arn=execution_arn)
836+
837+
def on_stopped(self, execution_arn: str, error: ErrorObject) -> None:
    """Handle execution stop. Observer method triggered by notifier.

    Loads the execution, marks it as stopped with ``error``, persists it and
    then runs the completion-event step. The stop is recorded through
    ``complete_stopped`` so the execution is not reported as a plain failure.

    Args:
        execution_arn: ARN of the execution that was stopped.
        error: Error recorded as the reason for the stop.
    """
    logger.info("[%s] Execution stopped.", execution_arn)
    execution: Execution = self._store.load(execution_arn=execution_arn)
    execution.complete_stopped(error=error)  # Sets CloseStatus.TERMINATED
    self._store.update(execution)
    self._complete_events(execution_arn=execution_arn)
841+
883842
def on_wait_timer_scheduled(
884843
self, execution_arn: str, operation_id: str, delay: float
885844
) -> None:

0 commit comments

Comments
 (0)