Skip to content

Commit b3cb7a1

Browse files
author
Rares Polenciuc
committed
feat: complete sqlite store and function handler implementation
- Add SQLiteExecutionStore with database persistence and indexing - Implement query system with pagination support - Add BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient operations - Complete ListDurableExecutionsByFunctionHandler with proper filtering - Add function name validation and error handling - Add comprehensive test coverage for all implementations - Support concurrent access patterns with proper database handling
1 parent 3a31d90 commit b3cb7a1

File tree

20 files changed

+2606
-324
lines changed

20 files changed

+2606
-324
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def process(
4545
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4646
)
4747
)
48+
# All EXECUTION failures go through normal fail path
49+
# Timeout/Stop status is set by executor based on the operation that caused it
4850
notifier.notify_failed(execution_arn=execution_arn, error=error)
4951
# TODO: Svc doesn't actually create checkpoint for EXECUTION. might have to for localrunner though.
5052
return None

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6+
from enum import Enum
67
from threading import Lock
78
from typing import Any
89
from uuid import uuid4
@@ -20,11 +21,12 @@
2021
OperationUpdate,
2122
)
2223

23-
# Import AWS exceptions
2424
from aws_durable_execution_sdk_python_testing.exceptions import (
2525
IllegalStateException,
2626
InvalidParameterValueException,
2727
)
28+
29+
# Import AWS exceptions
2830
from aws_durable_execution_sdk_python_testing.model import (
2931
StartDurableExecutionInput,
3032
)
@@ -34,6 +36,41 @@
3436
)
3537

3638

39+
class CloseStatus(Enum):
40+
"""Close status for completed executions (mimics backend SWF CloseStatus)."""
41+
42+
COMPLETED = "COMPLETED"
43+
FAILED = "FAILED"
44+
TERMINATED = "TERMINATED"
45+
TIMED_OUT = "TIMED_OUT"
46+
47+
48+
class ExecutionStatus(Enum):
49+
"""Execution status for API responses (mimics backend ExecutionStatus)."""
50+
51+
RUNNING = "RUNNING"
52+
SUCCEEDED = "SUCCEEDED"
53+
FAILED = "FAILED"
54+
STOPPED = "STOPPED"
55+
TIMED_OUT = "TIMED_OUT"
56+
57+
@classmethod
58+
def from_close_status(cls, close_status: CloseStatus) -> ExecutionStatus:
59+
"""Convert CloseStatus to ExecutionStatus."""
60+
match close_status:
61+
case CloseStatus.COMPLETED:
62+
return cls.SUCCEEDED
63+
case CloseStatus.FAILED:
64+
return cls.FAILED
65+
case CloseStatus.TERMINATED:
66+
return cls.STOPPED
67+
case CloseStatus.TIMED_OUT:
68+
return cls.TIMED_OUT
69+
case _:
70+
error_msg: str = f"Unexpected close status: {close_status}"
71+
raise InvalidParameterValueException(error_msg)
72+
73+
3774
class Execution:
3875
"""Execution state."""
3976

@@ -55,12 +92,27 @@ def __init__(
5592
self.is_complete: bool = False
5693
self.result: DurableExecutionInvocationOutput | None = None
5794
self.consecutive_failed_invocation_attempts: int = 0
95+
self.close_status: CloseStatus | None = (
96+
None # Track close status like backend SWF
97+
)
5898

5999
@property
60100
def token_sequence(self) -> int:
61101
"""Get current token sequence value."""
62102
return self._token_sequence
63103

104+
def current_status(self) -> ExecutionStatus:
105+
"""Get execution status."""
106+
if not self.is_complete:
107+
return ExecutionStatus.RUNNING
108+
109+
if not self.close_status:
110+
msg: str = "close_status must be set when execution is complete"
111+
raise IllegalStateException(msg)
112+
113+
# Convert CloseStatus to ExecutionStatus
114+
return ExecutionStatus.from_close_status(self.close_status)
115+
64116
@staticmethod
65117
def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
66118
# make a nicer arn
@@ -82,6 +134,7 @@ def to_dict(self) -> dict[str, Any]:
82134
"IsComplete": self.is_complete,
83135
"Result": self.result.to_dict() if self.result else None,
84136
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
137+
"CloseStatus": self.close_status.value if self.close_status else None,
85138
}
86139

87140
@classmethod
@@ -115,6 +168,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
115168
execution.consecutive_failed_invocation_attempts = data[
116169
"ConsecutiveFailedInvocationAttempts"
117170
]
171+
close_status_str = data.get("CloseStatus")
172+
execution.close_status = (
173+
CloseStatus(close_status_str) if close_status_str else None
174+
)
118175

119176
return execution
120177

@@ -187,16 +244,40 @@ def has_pending_operations(self, execution: Execution) -> bool:
187244
return False
188245

189246
def complete_success(self, result: str | None) -> None:
247+
"""Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION)."""
190248
self.result = DurableExecutionInvocationOutput(
191249
status=InvocationStatus.SUCCEEDED, result=result
192250
)
193251
self.is_complete = True
252+
self.close_status = CloseStatus.COMPLETED
253+
self._end_execution(OperationStatus.SUCCEEDED)
194254

195255
def complete_fail(self, error: ErrorObject) -> None:
256+
"""Complete execution with failure (DecisionType.FAIL_WORKFLOW_EXECUTION)."""
257+
self.result = DurableExecutionInvocationOutput(
258+
status=InvocationStatus.FAILED, error=error
259+
)
260+
self.is_complete = True
261+
self.close_status = CloseStatus.FAILED
262+
self._end_execution(OperationStatus.FAILED)
263+
264+
def complete_timeout(self, error: ErrorObject) -> None:
265+
"""Complete execution with timeout (SWF workflow timeout)."""
196266
self.result = DurableExecutionInvocationOutput(
197267
status=InvocationStatus.FAILED, error=error
198268
)
199269
self.is_complete = True
270+
self.close_status = CloseStatus.TIMED_OUT
271+
self._end_execution(OperationStatus.TIMED_OUT)
272+
273+
def complete_stopped(self, error: ErrorObject) -> None:
274+
"""Complete execution as terminated (TerminateWorkflowExecutionV2Request)."""
275+
self.result = DurableExecutionInvocationOutput(
276+
status=InvocationStatus.FAILED, error=error
277+
)
278+
self.is_complete = True
279+
self.close_status = CloseStatus.TERMINATED
280+
self._end_execution(OperationStatus.STOPPED)
200281

201282
def find_operation(self, operation_id: str) -> tuple[int, Operation]:
202283
"""Find operation by ID, return index and operation."""
@@ -327,3 +408,14 @@ def complete_callback_failure(
327408
callback_details=updated_callback_details,
328409
)
329410
return self.operations[index]
411+
412+
def _end_execution(self, status: OperationStatus) -> None:
413+
"""Set the end_timestamp on the main EXECUTION operation when execution completes."""
414+
execution_op: Operation = self.get_operation_execution_started()
415+
if execution_op.operation_type == OperationType.EXECUTION:
416+
with self._state_lock:
417+
self.operations[0] = replace(
418+
execution_op,
419+
status=status,
420+
end_timestamp=datetime.now(UTC),
421+
)

0 commit comments

Comments
 (0)