Skip to content

Commit fdbb2ac

Browse files
author
Rares Polenciuc
committed
feat: complete sqlite store and function handler implementation
- Add SQLiteExecutionStore with database persistence and indexing - Implement query system with pagination support - Add BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient operations - Complete ListDurableExecutionsByFunctionHandler with proper filtering - Add function name validation and error handling - Add comprehensive test coverage for all implementations - Support concurrent access patterns with proper database handling
1 parent 1df26ce commit fdbb2ac

File tree

18 files changed

+2574
-302
lines changed

18 files changed

+2574
-302
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def process(
4545
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4646
)
4747
)
48+
# All EXECUTION failures go through normal fail path
49+
# Timeout/Stop status is set by executor based on the operation that caused it
4850
notifier.notify_failed(execution_arn=execution_arn, error=error)
4951
# TODO: Svc doesn't actually create checkpoint for EXECUTION. might have to for localrunner though.
5052
return None

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6+
from enum import Enum
67
from threading import Lock
78
from typing import Any
89
from uuid import uuid4
@@ -20,17 +21,37 @@
2021
OperationUpdate,
2122
)
2223

23-
# Import AWS exceptions
2424
from aws_durable_execution_sdk_python_testing.exceptions import (
2525
IllegalStateException,
2626
InvalidParameterValueException,
2727
)
28+
29+
# Import AWS exceptions
2830
from aws_durable_execution_sdk_python_testing.model import (
2931
StartDurableExecutionInput,
3032
)
3133
from aws_durable_execution_sdk_python_testing.token import CheckpointToken
3234

3335

36+
class CloseStatus(Enum):
37+
"""Close status for completed executions (mimics backend SWF CloseStatus)."""
38+
39+
COMPLETED = "COMPLETED"
40+
FAILED = "FAILED"
41+
TERMINATED = "TERMINATED"
42+
TIMED_OUT = "TIMED_OUT"
43+
44+
45+
class ExecutionStatus(Enum):
46+
"""Execution status for API responses (mimics backend ExecutionStatus)."""
47+
48+
RUNNING = "RUNNING"
49+
SUCCEEDED = "SUCCEEDED"
50+
FAILED = "FAILED"
51+
STOPPED = "STOPPED"
52+
TIMED_OUT = "TIMED_OUT"
53+
54+
3455
class Execution:
3556
"""Execution state."""
3657

@@ -52,12 +73,39 @@ def __init__(
5273
self.is_complete: bool = False
5374
self.result: DurableExecutionInvocationOutput | None = None
5475
self.consecutive_failed_invocation_attempts: int = 0
76+
self.close_status: CloseStatus | None = (
77+
None # Track close status like backend SWF
78+
)
5579

5680
@property
5781
def token_sequence(self) -> int:
5882
"""Get current token sequence value."""
5983
return self._token_sequence
6084

85+
@property
86+
def status(self) -> str:
87+
"""Get execution status string (mimics backend ExecutionStatusConverter)."""
88+
if not self.is_complete:
89+
return ExecutionStatus.RUNNING.value
90+
91+
if not self.close_status:
92+
msg: str = "close_status cannot be None"
93+
raise IllegalStateException(msg)
94+
95+
# Convert CloseStatus to ExecutionStatus (like backend ExecutionStatusConverter)
96+
match self.close_status:
97+
case CloseStatus.COMPLETED:
98+
return ExecutionStatus.SUCCEEDED.value
99+
case CloseStatus.FAILED:
100+
return ExecutionStatus.FAILED.value
101+
case CloseStatus.TERMINATED:
102+
return ExecutionStatus.STOPPED.value
103+
case CloseStatus.TIMED_OUT:
104+
return ExecutionStatus.TIMED_OUT.value
105+
case _:
106+
error_msg: str = f"Unexpected close status: {self.close_status}"
107+
raise InvalidParameterValueException(error_msg)
108+
61109
@staticmethod
62110
def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
63111
# make a nicer arn
@@ -79,6 +127,7 @@ def to_dict(self) -> dict[str, Any]:
79127
"IsComplete": self.is_complete,
80128
"Result": self.result.to_dict() if self.result else None,
81129
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
130+
"CloseStatus": self.close_status.value if self.close_status else None,
82131
}
83132

