Skip to content

Commit f260e85

Browse files
author
Rares Polenciuc
committed
feat: complete sqlite store and function handler implementation
- Add SQLiteExecutionStore with database persistence and indexing - Implement query system with pagination support - Add BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient operations - Complete ListDurableExecutionsByFunctionHandler with proper filtering - Add function name validation and error handling - Add comprehensive test coverage for all implementations - Support concurrent access patterns with proper database handling
1 parent e006e9c commit f260e85

File tree

14 files changed

+1696
-225
lines changed

14 files changed

+1696
-225
lines changed

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,29 @@ def token_sequence(self) -> int:
5858
"""Get current token sequence value."""
5959
return self._token_sequence
6060

61+
@property
62+
def status(self) -> str:
63+
"""Get execution status string."""
64+
if not self.is_complete:
65+
return "RUNNING"
66+
67+
if not self.result:
68+
return OperationStatus.FAILED.value
69+
70+
match self.result.status:
71+
case InvocationStatus.SUCCEEDED:
72+
return OperationStatus.SUCCEEDED.value
73+
case InvocationStatus.FAILED:
74+
if self.result.error and self.result.error.type:
75+
error_type = self.result.error.type.lower()
76+
if "timeout" in error_type:
77+
return OperationStatus.TIMED_OUT.value
78+
if "stop" in error_type:
79+
return OperationStatus.STOPPED.value
80+
return OperationStatus.FAILED.value
81+
case _:
82+
return OperationStatus.FAILED.value
83+
6184
@staticmethod
6285
def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
6386
# make a nicer arn

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 30 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
DurableExecutionInvocationOutput,
1212
InvocationStatus,
1313
)
14-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
14+
from aws_durable_execution_sdk_python.lambda_service import (
15+
ErrorObject,
16+
OperationUpdate,
17+
)
1518

