Skip to content

Commit 65d7f67

Browse files
author
Rares Polenciuc
committed
feat: complete sqlite store and function handler implementation
- Add SQLiteExecutionStore with database persistence and indexing - Implement query system with pagination support - Add BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient operations - Complete ListDurableExecutionsByFunctionHandler with proper filtering - Add function name validation and error handling - Add comprehensive test coverage for all implementations - Support concurrent access patterns with proper database handling
1 parent e006e9c commit 65d7f67

File tree

19 files changed

+2036
-311
lines changed

19 files changed

+2036
-311
lines changed

examples/examples-catalog.json

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,39 @@
110110
"ExecutionTimeout": 300
111111
},
112112
"path": "./src/map_operations.py"
113+
},
114+
{
115+
"name": "Invoke Test",
116+
"description": "Test function that invokes another durable function to test LambdaInvoker",
117+
"handler": "invoke_test.handler",
118+
"integration": true,
119+
"durableConfig": {
120+
"RetentionPeriodInDays": 7,
121+
"ExecutionTimeout": 300
122+
},
123+
"path": "./src/invoke_test.py"
124+
},
125+
{
126+
"name": "Helper Function",
127+
"description": "Helper function that can be invoked by other functions",
128+
"handler": "invoke_test.helper_function",
129+
"integration": true,
130+
"durableConfig": {
131+
"RetentionPeriodInDays": 7,
132+
"ExecutionTimeout": 300
133+
},
134+
"path": "./src/invoke_test.py"
135+
},
136+
{
137+
"name": "Kawaii Handler",
138+
"description": "🌸 Kawaii handler that tests context.invoke() by calling HelloWorld ✨",
139+
"handler": "kawaii_handler.lambda_handler",
140+
"integration": true,
141+
"durableConfig": {
142+
"RetentionPeriodInDays": 7,
143+
"ExecutionTimeout": 300
144+
},
145+
"path": "./src/kawaii_handler.py"
113146
}
114147
]
115148
}

examples/template.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,12 @@ Resources:
107107
DurableConfig:
108108
RetentionPeriodInDays: 7
109109
ExecutionTimeout: 300
110+
KawaiiHandler:
111+
Type: AWS::Serverless::Function
112+
Properties:
113+
CodeUri: build/
114+
Handler: kawaii_handler.lambda_handler
115+
Description: 🌸 Kawaii handler that tests context.invoke() by calling HelloWorld ✨
116+
DurableConfig:
117+
RetentionPeriodInDays: 7
118+
ExecutionTimeout: 300

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def process(
4545
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4646
)
4747
)
48+
# All EXECUTION failures go through normal fail path
49+
# Timeout/Stop status is set by executor based on the operation that caused it
4850
notifier.notify_failed(execution_arn=execution_arn, error=error)
4951
# TODO: Svc doesn't actually create checkpoint for EXECUTION. might have to for localrunner though.
5052
return None

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6+
from enum import Enum
67
from threading import Lock
78
from typing import Any
89
from uuid import uuid4
@@ -20,17 +21,37 @@
2021
OperationUpdate,
2122
)
2223

23-
# Import AWS exceptions
2424
from aws_durable_execution_sdk_python_testing.exceptions import (
2525
IllegalStateException,
2626
InvalidParameterValueException,
2727
)
28+
29+
# Import AWS exceptions
2830
from aws_durable_execution_sdk_python_testing.model import (
2931
StartDurableExecutionInput,
3032
)
3133
from aws_durable_execution_sdk_python_testing.token import CheckpointToken
3234

3335

36+
class CloseStatus(Enum):
37+
"""Close status for completed executions (mimics backend SWF CloseStatus)."""
38+
39+
COMPLETED = "COMPLETED"
40+
FAILED = "FAILED"
41+
TERMINATED = "TERMINATED"
42+
TIMED_OUT = "TIMED_OUT"
43+
44+
45+
class ExecutionStatus(Enum):
46+
"""Execution status for API responses (mimics backend ExecutionStatus)."""
47+
48+
RUNNING = "RUNNING"
49+
SUCCEEDED = "SUCCEEDED"
50+
FAILED = "FAILED"
51+
STOPPED = "STOPPED"
52+
TIMED_OUT = "TIMED_OUT"
53+
54+
3455
class Execution:
3556
"""Execution state."""
3657