84133
@classmethod
@@ -112,6 +161,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
112161
execution.consecutive_failed_invocation_attempts = data[
113162
"ConsecutiveFailedInvocationAttempts"
114163
]
164+
close_status_str = data.get("CloseStatus")
165+
execution.close_status = (
166+
CloseStatus(close_status_str) if close_status_str else None
167+
)
115168

116169
return execution
117170

@@ -184,16 +237,36 @@ def has_pending_operations(self, execution: Execution) -> bool:
184237
return False
185238

186239
def complete_success(self, result: str | None) -> None:
240+
"""Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION)."""
187241
self.result = DurableExecutionInvocationOutput(
188242
status=InvocationStatus.SUCCEEDED, result=result
189243
)
190244
self.is_complete = True
245+
self.close_status = CloseStatus.COMPLETED
191246

192247
def complete_fail(self, error: ErrorObject) -> None:
248+
"""Complete execution with failure (DecisionType.FAIL_WORKFLOW_EXECUTION)."""
249+
self.result = DurableExecutionInvocationOutput(
250+
status=InvocationStatus.FAILED, error=error
251+
)
252+
self.is_complete = True
253+
self.close_status = CloseStatus.FAILED
254+
255+
def complete_timeout(self, error: ErrorObject) -> None:
256+
"""Complete execution with timeout (SWF workflow timeout)."""
257+
self.result = DurableExecutionInvocationOutput(
258+
status=InvocationStatus.FAILED, error=error
259+
)
260+
self.is_complete = True
261+
self.close_status = CloseStatus.TIMED_OUT
262+
263+
def complete_stopped(self, error: ErrorObject) -> None:
264+
"""Complete execution as terminated (TerminateWorkflowExecutionV2Request)."""
193265
self.result = DurableExecutionInvocationOutput(
194266
status=InvocationStatus.FAILED, error=error
195267
)
196268
self.is_complete = True
269+
self.close_status = CloseStatus.TERMINATED
197270

198271
def find_operation(self, operation_id: str) -> tuple[int, Operation]:
199272
"""Find operation by ID, return index and operation."""

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 54 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,7 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
120120

121121
# Extract execution details from the first operation (EXECUTION type)
122122
execution_op = execution.get_operation_execution_started()
123-
124-
# Determine status based on execution state
125-
if execution.is_complete:
126-
if (
127-
execution.result
128-
and execution.result.status == InvocationStatus.SUCCEEDED
129-
):
130-
status = "SUCCEEDED"
131-
else:
132-
status = "FAILED"
133-
else:
134-
status = "RUNNING"
123+
status = execution.status
135124

