Skip to content

Commit 162e4be

Browse files
author
Rares Polenciuc
committed
feat: add SQLite execution store with query capabilities
- Add SQLiteExecutionStore with database persistence and indexing - Implement ExecutionQuery system for filtering by function name, execution name, status - Add QueryResult with pagination support (limit, offset, has_more) - Refactor BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient list operations - Add comprehensive test coverage for all store implementations - Support concurrent access patterns with proper database handling
1 parent e006e9c commit 162e4be

File tree

10 files changed

+1567
-145
lines changed

10 files changed

+1567
-145
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 70 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
DurableExecutionInvocationOutput,
1212
InvocationStatus,
1313
)
14-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
14+
from aws_durable_execution_sdk_python.lambda_service import (
15+
ErrorObject,
16+
OperationStatus,
17+
OperationUpdate,
18+
)
1519

1620
from aws_durable_execution_sdk_python_testing.exceptions import (
1721
ExecutionAlreadyStartedException,
@@ -99,6 +103,30 @@ def get_execution(self, execution_arn: str) -> Execution:
99103
msg: str = f"Execution {execution_arn} not found"
100104
raise ResourceNotFoundException(msg) from e
101105

106+
@staticmethod
107+
def _get_execution_status(execution: Execution) -> str:
108+
"""Get execution status string."""
109+
if not execution.is_complete:
110+
return "RUNNING"
111+
112+
if not execution.result:
113+
return OperationStatus.FAILED.value
114+
115+
match execution.result.status:
116+
case InvocationStatus.SUCCEEDED:
117+
return OperationStatus.SUCCEEDED.value
118+
case InvocationStatus.FAILED:
119+
if execution.result.error and execution.result.error.type:
120+
error_type = execution.result.error.type.lower()
121+
match error_type:
122+
case t if "timeout" in t:
123+
return OperationStatus.TIMED_OUT.value
124+
case t if "stop" in t:
125+
return OperationStatus.STOPPED.value
126+
return OperationStatus.FAILED.value
127+
case _:
128+
return OperationStatus.FAILED.value
129+
102130
def get_execution_details(self, execution_arn: str) -> GetDurableExecutionResponse:
103131
"""Get detailed execution information for web API response.
104132
@@ -112,21 +140,8 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
112140
ResourceNotFoundException: If execution does not exist
113141
"""
114142
execution = self.get_execution(execution_arn)
115-
116-
# Extract execution details from the first operation (EXECUTION type)
117143
execution_op = execution.get_operation_execution_started()
118-
119-
# Determine status based on execution state
120-
if execution.is_complete:
121-
if (
122-
execution.result
123-
and execution.result.status == InvocationStatus.SUCCEEDED
124-
):
125-
status = "SUCCEEDED"
126-
else:
127-
status = "FAILED"
128-
else:
129-
status = "RUNNING"
144+
status = self._get_execution_status(execution)
130145

131146
# Extract result and error from execution result
132147
result = None
@@ -162,99 +177,56 @@ def list_executions(
162177
function_version: str | None = None, # noqa: ARG002
163178
execution_name: str | None = None,
164179
status_filter: str | None = None,
165-
time_after: str | None = None, # noqa: ARG002
166-
time_before: str | None = None, # noqa: ARG002
180+
time_after: str | None = None,
181+
time_before: str | None = None,
167182
marker: str | None = None,
168183
max_items: int | None = None,
169184
reverse_order: bool = False, # noqa: FBT001, FBT002
170185
) -> ListDurableExecutionsResponse:
171-
"""List executions with filtering and pagination.
172-
173-
Args:
174-
function_name: Filter by function name
175-
function_version: Filter by function version
176-
execution_name: Filter by execution name
177-
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
178-
time_after: Filter executions started after this time
179-
time_before: Filter executions started before this time
180-
marker: Pagination marker
181-
max_items: Maximum items to return (default 50)
182-
reverse_order: Return results in reverse chronological order
183-
184-
Returns:
185-
ListDurableExecutionsResponse: List of executions with pagination
186-
"""
187-
# Get all executions from store
188-
all_executions = self._store.list_all()
189-
190-
# Apply filters
191-
filtered_executions = []
192-
for execution in all_executions:
193-
# Filter by function name
194-
if function_name and execution.start_input.function_name != function_name:
195-
continue
196-
197-
# Filter by execution name
198-
if (
199-
execution_name
200-
and execution.start_input.execution_name != execution_name
201-
):
202-
continue
203-
204-
# Determine execution status
205-
execution_status = "RUNNING"
206-
if execution.is_complete:
207-
if (
208-
execution.result
209-
and execution.result.status == InvocationStatus.SUCCEEDED
210-
):
211-
execution_status = "SUCCEEDED"
212-
else:
213-
execution_status = "FAILED"
214-
215-
# Filter by status
216-
if status_filter and execution_status != status_filter:
217-
continue
218-
219-
# Convert to ExecutionSummary
220-
execution_op = execution.get_operation_execution_started()
221-
execution_summary = ExecutionSummary(
222-
durable_execution_arn=execution.durable_execution_arn,
223-
durable_execution_name=execution.start_input.execution_name,
224-
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225-
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.timestamp()
227-
if execution_op.start_timestamp
228-
else datetime.now(UTC).timestamp(),
229-
end_timestamp=execution_op.end_timestamp.timestamp()
230-
if execution_op.end_timestamp
231-
else None,
232-
)
233-
filtered_executions.append(execution_summary)
234-
235-
# Sort by start date
236-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
237-
238-
# Apply pagination
239-
if max_items is None:
240-
max_items = 50
241-
242-
start_index = 0
186+
"""List executions with filtering and pagination."""
187+
# Convert marker to offset
188+
offset = 0
243189
if marker:
244190
try:
245-
start_index = int(marker)
191+
offset = int(marker)
246192
except ValueError:
247-
start_index = 0
193+
offset = 0
248194

249-
end_index = start_index + max_items
250-
paginated_executions = filtered_executions[start_index:end_index]
195+
# Query store directly with parameters
196+
executions, next_marker = self._store.query(
197+
function_name=function_name,
198+
execution_name=execution_name,
199+
status_filter=status_filter,
200+
time_after=time_after,
201+
time_before=time_before,
202+
limit=max_items or 50,
203+
offset=offset,
204+
reverse_order=reverse_order,
205+
)
251206

252-
next_marker = None
253-
if end_index < len(filtered_executions):
254-
next_marker = str(end_index)
207+
# Convert to ExecutionSummary objects
208+
execution_summaries = []
209+
for execution in executions:
210+
execution_op = execution.get_operation_execution_started()
211+
status = self._get_execution_status(execution)
212+
213+
execution_summaries.append(
214+
ExecutionSummary(
215+
durable_execution_arn=execution.durable_execution_arn,
216+
durable_execution_name=execution.start_input.execution_name,
217+
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
218+
status=status,
219+
start_timestamp=execution_op.start_timestamp.timestamp()
220+
if execution_op.start_timestamp
221+
else datetime.now(UTC).timestamp(),
222+
end_timestamp=execution_op.end_timestamp.timestamp()
223+
if execution_op.end_timestamp
224+
else None,
225+
)
226+
)
255227

256228
return ListDurableExecutionsResponse(
257-
durable_executions=paginated_executions, next_marker=next_marker
229+
durable_executions=execution_summaries, next_marker=next_marker
258230
)
259231

260232
def list_executions_by_function(
@@ -269,22 +241,7 @@ def list_executions_by_function(
269241
max_items: int | None = None,
270242
reverse_order: bool = False, # noqa: FBT001, FBT002
271243
) -> ListDurableExecutionsByFunctionResponse:
272-
"""List executions for a specific function.
273-
274-
Args:
275-
function_name: The function name to filter by
276-
qualifier: Function qualifier/version
277-
execution_name: Filter by execution name
278-
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
279-
time_after: Filter executions started after this time
280-
time_before: Filter executions started before this time
281-
marker: Pagination marker
282-
max_items: Maximum items to return (default 50)
283-
reverse_order: Return results in reverse chronological order
284-
285-
Returns:
286-
ListDurableExecutionsByFunctionResponse: List of executions for the function
287-
"""
244+
"""List executions for a specific function."""
288245
# Use the general list_executions method with function_name filter
289246
list_response = self.list_executions(
290247
function_name=function_name,

src/aws_durable_execution_sdk_python_testing/stores/base.py

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
from __future__ import annotations
44

5+
from datetime import UTC
56
from enum import Enum
67
from typing import TYPE_CHECKING, Protocol
78

9+
from aws_durable_execution_sdk_python.execution import InvocationStatus
10+
811

912
if TYPE_CHECKING:
1013
from aws_durable_execution_sdk_python_testing.execution import Execution
@@ -15,6 +18,7 @@ class StoreType(Enum):
1518

1619
MEMORY = "memory"
1720
FILESYSTEM = "filesystem"
21+
SQLITE = "sqlite"
1822

1923

2024
class ExecutionStore(Protocol):
@@ -24,4 +28,111 @@ class ExecutionStore(Protocol):
2428
def save(self, execution: Execution) -> None: ... # pragma: no cover
2529
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
2630
def update(self, execution: Execution) -> None: ... # pragma: no cover
27-
def list_all(self) -> list[Execution]: ... # pragma: no cover
31+
def query(
32+
self,
33+
function_name: str | None = None,
34+
execution_name: str | None = None,
35+
status_filter: str | None = None,
36+
time_after: str | None = None,
37+
time_before: str | None = None,
38+
limit: int | None = None,
39+
offset: int = 0,
40+
reverse_order: bool = False, # noqa: FBT001, FBT002
41+
) -> tuple[list[Execution], str | None]: ... # pragma: no cover
42+
def list_all(
43+
self,
44+
) -> list[Execution]: ... # pragma: no cover # Keep for backward compatibility
45+
46+
47+
class BaseExecutionStore(ExecutionStore):
48+
"""Base implementation for execution stores with shared query logic."""
49+
50+
@staticmethod
51+
def process_query(
52+
executions: list[Execution],
53+
function_name: str | None = None,
54+
execution_name: str | None = None,
55+
status_filter: str | None = None,
56+
time_after: str | None = None, # noqa: ARG004
57+
time_before: str | None = None, # noqa: ARG004
58+
limit: int | None = None,
59+
offset: int = 0,
60+
reverse_order: bool = False, # noqa: FBT001, FBT002
61+
) -> tuple[list[Execution], str | None]:
62+
"""Apply filtering, sorting, and pagination to executions."""
63+
# Apply filters
64+
# :TODO add time filtering
65+
filtered = []
66+
for execution in executions:
67+
if function_name and execution.start_input.function_name != function_name:
68+
continue
69+
if (
70+
execution_name
71+
and execution.start_input.execution_name != execution_name
72+
):
73+
continue
74+
75+
# Status filtering
76+
if status_filter:
77+
status = "RUNNING"
78+
if execution.is_complete:
79+
status = (
80+
"SUCCEEDED"
81+
if execution.result
82+
and execution.result.status == InvocationStatus.SUCCEEDED
83+
else "FAILED"
84+
)
85+
if status != status_filter:
86+
continue
87+
88+
filtered.append(execution)
89+
90+
# Sort by start timestamp
91+
def get_sort_key(e):
92+
try:
93+
op = e.get_operation_execution_started()
94+
if op.start_timestamp:
95+
return (
96+
op.start_timestamp.timestamp()
97+
if hasattr(op.start_timestamp, "timestamp")
98+
else op.start_timestamp.replace(tzinfo=UTC).timestamp()
99+
)
100+
except Exception: # noqa: BLE001, S110
101+
pass
102+
return 0
103+
104+
filtered.sort(key=get_sort_key, reverse=reverse_order)
105+
106+
# Apply pagination
107+
if limit:
108+
end_idx = offset + limit
109+
paginated = filtered[offset:end_idx]
110+
has_more = end_idx < len(filtered)
111+
next_marker = str(end_idx) if has_more else None
112+
return paginated, next_marker
113+
return filtered[offset:], None
114+
115+
def query(
116+
self,
117+
function_name: str | None = None,
118+
execution_name: str | None = None,
119+
status_filter: str | None = None,
120+
time_after: str | None = None,
121+
time_before: str | None = None,
122+
limit: int | None = None,
123+
offset: int = 0,
124+
reverse_order: bool = False, # noqa: FBT001, FBT002
125+
) -> tuple[list[Execution], str | None]:
126+
"""Apply filtering, sorting, and pagination to executions."""
127+
executions = self.list_all()
128+
return self.process_query(
129+
executions,
130+
function_name=function_name,
131+
execution_name=execution_name,
132+
status_filter=status_filter,
133+
time_after=time_after,
134+
time_before=time_before,
135+
limit=limit,
136+
offset=offset,
137+
reverse_order=reverse_order,
138+
)

src/aws_durable_execution_sdk_python_testing/stores/filesystem.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
ResourceNotFoundException,
1212
)
1313
from aws_durable_execution_sdk_python_testing.execution import Execution
14+
from aws_durable_execution_sdk_python_testing.stores.base import (
15+
BaseExecutionStore,
16+
)
1417

1518

1619
class DateTimeEncoder(json.JSONEncoder):
@@ -35,7 +38,7 @@ def datetime_object_hook(obj):
3538
return obj
3639

3740

38-
class FileSystemExecutionStore:
41+
class FileSystemExecutionStore(BaseExecutionStore):
3942
"""File system-based execution store for persistence."""
4043

4144
def __init__(self, storage_dir: Path) -> None:

0 commit comments

Comments
 (0)