1619
from aws_durable_execution_sdk_python_testing.exceptions import (
1720
ExecutionAlreadyStartedException,
@@ -112,21 +115,8 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
112115
ResourceNotFoundException: If execution does not exist
113116
"""
114117
execution = self.get_execution(execution_arn)
115-
116-
# Extract execution details from the first operation (EXECUTION type)
117118
execution_op = execution.get_operation_execution_started()
118-
119-
# Determine status based on execution state
120-
if execution.is_complete:
121-
if (
122-
execution.result
123-
and execution.result.status == InvocationStatus.SUCCEEDED
124-
):
125-
status = "SUCCEEDED"
126-
else:
127-
status = "FAILED"
128-
else:
129-
status = "RUNNING"
119+
status = execution.status
130120

131121
# Extract result and error from execution result
132122
result = None
@@ -162,99 +152,41 @@ def list_executions(
162152
function_version: str | None = None, # noqa: ARG002
163153
execution_name: str | None = None,
164154
status_filter: str | None = None,
165-
time_after: str | None = None, # noqa: ARG002
166-
time_before: str | None = None, # noqa: ARG002
155+
time_after: str | None = None,
156+
time_before: str | None = None,
167157
marker: str | None = None,
168158
max_items: int | None = None,
169159
reverse_order: bool = False, # noqa: FBT001, FBT002
170160
) -> ListDurableExecutionsResponse:
171-
"""List executions with filtering and pagination.
172-
173-
Args:
174-
function_name: Filter by function name
175-
function_version: Filter by function version
176-
execution_name: Filter by execution name
177-
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
178-
time_after: Filter executions started after this time
179-
time_before: Filter executions started before this time
180-
marker: Pagination marker
181-
max_items: Maximum items to return (default 50)
182-
reverse_order: Return results in reverse chronological order
183-
184-
Returns:
185-
ListDurableExecutionsResponse: List of executions with pagination
186-
"""
187-
# Get all executions from store
188-
all_executions = self._store.list_all()
189-
190-
# Apply filters
191-
filtered_executions = []
192-
for execution in all_executions:
193-
# Filter by function name
194-
if function_name and execution.start_input.function_name != function_name:
195-
continue
196-
197-
# Filter by execution name
198-
if (
199-
execution_name
200-
and execution.start_input.execution_name != execution_name
201-
):
202-
continue
203-
204-
# Determine execution status
205-
execution_status = "RUNNING"
206-
if execution.is_complete:
207-
if (
208-
execution.result
209-
and execution.result.status == InvocationStatus.SUCCEEDED
210-
):
211-
execution_status = "SUCCEEDED"
212-
else:
213-
execution_status = "FAILED"
214-
215-
# Filter by status
216-
if status_filter and execution_status != status_filter:
217-
continue
218-
219-
# Convert to ExecutionSummary
220-
execution_op = execution.get_operation_execution_started()
221-
execution_summary = ExecutionSummary(
222-
durable_execution_arn=execution.durable_execution_arn,
223-
durable_execution_name=execution.start_input.execution_name,
224-
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225-
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.timestamp()
227-
if execution_op.start_timestamp
228-
else datetime.now(UTC).timestamp(),
229-
end_timestamp=execution_op.end_timestamp.timestamp()
230-
if execution_op.end_timestamp
231-
else None,
232-
)
233-
filtered_executions.append(execution_summary)
234-
235-
# Sort by start date
236-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
237-
238-
# Apply pagination
239-
if max_items is None:
240-
max_items = 50
241-
242-
start_index = 0
161+
"""List executions with filtering and pagination."""
162+
# Convert marker to offset
163+
offset = 0
243164
if marker:
244165
try:
245-
start_index = int(marker)
166+
offset = int(marker)
246167
except ValueError:
247-
start_index = 0
168+
offset = 0
248169

249-
end_index = start_index + max_items
250-
paginated_executions = filtered_executions[start_index:end_index]
170+
# Query store directly with parameters
171+
executions, next_marker = self._store.query(
172+
function_name=function_name,
173+
execution_name=execution_name,
174+
status_filter=status_filter,
175+
time_after=time_after,
176+
time_before=time_before,
177+
limit=max_items or 50,
178+
offset=offset,
179+
reverse_order=reverse_order,
180+
)
251181

252-
next_marker = None
253-
if end_index < len(filtered_executions):
254-
next_marker = str(end_index)
182+
# Convert to ExecutionSummary objects
183+
execution_summaries = [
184+
ExecutionSummary.from_execution(execution, execution.status)
185+
for execution in executions
186+
]
255187

256188
return ListDurableExecutionsResponse(
257-
durable_executions=paginated_executions, next_marker=next_marker
189+
durable_executions=execution_summaries, next_marker=next_marker
258190
)
259191

260192
def list_executions_by_function(
@@ -269,22 +201,7 @@ def list_executions_by_function(
269201
max_items: int | None = None,
270202
reverse_order: bool = False, # noqa: FBT001, FBT002
271203
) -> ListDurableExecutionsByFunctionResponse:
272-
"""List executions for a specific function.
273-
274-
Args:
275-
function_name: The function name to filter by
276-
qualifier: Function qualifier/version
277-
execution_name: Filter by execution name
278-
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
279-
time_after: Filter executions started after this time
280-
time_before: Filter executions started before this time
281-
marker: Pagination marker
282-
max_items: Maximum items to return (default 50)
283-
reverse_order: Return results in reverse chronological order
284-
285-
Returns:
286-
ListDurableExecutionsByFunctionResponse: List of executions for the function
287-
"""
204+
"""List executions for a specific function."""
288205
# Use the general list_executions method with function_name filter
289206
list_response = self.list_executions(
290207
function_name=function_name,

src/aws_durable_execution_sdk_python_testing/model.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
from dataclasses import dataclass
6+
from datetime import UTC, datetime
67
from typing import Any
78

89
# Import existing types from the main SDK - REUSE EVERYTHING POSSIBLE
@@ -229,6 +230,24 @@ def from_dict(cls, data: dict) -> Execution:
229230
end_timestamp=data.get("EndTimestamp"),
230231
)
231232

233+
@classmethod
234+
def from_execution(cls, execution, status: str) -> Execution:
235+
"""Create ExecutionSummary from Execution object."""
236+
237+
execution_op = execution.get_operation_execution_started()
238+
return cls(
239+
durable_execution_arn=execution.durable_execution_arn,
240+
durable_execution_name=execution.start_input.execution_name,
241+
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
242+
status=status,
243+
start_timestamp=execution_op.start_timestamp.timestamp()
244+
if execution_op.start_timestamp
245+
else datetime.now(UTC).timestamp(),
246+
end_timestamp=execution_op.end_timestamp.timestamp()
247+
if execution_op.end_timestamp
248+
else None,
249+
)
250+
232251
def to_dict(self) -> dict[str, Any]:
233252
result = {
234253
"DurableExecutionArn": self.durable_execution_arn,

src/aws_durable_execution_sdk_python_testing/stores/base.py

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from datetime import UTC
56
from enum import Enum
67
from typing import TYPE_CHECKING, Protocol
78

@@ -15,6 +16,7 @@ class StoreType(Enum):
1516

1617
MEMORY = "memory"
1718
FILESYSTEM = "filesystem"
19+
SQLITE = "sqlite"
1820

1921

2022
class ExecutionStore(Protocol):
@@ -24,4 +26,118 @@ class ExecutionStore(Protocol):
2426
def save(self, execution: Execution) -> None: ... # pragma: no cover
2527
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
2628
def update(self, execution: Execution) -> None: ... # pragma: no cover
27-
def list_all(self) -> list[Execution]: ... # pragma: no cover
29+
def query(
30+
self,
31+
function_name: str | None = None,
32+
execution_name: str | None = None,
33+
status_filter: str | None = None,
34+
time_after: str | None = None,
35+
time_before: str | None = None,
36+
limit: int | None = None,
37+
offset: int = 0,
38+
reverse_order: bool = False, # noqa: FBT001, FBT002
39+
) -> tuple[list[Execution], str | None]: ... # pragma: no cover
40+
def list_all(
41+
self,
42+
) -> list[Execution]: ... # pragma: no cover # Keep for backward compatibility
43+
44+
45+
class BaseExecutionStore(ExecutionStore):
46+
"""Base implementation for execution stores with shared query logic."""
47+
48+
@staticmethod
49+
def process_query(
50+
executions: list[Execution],
51+
function_name: str | None = None,
52+
execution_name: str | None = None,
53+
status_filter: str | None = None,
54+
time_after: str | None = None,
55+
time_before: str | None = None,
56+
limit: int | None = None,
57+
offset: int = 0,
58+
reverse_order: bool = False, # noqa: FBT001, FBT002
59+
) -> tuple[list[Execution], str | None]:
60+
"""Apply filtering, sorting, and pagination to executions."""
61+
# Apply filters
62+
filtered = []
63+
for execution in executions:
64+
if function_name and execution.start_input.function_name != function_name:
65+
continue
66+
if (
67+
execution_name
68+
and execution.start_input.execution_name != execution_name
69+
):
70+
continue
71+
72+
# Status filtering
73+
if status_filter and execution.status != status_filter:
74+
continue
75+
76+
# Time filtering
77+
if time_after or time_before:
78+
try:
79+
op = execution.get_operation_execution_started()
80+
if op.start_timestamp:
81+
timestamp = (
82+
op.start_timestamp.timestamp()
83+
if hasattr(op.start_timestamp, "timestamp")
84+
else op.start_timestamp.replace(tzinfo=UTC).timestamp()
85+
)
86+
if time_after and timestamp < float(time_after):
87+
continue
88+
if time_before and timestamp > float(time_before):
89+
continue
90+
except (ValueError, AttributeError):
91+
continue
92+
93+
filtered.append(execution)
94+
95+
# Sort by start timestamp
96+
def get_sort_key(e):
97+
try:
98+
op = e.get_operation_execution_started()
99+
if op.start_timestamp:
100+
return (
101+
op.start_timestamp.timestamp()
102+
if hasattr(op.start_timestamp, "timestamp")
103+
else op.start_timestamp.replace(tzinfo=UTC).timestamp()
104+
)
105+
except Exception: # noqa: BLE001, S110
106+
pass
107+
return 0
108+
109+
filtered.sort(key=get_sort_key, reverse=reverse_order)
110+
111+
# Apply pagination
112+
if limit:
113+
end_idx = offset + limit
114+
paginated = filtered[offset:end_idx]
115+
has_more = end_idx < len(filtered)
116+
next_marker = str(end_idx) if has_more else None
117+
return paginated, next_marker
118+
return filtered[offset:], None
119+
120+
def query(
121+
self,
122+
function_name: str | None = None,
123+
execution_name: str | None = None,
124+
status_filter: str | None = None,
125+
time_after: str | None = None,
126+
time_before: str | None = None,
127+
limit: int | None = None,
128+
offset: int = 0,
129+
reverse_order: bool = False, # noqa: FBT001, FBT002
130+
) -> tuple[list[Execution], str | None]:
131+
"""Apply filtering, sorting, and pagination to executions."""
132+
executions = self.list_all()
133+
return self.process_query(
134+
executions,
135+
function_name=function_name,
136+
execution_name=execution_name,
137+
status_filter=status_filter,
138+
time_after=time_after,
139+
time_before=time_before,
140+
limit=limit,
141+
offset=offset,
142+
reverse_order=reverse_order,
143+
)

0 commit comments

Comments
 (0)