136125
# Extract result and error from execution result
137126
result = None
@@ -167,8 +156,8 @@ def list_executions(
167156
function_version: str | None = None, # noqa: ARG002
168157
execution_name: str | None = None,
169158
status_filter: str | None = None,
170-
time_after: str | None = None, # noqa: ARG002
171-
time_before: str | None = None, # noqa: ARG002
159+
started_after: str | None = None,
160+
started_before: str | None = None,
172161
marker: str | None = None,
173162
max_items: int | None = None,
174163
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -180,86 +169,43 @@ def list_executions(
180169
function_version: Filter by function version
181170
execution_name: Filter by execution name
182171
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
183-
time_after: Filter executions started after this time
184-
time_before: Filter executions started before this time
172+
started_after: Filter executions started after this time
173+
started_before: Filter executions started before this time
185174
marker: Pagination marker
186175
max_items: Maximum items to return (default 50)
187176
reverse_order: Return results in reverse chronological order
188177
189178
Returns:
190179
ListDurableExecutionsResponse: List of executions with pagination
191180
"""
192-
# Get all executions from store
193-
all_executions = self._store.list_all()
194-
195-
# Apply filters
196-
filtered_executions = []
197-
for execution in all_executions:
198-
# Filter by function name
199-
if function_name and execution.start_input.function_name != function_name:
200-
continue
201-
202-
# Filter by execution name
203-
if (
204-
execution_name
205-
and execution.start_input.execution_name != execution_name
206-
):
207-
continue
208-
209-
# Determine execution status
210-
execution_status = "RUNNING"
211-
if execution.is_complete:
212-
if (
213-
execution.result
214-
and execution.result.status == InvocationStatus.SUCCEEDED
215-
):
216-
execution_status = "SUCCEEDED"
217-
else:
218-
execution_status = "FAILED"
219-
220-
# Filter by status
221-
if status_filter and execution_status != status_filter:
222-
continue
223-
224-
# Convert to ExecutionSummary
225-
execution_op = execution.get_operation_execution_started()
226-
execution_summary = ExecutionSummary(
227-
durable_execution_arn=execution.durable_execution_arn,
228-
durable_execution_name=execution.start_input.execution_name,
229-
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
230-
status=execution_status,
231-
start_timestamp=execution_op.start_timestamp
232-
if execution_op.start_timestamp
233-
else datetime.now(UTC),
234-
end_timestamp=execution_op.end_timestamp
235-
if execution_op.end_timestamp
236-
else None,
237-
)
238-
filtered_executions.append(execution_summary)
239-
240-
# Sort by start date
241-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
242-
243-
# Apply pagination
244-
if max_items is None:
245-
max_items = 50
246-
247-
start_index = 0
181+
# Convert marker to offset
182+
offset: int = 0
248183
if marker:
249184
try:
250-
start_index = int(marker)
185+
offset = int(marker)
251186
except ValueError:
252-
start_index = 0
187+
offset = 0
253188

254-
end_index = start_index + max_items
255-
paginated_executions = filtered_executions[start_index:end_index]
189+
# Query store directly with parameters
190+
executions, next_marker = self._store.query(
191+
function_name=function_name,
192+
execution_name=execution_name,
193+
status_filter=status_filter,
194+
started_after=started_after,
195+
started_before=started_before,
196+
limit=max_items or 50,
197+
offset=offset,
198+
reverse_order=reverse_order,
199+
)
256200

257-
next_marker = None
258-
if end_index < len(filtered_executions):
259-
next_marker = str(end_index)
201+
# Convert to ExecutionSummary objects
202+
execution_summaries: list[ExecutionSummary] = [
203+
ExecutionSummary.from_execution(execution, execution.status)
204+
for execution in executions
205+
]
260206

261207
return ListDurableExecutionsResponse(
262-
durable_executions=paginated_executions, next_marker=next_marker
208+
durable_executions=execution_summaries, next_marker=next_marker
263209
)
264210

265211
def list_executions_by_function(
@@ -268,8 +214,8 @@ def list_executions_by_function(
268214
qualifier: str | None = None, # noqa: ARG002
269215
execution_name: str | None = None,
270216
status_filter: str | None = None,
271-
time_after: str | None = None,
272-
time_before: str | None = None,
217+
started_after: str | None = None,
218+
started_before: str | None = None,
273219
marker: str | None = None,
274220
max_items: int | None = None,
275221
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -281,8 +227,8 @@ def list_executions_by_function(
281227
qualifier: Function qualifier/version
282228
execution_name: Filter by execution name
283229
status_filter: Filter by status (RUNNING, SUCCEEDED, FAILED)
284-
time_after: Filter executions started after this time
285-
time_before: Filter executions started before this time
230+
started_after: Filter executions started after this time
231+
started_before: Filter executions started before this time
286232
marker: Pagination marker
287233
max_items: Maximum items to return (default 50)
288234
reverse_order: Return results in reverse chronological order
@@ -295,8 +241,8 @@ def list_executions_by_function(
295241
function_name=function_name,
296242
execution_name=execution_name,
297243
status_filter=status_filter,
298-
time_after=time_after,
299-
time_before=time_before,
244+
started_after=started_after,
245+
started_before=started_before,
300246
marker=marker,
301247
max_items=max_items,
302248
reverse_order=reverse_order,
@@ -335,8 +281,11 @@ def stop_execution(
335281
"Execution stopped by user request"
336282
)
337283

338-
# Stop the execution
339-
self.fail_execution(execution_arn, stop_error)
284+
# Stop sets TERMINATED close status (different from fail)
285+
logger.exception("[%s] Stopping execution.", execution_arn)
286+
execution.complete_stopped(error=stop_error) # Sets CloseStatus.TERMINATED
287+
self._store.update(execution)
288+
self._complete_events(execution_arn=execution_arn)
340289

341290
return StopDurableExecutionResponse(stop_timestamp=datetime.now(UTC))
342291

@@ -824,27 +773,24 @@ def wait_until_complete(
824773
raise ResourceNotFoundException(msg)
825774

826775
def complete_execution(self, execution_arn: str, result: str | None = None) -> None:
827-
"""Complete execution successfully."""
776+
"""Complete execution successfully (COMPLETE_WORKFLOW_EXECUTION decision)."""
828777
logger.debug("[%s] Completing execution with result: %s", execution_arn, result)
829778
execution: Execution = self._store.load(execution_arn=execution_arn)
830-
execution.complete_success(result=result)
779+
execution.complete_success(result=result) # Sets CloseStatus.COMPLETED
831780
self._store.update(execution)
832781
if execution.result is None:
833782
msg: str = "Execution result is required"
834-
835783
raise IllegalStateException(msg)
836784
self._complete_events(execution_arn=execution_arn)
837785

838786
def fail_execution(self, execution_arn: str, error: ErrorObject) -> None:
839-
"""Fail execution with error."""
787+
"""Fail execution with error (FAIL_WORKFLOW_EXECUTION decision)."""
840788
logger.exception("[%s] Completing execution with error.", execution_arn)
841789
execution: Execution = self._store.load(execution_arn=execution_arn)
842-
execution.complete_fail(error=error)
790+
execution.complete_fail(error=error) # Sets CloseStatus.FAILED
843791
self._store.update(execution)
844-
# set by complete_fail
845792
if execution.result is None:
846793
msg: str = "Execution result is required"
847-
848794
raise IllegalStateException(msg)
849795
self._complete_events(execution_arn=execution_arn)
850796

@@ -896,6 +842,19 @@ def on_failed(self, execution_arn: str, error: ErrorObject) -> None:
896842
"""Fail execution. Observer method triggered by notifier."""
897843
self.fail_execution(execution_arn, error)
898844

845+
def on_timed_out(self, execution_arn: str, error: ErrorObject) -> None:
846+
"""Handle execution timeout (workflow timeout). Observer method triggered by notifier."""
847+
logger.exception("[%s] Execution timed out.", execution_arn)
848+
execution: Execution = self._store.load(execution_arn=execution_arn)
849+
execution.complete_timeout(error=error) # Sets CloseStatus.TIMED_OUT
850+
self._store.update(execution)
851+
self._complete_events(execution_arn=execution_arn)
852+
853+
def on_stopped(self, execution_arn: str, error: ErrorObject) -> None:
854+
"""Handle execution stop. Observer method triggered by notifier."""
855+
# This should not be called directly - stop_execution handles termination
856+
self.fail_execution(execution_arn, error)
857+
899858
def on_wait_timer_scheduled(
900859
self, execution_arn: str, operation_id: str, delay: float
901860
) -> None:

0 commit comments

Comments
 (0)