Skip to content

Commit adde759

Browse files
author
Rares Polenciuc
committed
feat: add SQLite execution store with query capabilities
- Add SQLiteExecutionStore with database persistence and indexing - Implement ExecutionQuery system for filtering by function name, execution name, status - Add QueryResult with pagination support (limit, offset, has_more) - Refactor BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient list operations - Add comprehensive test coverage for all store implementations - Support concurrent access patterns with proper database handling
1 parent 48eae39 commit adde759

File tree

10 files changed

+1529
-94
lines changed

10 files changed

+1529
-94
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 51 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ def list_executions(
162162
function_version: str | None = None, # noqa: ARG002
163163
execution_name: str | None = None,
164164
status_filter: str | None = None,
165-
time_after: str | None = None, # noqa: ARG002
166-
time_before: str | None = None, # noqa: ARG002
165+
time_after: str | None = None,
166+
time_before: str | None = None,
167167
marker: str | None = None,
168168
max_items: int | None = None,
169169
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -184,77 +184,65 @@ def list_executions(
184184
Returns:
185185
ListDurableExecutionsResponse: List of executions with pagination
186186
"""
187-
# Get all executions from store
188-
all_executions = self._store.list_all()
187+
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionQuery
189188

190-
# Apply filters
191-
filtered_executions = []
192-
for execution in all_executions:
193-
# Filter by function name
194-
if function_name and execution.start_input.function_name != function_name:
195-
continue
189+
# Convert marker to offset
190+
offset = 0
191+
if marker:
192+
try:
193+
offset = int(marker)
194+
except ValueError:
195+
offset = 0
196196

197-
# Filter by execution name
198-
if (
199-
execution_name
200-
and execution.start_input.execution_name != execution_name
201-
):
202-
continue
197+
# Query store
198+
query = ExecutionQuery(
199+
function_name=function_name,
200+
execution_name=execution_name,
201+
status_filter=status_filter,
202+
time_after=time_after,
203+
time_before=time_before,
204+
limit=max_items or 50,
205+
offset=offset,
206+
reverse_order=reverse_order,
207+
)
208+
209+
result = self._store.query(query)
203210

204-
# Determine execution status
205-
execution_status = "RUNNING"
211+
# Convert to ExecutionSummary objects
212+
execution_summaries = []
213+
for execution in result.executions:
214+
execution_op = execution.get_operation_execution_started()
215+
status = "RUNNING"
206216
if execution.is_complete:
207-
if (
208-
execution.result
217+
status = (
218+
"SUCCEEDED"
219+
if execution.result
209220
and execution.result.status == InvocationStatus.SUCCEEDED
210-
):
211-
execution_status = "SUCCEEDED"
212-
else:
213-
execution_status = "FAILED"
214-
215-
# Filter by status
216-
if status_filter and execution_status != status_filter:
217-
continue
221+
else "FAILED"
222+
)
218223

219-
# Convert to ExecutionSummary
220-
execution_op = execution.get_operation_execution_started()
221-
execution_summary = ExecutionSummary(
222-
durable_execution_arn=execution.durable_execution_arn,
223-
durable_execution_name=execution.start_input.execution_name,
224-
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225-
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.timestamp()
227-
if execution_op.start_timestamp
228-
else datetime.now(UTC).timestamp(),
229-
end_timestamp=execution_op.end_timestamp.timestamp()
230-
if execution_op.end_timestamp
231-
else None,
224+
execution_summaries.append(
225+
ExecutionSummary(
226+
durable_execution_arn=execution.durable_execution_arn,
227+
durable_execution_name=execution.start_input.execution_name,
228+
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
229+
status=status,
230+
start_timestamp=execution_op.start_timestamp.timestamp()
231+
if execution_op.start_timestamp
232+
else datetime.now(UTC).timestamp(),
233+
end_timestamp=execution_op.end_timestamp.timestamp()
234+
if execution_op.end_timestamp
235+
else None,
236+
)
232237
)
233-
filtered_executions.append(execution_summary)
234-
235-
# Sort by start date
236-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
237-
238-
# Apply pagination
239-
if max_items is None:
240-
max_items = 50
241-
242-
start_index = 0
243-
if marker:
244-
try:
245-
start_index = int(marker)
246-
except ValueError:
247-
start_index = 0
248238

249-
end_index = start_index + max_items
250-
paginated_executions = filtered_executions[start_index:end_index]
251-
252-
next_marker = None
253-
if end_index < len(filtered_executions):
254-
next_marker = str(end_index)
239+
# Generate next marker
240+
next_marker = (
241+
str(offset + len(execution_summaries)) if result.has_more else None
242+
)
255243

256244
return ListDurableExecutionsResponse(
257-
durable_executions=paginated_executions, next_marker=next_marker
245+
durable_executions=execution_summaries, next_marker=next_marker
258246
)
259247

260248
def list_executions_by_function(

src/aws_durable_execution_sdk_python_testing/stores/base.py

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22

33
from __future__ import annotations
44

5+
from dataclasses import dataclass
6+
from datetime import UTC
57
from enum import Enum
68
from typing import TYPE_CHECKING, Protocol
79

10+
from aws_durable_execution_sdk_python.execution import InvocationStatus
11+
812

913
if TYPE_CHECKING:
1014
from aws_durable_execution_sdk_python_testing.execution import Execution
@@ -15,6 +19,30 @@ class StoreType(Enum):
1519

1620
MEMORY = "memory"
1721
FILESYSTEM = "filesystem"
22+
SQLITE = "sqlite"
23+
24+
25+
@dataclass
26+
class ExecutionQuery:
27+
"""Query parameters for execution filtering."""
28+
29+
function_name: str | None = None
30+
execution_name: str | None = None
31+
status_filter: str | None = None
32+
time_after: str | None = None
33+
time_before: str | None = None
34+
limit: int | None = None
35+
offset: int = 0
36+
reverse_order: bool = False
37+
38+
39+
@dataclass
40+
class QueryResult:
41+
"""Result of a query with pagination info."""
42+
43+
executions: list[Execution]
44+
total_count: int
45+
has_more: bool
1846

1947

2048
class ExecutionStore(Protocol):
@@ -24,4 +52,81 @@ class ExecutionStore(Protocol):
2452
def save(self, execution: Execution) -> None: ... # pragma: no cover
2553
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
2654
def update(self, execution: Execution) -> None: ... # pragma: no cover
27-
def list_all(self) -> list[Execution]: ... # pragma: no cover
55+
def query(self, query: ExecutionQuery) -> QueryResult: ... # pragma: no cover
56+
def list_all(
57+
self,
58+
) -> list[Execution]: ... # pragma: no cover # Keep for backward compatibility
59+
60+
61+
class BaseExecutionStore(ExecutionStore):
62+
"""Base implementation for execution stores with shared query logic."""
63+
64+
@staticmethod
65+
def process_query(
66+
executions: list[Execution], query: ExecutionQuery
67+
) -> QueryResult:
68+
"""Apply filtering, sorting, and pagination to executions."""
69+
# Apply filters
70+
filtered = []
71+
for execution in executions:
72+
if (
73+
query.function_name
74+
and execution.start_input.function_name != query.function_name
75+
):
76+
continue
77+
if (
78+
query.execution_name
79+
and execution.start_input.execution_name != query.execution_name
80+
):
81+
continue
82+
83+
# Status filtering
84+
if query.status_filter:
85+
status = "RUNNING"
86+
if execution.is_complete:
87+
status = (
88+
"SUCCEEDED"
89+
if execution.result
90+
and execution.result.status == InvocationStatus.SUCCEEDED
91+
else "FAILED"
92+
)
93+
if status != query.status_filter:
94+
continue
95+
96+
filtered.append(execution)
97+
98+
# Sort by start timestamp
99+
def get_sort_key(e):
100+
try:
101+
op = e.get_operation_execution_started()
102+
if op.start_timestamp:
103+
return (
104+
op.start_timestamp.timestamp()
105+
if hasattr(op.start_timestamp, "timestamp")
106+
else op.start_timestamp.replace(tzinfo=UTC).timestamp()
107+
)
108+
except Exception: # noqa: BLE001, S110
109+
pass
110+
return 0
111+
112+
filtered.sort(key=get_sort_key, reverse=query.reverse_order)
113+
114+
total_count = len(filtered)
115+
116+
# Apply pagination
117+
if query.limit:
118+
end_idx = query.offset + query.limit
119+
paginated = filtered[query.offset : end_idx]
120+
has_more = end_idx < total_count
121+
else:
122+
paginated = filtered[query.offset :]
123+
has_more = False
124+
125+
return QueryResult(
126+
executions=paginated, total_count=total_count, has_more=has_more
127+
)
128+
129+
def query(self, query: ExecutionQuery) -> QueryResult:
130+
"""Apply filtering, sorting, and pagination to executions."""
131+
executions = self.list_all()
132+
return self.process_query(executions, query)

src/aws_durable_execution_sdk_python_testing/stores/filesystem.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
ResourceNotFoundException,
1212
)
1313
from aws_durable_execution_sdk_python_testing.execution import Execution
14+
from aws_durable_execution_sdk_python_testing.stores.base import (
15+
BaseExecutionStore,
16+
)
1417

1518

1619
class DateTimeEncoder(json.JSONEncoder):
@@ -35,7 +38,7 @@ def datetime_object_hook(obj):
3538
return obj
3639

3740

38-
class FileSystemExecutionStore:
41+
class FileSystemExecutionStore(BaseExecutionStore):
3942
"""File system-based execution store for persistence."""
4043

4144
def __init__(self, storage_dir: Path) -> None:

src/aws_durable_execution_sdk_python_testing/stores/memory.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55
from threading import Lock
66
from typing import TYPE_CHECKING
77

8+
from aws_durable_execution_sdk_python_testing.stores.base import (
9+
BaseExecutionStore,
10+
)
11+
812

913
if TYPE_CHECKING:
1014
from aws_durable_execution_sdk_python_testing.execution import Execution
1115

1216

13-
class InMemoryExecutionStore:
17+
class InMemoryExecutionStore(BaseExecutionStore):
1418
"""Dict-based storage for testing."""
1519

1620
def __init__(self) -> None:

0 commit comments

Comments
 (0)