Skip to content

Commit 7ecc4f6

Browse files
author
Rares Polenciuc
committed
feat: execution state pagination and token validation
- Add paging logic in checkpoint processor with next_marker support - Implement checkpoint token validation - Add token expiration checking with error responses - Handle missing token cases with context-appropriate validation - Add pagination metadata to responses with configurable max_items - Add test coverage for all validation scenarios
1 parent 6954160 commit 7ecc4f6

File tree

7 files changed

+345
-54
lines changed

7 files changed

+345
-54
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,35 @@ def process_checkpoint(
8888
def get_execution_state(
8989
self,
9090
checkpoint_token: str,
91-
next_marker: str, # noqa: ARG002
92-
max_items: int = 1000, # noqa: ARG002
91+
next_marker: str | None = None,
92+
max_items: int = 1000,
9393
) -> StateOutput:
94-
"""Get current execution state."""
94+
"""Get current execution state with batched checkpoint token validation and pagination."""
95+
if not checkpoint_token:
96+
msg: str = "Checkpoint token is required"
97+
raise InvalidParameterValueException(msg)
98+
9599
token: CheckpointToken = CheckpointToken.from_str(checkpoint_token)
96100
execution: Execution = self._store.load(token.execution_arn)
101+
execution.validate_checkpoint_token(checkpoint_token)
102+
103+
# Get all operations
104+
all_operations = execution.get_navigable_operations()
105+
106+
# Apply pagination
107+
start_index = 0
108+
if next_marker:
109+
try:
110+
start_index = int(next_marker)
111+
except ValueError:
112+
start_index = 0
113+
114+
end_index = start_index + max_items
115+
paginated_operations = all_operations[start_index:end_index]
116+
117+
# Determine next marker
118+
next_marker_result = str(end_index) if end_index < len(all_operations) else None
97119

98-
# TODO: paging when size or max
99120
return StateOutput(
100-
operations=execution.get_navigable_operations(), next_marker=None
121+
operations=paginated_operations, next_marker=next_marker_result
101122
)

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,8 @@ def process(
3838
)
3939
case _:
4040
# intentional. actual service will fail any EXECUTION update that is not SUCCEED.
41-
error = (
42-
update.error
43-
if update.error
44-
else ErrorObject.from_message(
45-
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
46-
)
41+
error = update.error or ErrorObject.from_message(
42+
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4743
)
4844
notifier.notify_failed(execution_arn=execution_arn, error=error)
4945
# TODO: Svc doesn't actually create checkpoint for EXECUTION. might have to for localrunner though.

src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __init__(
5555
self,
5656
processors: MutableMapping[OperationType, OperationProcessor] | None = None,
5757
):
58-
self.processors = processors if processors else self._DEFAULT_PROCESSORS
58+
self.processors = processors or self._DEFAULT_PROCESSORS
5959

6060
def process_updates(
6161
self,

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,35 @@ def get_navigable_operations(self) -> list[Operation]:
159159
"""Get list of operations, but exclude child operations where the parent has already completed."""
160160
return self.operations
161161

162+
def validate_checkpoint_token(
163+
self,
164+
checkpoint_token: str | None,
165+
checkpoint_required: bool = True, # noqa: FBT001, FBT002
166+
checkpoint_required_msg: str | None = None,
167+
) -> None:
168+
"""Validate checkpoint token against this execution."""
169+
if not checkpoint_token and checkpoint_required:
170+
msg: str = checkpoint_required_msg or "Checkpoint token is required"
171+
raise InvalidParameterValueException(msg)
172+
173+
if not checkpoint_token:
174+
return # No token provided and not required
175+
176+
token: CheckpointToken = CheckpointToken.from_str(checkpoint_token)
177+
if token.execution_arn != self.durable_execution_arn:
178+
msg = "Checkpoint token does not match execution ARN"
179+
raise InvalidParameterValueException(msg)
180+
181+
if self.is_complete or token.token_sequence > self.token_sequence:
182+
msg = "Invalid or expired checkpoint token"
183+
raise InvalidParameterValueException(msg)
184+
185+
# Check if token has been used
186+
token_str: str = token.to_str()
187+
if token_str not in self.used_tokens:
188+
msg = f"Invalid checkpoint token: {token_str}"
189+
raise InvalidParameterValueException(msg)
190+
162191
def get_assertable_operations(self) -> list[Operation]:
163192
"""Get list of operations, but exclude the EXECUTION operations"""
164193
# TODO: this excludes EXECUTION at start, but can there be an EXECUTION at the end if there was a checkpoint with large payload?

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
if TYPE_CHECKING:
4747
from collections.abc import Awaitable, Callable
4848

49+
from aws_durable_execution_sdk_python.lambda_service import Operation
50+
4951
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5052
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
5153
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore
@@ -357,32 +359,33 @@ def get_execution_state(
357359
ResourceNotFoundException: If execution does not exist
358360
InvalidParameterValueException: If checkpoint token is invalid
359361
"""
360-
execution = self.get_execution(execution_arn)
361-
362-
# TODO: Validate checkpoint token if provided
363-
if checkpoint_token and checkpoint_token not in execution.used_tokens:
364-
msg: str = f"Invalid checkpoint token: {checkpoint_token}"
365-
raise InvalidParameterValueException(msg)
362+
execution: Execution = self.get_execution(execution_arn)
363+
checkpoint_required_msg: str = (
364+
"Checkpoint token is required for paginated requests on active executions"
365+
)
366+
checkpoint_required: bool = not execution.is_complete and marker is not None
367+
execution.validate_checkpoint_token(
368+
checkpoint_token, checkpoint_required, checkpoint_required_msg
369+
)
366370

367371
# Get operations (excluding the initial EXECUTION operation for state)
368-
operations = execution.get_assertable_operations()
372+
operations: list[Operation] = execution.get_assertable_operations()
369373

370374
# Apply pagination
371375
if max_items is None:
372376
max_items = 100
373377

374-
# Simple pagination - in real implementation would need proper marker handling
375-
start_index = 0
378+
start_index: int = 0
376379
if marker:
377380
try:
378381
start_index = int(marker)
379382
except ValueError:
380383
start_index = 0
381384

382-
end_index = start_index + max_items
383-
paginated_operations = operations[start_index:end_index]
385+
end_index: int = start_index + max_items
386+
paginated_operations: list[Operation] = operations[start_index:end_index]
384387

385-
next_marker = None
388+
next_marker: str | None = None
386389
if end_index < len(operations):
387390
next_marker = str(end_index)
388391

@@ -467,11 +470,11 @@ def checkpoint_execution(
467470
InvalidParameterValueException: If checkpoint token is invalid
468471
"""
469472
execution = self.get_execution(execution_arn)
470-
471-
# Validate checkpoint token
472-
if checkpoint_token not in execution.used_tokens:
473-
msg: str = f"Invalid checkpoint token: {checkpoint_token}"
474-
raise InvalidParameterValueException(msg)
473+
execution.validate_checkpoint_token(
474+
checkpoint_token,
475+
checkpoint_required=True,
476+
checkpoint_required_msg="Checkpoint token is required for checkpoint operations",
477+
)
475478

476479
# TODO: Process operation updates using the checkpoint processor
477480
# This would integrate with the existing checkpoint processing pipeline

0 commit comments

Comments
 (0)