Skip to content

Commit 506acc3

Browse files
author
Rares Polenciuc
committed
feat: execution state pagination and token validation
- Add paging logic in checkpoint processor with next_marker support - Implement checkpoint token validation - Add token expiration checking with error responses - Handle missing token cases with context-appropriate validation - Add pagination metadata to responses with configurable max_items - Add test coverage for all validation scenarios
1 parent c6f8acf commit 506acc3

File tree

7 files changed

+358
-54
lines changed

7 files changed

+358
-54
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
CheckpointUpdatedExecutionState,
1010
OperationUpdate,
1111
StateOutput,
12+
Operation,
1213
)
1314

1415
from aws_durable_execution_sdk_python_testing.checkpoint.transformer import (
@@ -88,14 +89,37 @@ def process_checkpoint(
8889
def get_execution_state(
8990
self,
9091
checkpoint_token: str,
91-
next_marker: str, # noqa: ARG002
92-
max_items: int = 1000, # noqa: ARG002
92+
next_marker: str | None = None,
93+
max_items: int = 1000,
9394
) -> StateOutput:
94-
"""Get current execution state."""
95+
"""Get current execution state with batched checkpoint token validation and pagination."""
96+
if not checkpoint_token:
97+
msg: str = "Checkpoint token is required"
98+
raise InvalidParameterValueException(msg)
99+
95100
token: CheckpointToken = CheckpointToken.from_str(checkpoint_token)
96101
execution: Execution = self._store.load(token.execution_arn)
102+
execution.validate_checkpoint_token(checkpoint_token)
103+
104+
# Get all operations
105+
all_operations: list[Operation] = execution.get_navigable_operations()
106+
107+
# Apply pagination
108+
start_index: int = 0
109+
if next_marker:
110+
try:
111+
start_index = int(next_marker)
112+
except ValueError:
113+
start_index = 0
114+
115+
end_index: int = start_index + max_items
116+
paginated_operations: list[Operation] = all_operations[start_index:end_index]
117+
118+
# Determine next marker
119+
next_marker_result: str | None = (
120+
str(end_index) if end_index < len(all_operations) else None
121+
)
97122

98-
# TODO: paging when size or max
99123
return StateOutput(
100-
operations=execution.get_navigable_operations(), next_marker=None
124+
operations=paginated_operations, next_marker=next_marker_result
101125
)

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/execution.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,8 @@ def process(
3838
)
3939
case _:
4040
# intentional. actual service will fail any EXECUTION update that is not SUCCEED.
41-
error = (
42-
update.error
43-
if update.error
44-
else ErrorObject.from_message(
45-
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
46-
)
41+
error = update.error or ErrorObject.from_message(
42+
"There is no error details but EXECUTION checkpoint action is not SUCCEED."
4743
)
4844
# All EXECUTION failures go through normal fail path
4945
# Timeout/Stop status is set by executor based on the operation that caused it

src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __init__(
5555
self,
5656
processors: MutableMapping[OperationType, OperationProcessor] | None = None,
5757
):
58-
self.processors = processors if processors else self._DEFAULT_PROCESSORS
58+
self.processors = processors or self._DEFAULT_PROCESSORS
5959

6060
def process_updates(
6161
self,

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,35 @@ def get_navigable_operations(self) -> list[Operation]:
212212
"""Get list of operations, but exclude child operations where the parent has already completed."""
213213
return self.operations
214214

215+
def validate_checkpoint_token(
216+
self,
217+
checkpoint_token: str | None,
218+
checkpoint_required: bool = True, # noqa: FBT001, FBT002
219+
checkpoint_required_msg: str | None = None,
220+
) -> None:
221+
"""Validate checkpoint token against this execution."""
222+
if not checkpoint_token and checkpoint_required:
223+
msg: str = checkpoint_required_msg or "Checkpoint token is required"
224+
raise InvalidParameterValueException(msg)
225+
226+
if not checkpoint_token:
227+
return # No token provided and not required
228+
229+
token: CheckpointToken = CheckpointToken.from_str(checkpoint_token)
230+
if token.execution_arn != self.durable_execution_arn:
231+
msg = "Checkpoint token does not match execution ARN"
232+
raise InvalidParameterValueException(msg)
233+
234+
if self.is_complete or token.token_sequence > self.token_sequence:
235+
msg = "Invalid or expired checkpoint token"
236+
raise InvalidParameterValueException(msg)
237+
238+
# Check if token has been used
239+
token_str: str = token.to_str()
240+
if token_str not in self.used_tokens:
241+
msg = f"Invalid checkpoint token: {token_str}"
242+
raise InvalidParameterValueException(msg)
243+
215244
def get_assertable_operations(self) -> list[Operation]:
216245
"""Get list of operations, but exclude the EXECUTION operations"""
217246
# TODO: this excludes EXECUTION at start, but can there be an EXECUTION at the end if there was a checkpoint with large payload?

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
if TYPE_CHECKING:
5252
from collections.abc import Awaitable, Callable
5353

54+
from aws_durable_execution_sdk_python.lambda_service import Operation
55+
5456
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5557
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
5658
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore
@@ -319,32 +321,33 @@ def get_execution_state(
319321
ResourceNotFoundException: If execution does not exist
320322
InvalidParameterValueException: If checkpoint token is invalid
321323
"""
322-
execution = self.get_execution(execution_arn)
323-
324-
# TODO: Validate checkpoint token if provided
325-
if checkpoint_token and checkpoint_token not in execution.used_tokens:
326-
msg: str = f"Invalid checkpoint token: {checkpoint_token}"
327-
raise InvalidParameterValueException(msg)
324+
execution: Execution = self.get_execution(execution_arn)
325+
checkpoint_required_msg: str = (
326+
"Checkpoint token is required for paginated requests on active executions"
327+
)
328+
checkpoint_required: bool = not execution.is_complete and marker is not None
329+
execution.validate_checkpoint_token(
330+
checkpoint_token, checkpoint_required, checkpoint_required_msg
331+
)
328332

329333
# Get operations (excluding the initial EXECUTION operation for state)
330-
operations = execution.get_assertable_operations()
334+
operations: list[Operation] = execution.get_assertable_operations()
331335

332336
# Apply pagination
333337
if max_items is None:
334338
max_items = 100
335339

336-
# Simple pagination - in real implementation would need proper marker handling
337-
start_index = 0
340+
start_index: int = 0
338341
if marker:
339342
try:
340343
start_index = int(marker)
341344
except ValueError:
342345
start_index = 0
343346

344-
end_index = start_index + max_items
345-
paginated_operations = operations[start_index:end_index]
347+
end_index: int = start_index + max_items
348+
paginated_operations: list[Operation] = operations[start_index:end_index]
346349

347-
next_marker = None
350+
next_marker: str | None = None
348351
if end_index < len(operations):
349352
next_marker = str(end_index)
350353

@@ -482,11 +485,11 @@ def checkpoint_execution(
482485
InvalidParameterValueException: If checkpoint token is invalid
483486
"""
484487
execution = self.get_execution(execution_arn)
485-
486-
# Validate checkpoint token
487-
if checkpoint_token not in execution.used_tokens:
488-
msg: str = f"Invalid checkpoint token: {checkpoint_token}"
489-
raise InvalidParameterValueException(msg)
488+
execution.validate_checkpoint_token(
489+
checkpoint_token,
490+
checkpoint_required=True,
491+
checkpoint_required_msg="Checkpoint token is required for checkpoint operations",
492+
)
490493

491494
# TODO: Process operation updates using the checkpoint processor
492495
# This would integrate with the existing checkpoint processing pipeline

0 commit comments

Comments
 (0)