Skip to content

Commit 1df26ce

Browse files
author
Rares Polenciuc
committed
feat: add execution history event generation and pagination
- Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination
1 parent 402a348 commit 1df26ce

File tree

6 files changed

+2237
-119
lines changed

6 files changed

+2237
-119
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
DurableExecutionInvocationOutput,
1212
InvocationStatus,
1313
)
14-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
14+
from aws_durable_execution_sdk_python.lambda_service import (
15+
ErrorObject,
16+
Operation,
17+
OperationUpdate,
18+
)
1519

1620
from aws_durable_execution_sdk_python_testing.exceptions import (
1721
ExecutionAlreadyStartedException,
@@ -33,6 +37,7 @@
3337
StartDurableExecutionInput,
3438
StartDurableExecutionOutput,
3539
StopDurableExecutionResponse,
40+
TERMINAL_STATUSES,
3641
)
3742
from aws_durable_execution_sdk_python_testing.model import (
3843
Event as HistoryEvent,
@@ -54,8 +59,8 @@
5459

5560

5661
class Executor(ExecutionObserver):
57-
MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
58-
RETRY_BACKOFF_SECONDS = 5
62+
MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5
63+
RETRY_BACKOFF_SECONDS: int = 5
5964

6065
def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker):
6166
self._store = store
@@ -393,20 +398,18 @@ def get_execution_state(
393398
def get_execution_history(
394399
self,
395400
execution_arn: str,
396-
include_execution_data: bool = False, # noqa: FBT001, FBT002, ARG002
397-
reverse_order: bool = False, # noqa: FBT001, FBT002, ARG002
401+
include_execution_data: bool = False, # noqa: FBT001, FBT002
402+
reverse_order: bool = False, # noqa: FBT001, FBT002
398403
marker: str | None = None,
399404
max_items: int | None = None,
400405
) -> GetDurableExecutionHistoryResponse:
401406
"""Get execution history with events.
402407
403-
TODO: incomplete
404-
405408
Args:
406409
execution_arn: The execution ARN
407410
include_execution_data: Whether to include execution data in events
408411
reverse_order: Return events in reverse chronological order
409-
marker: Pagination marker
412+
marker: Pagination marker (event_id)
410413
max_items: Maximum items to return
411414
412415
Returns:
@@ -415,30 +418,79 @@ def get_execution_history(
415418
Raises:
416419
ResourceNotFoundException: If execution does not exist
417420
"""
418-
execution = self.get_execution(execution_arn) # noqa: F841
419-
420-
# Convert operations to events
421-
# This is a simplified implementation - real implementation would need
422-
# to generate proper event history from operations
423-
events: list[HistoryEvent] = []
421+
execution: Execution = self.get_execution(execution_arn)
422+
423+
# Generate events
424+
all_events: list[HistoryEvent] = []
425+
event_id: int = 1
426+
ops: list[Operation] = execution.operations
427+
for op in ops:
428+
if op.start_timestamp is not None:
429+
started = HistoryEvent.from_operation_started(
430+
op, event_id, include_execution_data
431+
)
432+
all_events.append(started)
433+
event_id += 1
434+
if op.end_timestamp is not None and op.status in TERMINAL_STATUSES:
435+
finished = HistoryEvent.from_operation_finished(
436+
op, event_id, include_execution_data
437+
)
438+
all_events.append(finished)
439+
event_id += 1
424440

425-
# Apply pagination
441+
# Apply cursor-based pagination
426442
if max_items is None:
427443
max_items = 100
428444

429-
start_index = 0
445+
# Handle pagination marker
446+
start_index: int = 0
430447
if marker:
431448
try:
432-
start_index = int(marker)
449+
marker_event_id: int = int(marker)
450+
# Find the index of the first event with event_id >= marker
451+
start_index = len(all_events)
452+
for i, e in enumerate(all_events):
453+
if e.event_id >= marker_event_id:
454+
start_index = i
455+
break
433456
except ValueError:
434457
start_index = 0
435458

436-
end_index = start_index + max_items
437-
paginated_events = events[start_index:end_index]
438-
439-
next_marker = None
440-
if end_index < len(events):
441-
next_marker = str(end_index)
459+
# Apply reverse order after pagination setup
460+
if reverse_order:
461+
all_events.reverse()
462+
# Adjust start_index for reversed order
463+
if marker:
464+
try:
465+
marker_event_id = int(marker)
466+
# In reverse order, we want events with event_id < marker
467+
start_index = len(all_events)
468+
for i, e in enumerate(all_events):
469+
if e.event_id < marker_event_id:
470+
start_index = i
471+
break
472+
except ValueError:
473+
start_index = 0
474+
475+
# Get paginated events
476+
end_index: int = start_index + max_items
477+
paginated_events: list[HistoryEvent] = all_events[start_index:end_index]
478+
479+
# Generate next marker
480+
next_marker: str | None = None
481+
if end_index < len(all_events):
482+
if reverse_order:
483+
# Next marker is the event_id of the last returned event
484+
next_marker = (
485+
str(paginated_events[-1].event_id) if paginated_events else None
486+
)
487+
else:
488+
# Next marker is the event_id of the next event after the last returned
489+
next_marker = (
490+
str(all_events[end_index].event_id)
491+
if end_index < len(all_events)
492+
else None
493+
)
442494

443495
return GetDurableExecutionHistoryResponse(
444496
events=paginated_events, next_marker=next_marker

0 commit comments

Comments
 (0)