@@ -52,12 +73,37 @@ def __init__(
5273
self.is_complete: bool = False
5374
self.result: DurableExecutionInvocationOutput | None = None
5475
self.consecutive_failed_invocation_attempts: int = 0
76+
self.close_status: CloseStatus | None = (
77+
None # Track close status like backend SWF
78+
)
5579

5680
@property
5781
def token_sequence(self) -> int:
5882
"""Get current token sequence value."""
5983
return self._token_sequence
6084

85+
@property
86+
def status(self) -> str:
87+
"""Get execution status string (mimics backend ExecutionStatusConverter)."""
88+
if not self.is_complete:
89+
return ExecutionStatus.RUNNING.value
90+
91+
if not self.close_status:
92+
return ExecutionStatus.FAILED.value
93+
94+
# Convert CloseStatus to ExecutionStatus (like backend ExecutionStatusConverter)
95+
match self.close_status:
96+
case CloseStatus.COMPLETED:
97+
return ExecutionStatus.SUCCEEDED.value
98+
case CloseStatus.FAILED:
99+
return ExecutionStatus.FAILED.value
100+
case CloseStatus.TERMINATED:
101+
return ExecutionStatus.STOPPED.value
102+
case CloseStatus.TIMED_OUT:
103+
return ExecutionStatus.TIMED_OUT.value
104+
case _:
105+
return ExecutionStatus.FAILED.value
106+
61107
@staticmethod
62108
def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
63109
# make a nicer arn
@@ -79,6 +125,7 @@ def to_dict(self) -> dict[str, Any]:
79125
"IsComplete": self.is_complete,
80126
"Result": self.result.to_dict() if self.result else None,
81127
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
128+
"CloseStatus": self.close_status.value if self.close_status else None,
82129
}
83130

84131
@classmethod
@@ -112,6 +159,10 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
112159
execution.consecutive_failed_invocation_attempts = data[
113160
"ConsecutiveFailedInvocationAttempts"
114161
]
162+
close_status_str = data.get("CloseStatus")
163+
execution.close_status = (
164+
CloseStatus(close_status_str) if close_status_str else None
165+
)
115166

116167
return execution
117168

@@ -184,16 +235,36 @@ def has_pending_operations(self, execution: Execution) -> bool:
184235
return False
185236

186237
def complete_success(self, result: str | None) -> None:
238+
"""Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION)."""
187239
self.result = DurableExecutionInvocationOutput(
188240
status=InvocationStatus.SUCCEEDED, result=result
189241
)
190242
self.is_complete = True
243+
self.close_status = CloseStatus.COMPLETED
191244

192245
def complete_fail(self, error: ErrorObject) -> None:
246+
"""Complete execution with failure (DecisionType.FAIL_WORKFLOW_EXECUTION)."""
247+
self.result = DurableExecutionInvocationOutput(
248+
status=InvocationStatus.FAILED, error=error
249+
)
250+
self.is_complete = True
251+
self.close_status = CloseStatus.FAILED
252+
253+
def complete_timeout(self, error: ErrorObject) -> None:
254+
"""Complete execution with timeout (SWF workflow timeout)."""
255+
self.result = DurableExecutionInvocationOutput(
256+
status=InvocationStatus.FAILED, error=error
257+
)
258+
self.is_complete = True
259+
self.close_status = CloseStatus.TIMED_OUT
260+
261+
def complete_stopped(self, error: ErrorObject) -> None:
262+
"""Complete execution as terminated (TerminateWorkflowExecutionV2Request)."""
193263
self.result = DurableExecutionInvocationOutput(
194264
status=InvocationStatus.FAILED, error=error
195265
)
196266
self.is_complete = True
267+
self.close_status = CloseStatus.TERMINATED
197268

198269
def find_operation(self, operation_id: str) -> tuple[int, Operation]:
199270
"""Find operation by ID, return index and operation."""

0 commit comments

Comments
 (0)