Skip to content

Commit 2567d12

Browse files
author
Rares Polenciuc
committed
feat: add execution history event generation and pagination
- Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination
1 parent 155070c commit 2567d12

File tree

5 files changed

+2208
-26
lines changed

5 files changed

+2208
-26
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 89 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
DurableExecutionInvocationOutput,
1212
InvocationStatus,
1313
)
14-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
14+
from aws_durable_execution_sdk_python.lambda_service import (
15+
ErrorObject,
16+
Operation,
17+
OperationStatus,
18+
OperationUpdate,
19+
)
1520

1621
from aws_durable_execution_sdk_python_testing.exceptions import (
1722
ExecutionAlreadyStartedException,
@@ -52,10 +57,18 @@
5257

5358
logger = logging.getLogger(__name__)
5459

60+
TERMINAL_STATUSES: set[OperationStatus] = {
61+
OperationStatus.SUCCEEDED,
62+
OperationStatus.FAILED,
63+
OperationStatus.TIMED_OUT,
64+
OperationStatus.STOPPED,
65+
OperationStatus.CANCELLED,
66+
}
67+
5568

5669
class Executor(ExecutionObserver):
57-
MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
58-
RETRY_BACKOFF_SECONDS = 5
70+
MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5
71+
RETRY_BACKOFF_SECONDS: int = 5
5972

6073
def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker):
6174
self._store = store
@@ -393,20 +406,18 @@ def get_execution_state(
393406
def get_execution_history(
394407
self,
395408
execution_arn: str,
396-
include_execution_data: bool = False, # noqa: FBT001, FBT002, ARG002
397-
reverse_order: bool = False, # noqa: FBT001, FBT002, ARG002
409+
include_execution_data: bool = False, # noqa: FBT001, FBT002
410+
reverse_order: bool = False, # noqa: FBT001, FBT002
398411
marker: str | None = None,
399412
max_items: int | None = None,
400413
) -> GetDurableExecutionHistoryResponse:
401414
"""Get execution history with events.
402415
403-
TODO: incomplete
404-
405416
Args:
406417
execution_arn: The execution ARN
407418
include_execution_data: Whether to include execution data in events
408419
reverse_order: Return events in reverse chronological order
409-
marker: Pagination marker
420+
marker: Pagination marker (event_id)
410421
max_items: Maximum items to return
411422
412423
Returns:
@@ -415,30 +426,85 @@ def get_execution_history(
415426
Raises:
416427
ResourceNotFoundException: If execution does not exist
417428
"""
418-
execution = self.get_execution(execution_arn) # noqa: F841
419-
420-
# Convert operations to events
421-
# This is a simplified implementation - real implementation would need
422-
# to generate proper event history from operations
423-
events: list[HistoryEvent] = []
429+
execution: Execution = self.get_execution(execution_arn)
430+
431+
# Generate events
432+
all_events: list[HistoryEvent] = []
433+
event_id: int = 1
434+
ops: list[Operation] = getattr(execution, "operations", [])
435+
for op in ops:
436+
if op.start_timestamp is not None:
437+
started = HistoryEvent.from_operation_started(
438+
op, event_id, include_execution_data
439+
)
440+
all_events.append(started)
441+
event_id += 1
442+
if op.end_timestamp is not None and op.status in TERMINAL_STATUSES:
443+
finished = HistoryEvent.from_operation_finished(
444+
op, event_id, include_execution_data
445+
)
446+
all_events.append(finished)
447+
event_id += 1
424448

425-
# Apply pagination
449+
# Apply cursor-based pagination
426450
if max_items is None:
427451
max_items = 100
428452

429-
start_index = 0
453+
# Handle pagination marker
454+
start_index: int = 0
430455
if marker:
431456
try:
432-
start_index = int(marker)
457+
marker_event_id: int = int(marker)
458+
# Find the index of the first event with event_id >= marker
459+
start_index = next(
460+
(
461+
i
462+
for i, e in enumerate(all_events)
463+
if e.event_id >= marker_event_id
464+
),
465+
len(all_events),
466+
)
433467
except ValueError:
434468
start_index = 0
435469

436-
end_index = start_index + max_items
437-
paginated_events = events[start_index:end_index]
438-
439-
next_marker = None
440-
if end_index < len(events):
441-
next_marker = str(end_index)
470+
# Apply reverse order after pagination setup
471+
if reverse_order:
472+
all_events.reverse()
473+
# Adjust start_index for reversed order
474+
if marker:
475+
try:
476+
marker_event_id = int(marker)
477+
# In reverse order, we want events with event_id < marker
478+
start_index = next(
479+
(
480+
i
481+
for i, e in enumerate(all_events)
482+
if e.event_id < marker_event_id
483+
),
484+
len(all_events),
485+
)
486+
except ValueError:
487+
start_index = 0
488+
489+
# Get paginated events
490+
end_index: int = start_index + max_items
491+
paginated_events: list[HistoryEvent] = all_events[start_index:end_index]
492+
493+
# Generate next marker
494+
next_marker: str | None = None
495+
if end_index < len(all_events):
496+
if reverse_order:
497+
# Next marker is the event_id of the last returned event
498+
next_marker = (
499+
str(paginated_events[-1].event_id) if paginated_events else None
500+
)
501+
else:
502+
# Next marker is the event_id of the next event after the last returned
503+
next_marker = (
504+
str(all_events[end_index].event_id)
505+
if end_index < len(all_events)
506+
else None
507+
)
442508

443509
return GetDurableExecutionHistoryResponse(
444510
events=paginated_events, next_marker=next_marker

0 commit comments

Comments
 (0)