Skip to content

Commit ef60cac

Browse files
author
Rares Polenciuc
committed
feat: add execution history event generation and pagination
- Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination
1 parent 402a348 commit ef60cac

File tree

6 files changed

+2359
-131
lines changed

6 files changed

+2359
-131
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 108 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@
1111
DurableExecutionInvocationOutput,
1212
InvocationStatus,
1313
)
14-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
14+
from aws_durable_execution_sdk_python.lambda_service import (
15+
ErrorObject,
16+
Operation,
17+
OperationUpdate,
18+
OperationStatus,
19+
OperationType,
20+
)
1521

1622
from aws_durable_execution_sdk_python_testing.exceptions import (
1723
ExecutionAlreadyStartedException,
@@ -33,6 +39,8 @@
3339
StartDurableExecutionInput,
3440
StartDurableExecutionOutput,
3541
StopDurableExecutionResponse,
42+
TERMINAL_STATUSES,
43+
EventCreationContext,
3644
)
3745
from aws_durable_execution_sdk_python_testing.model import (
3846
Event as HistoryEvent,
@@ -54,8 +62,8 @@
5462

5563

5664
class Executor(ExecutionObserver):
57-
MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
58-
RETRY_BACKOFF_SECONDS = 5
65+
MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5
66+
RETRY_BACKOFF_SECONDS: int = 5
5967

6068
def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker):
6169
self._store = store
@@ -393,20 +401,18 @@ def get_execution_state(
393401
def get_execution_history(
394402
self,
395403
execution_arn: str,
396-
include_execution_data: bool = False, # noqa: FBT001, FBT002, ARG002
397-
reverse_order: bool = False, # noqa: FBT001, FBT002, ARG002
404+
include_execution_data: bool = False, # noqa: FBT001, FBT002
405+
reverse_order: bool = False, # noqa: FBT001, FBT002
398406
marker: str | None = None,
399407
max_items: int | None = None,
400408
) -> GetDurableExecutionHistoryResponse:
401409
"""Get execution history with events.
402410
403-
TODO: incomplete
404-
405411
Args:
406412
execution_arn: The execution ARN
407413
include_execution_data: Whether to include execution data in events
408414
reverse_order: Return events in reverse chronological order
409-
marker: Pagination marker
415+
marker: Pagination marker (event_id)
410416
max_items: Maximum items to return
411417
412418
Returns:
@@ -415,30 +421,110 @@ def get_execution_history(
415421
Raises:
416422
ResourceNotFoundException: If execution does not exist
417423
"""
418-
execution = self.get_execution(execution_arn) # noqa: F841
424+
execution: Execution = self.get_execution(execution_arn)
425+
426+
# Generate events
427+
all_events: list[HistoryEvent] = []
428+
event_id: int = 1
429+
ops: list[Operation] = execution.operations
430+
updates: list[OperationUpdate] = execution.updates
431+
updates_dict: dict[str, OperationUpdate] = {u.operation_id: u for u in updates}
432+
durable_execution_arn: str = execution.durable_execution_arn
433+
for op in ops:
434+
# Step Operation can have PENDING status -> not included in History
435+
operation_update: OperationUpdate | None = updates_dict.get(
436+
op.operation_id, None
437+
)
419438

420-
# Convert operations to events
421-
# This is a simplified implementation - real implementation would need
422-
# to generate proper event history from operations
423-
events: list[HistoryEvent] = []
439+
if op.status is OperationStatus.PENDING:
440+
if (
441+
op.operation_type is not OperationType.CHAINED_INVOKE
442+
or op.start_timestamp is None
443+
):
444+
continue
445+
context: EventCreationContext = EventCreationContext(
446+
op,
447+
event_id,
448+
execution.durable_execution_arn,
449+
execution.start_input,
450+
execution.result,
451+
operation_update,
452+
include_execution_data,
453+
)
454+
pending = HistoryEvent.create_chained_invoke_event_pending(context)
455+
all_events.append(pending)
456+
event_id += 1
457+
if op.start_timestamp is not None:
458+
context: EventCreationContext = EventCreationContext(
459+
op,
460+
event_id,
461+
durable_execution_arn,
462+
execution.start_input,
463+
execution.result,
464+
operation_update,
465+
include_execution_data,
466+
)
467+
started = HistoryEvent.create_event_started(context)
468+
all_events.append(started)
469+
event_id += 1
470+
if op.end_timestamp is not None and op.status in TERMINAL_STATUSES:
471+
context: EventCreationContext = EventCreationContext(
472+
op,
473+
event_id,
474+
durable_execution_arn,
475+
execution.start_input,
476+
execution.result,
477+
operation_update,
478+
include_execution_data,
479+
)
480+
finished = HistoryEvent.create_event_terminated(context)
481+
all_events.append(finished)
482+
event_id += 1
424483

425-
# Apply pagination
484+
# Apply cursor-based pagination
426485
if max_items is None:
427486
max_items = 100
428487

429-
start_index = 0
488+
# Handle pagination marker
489+
if reverse_order:
490+
all_events.reverse()
491+
start_index: int = 0
430492
if marker:
431493
try:
432-
start_index = int(marker)
494+
marker_event_id: int = int(marker)
495+
# Find the index of the first event with event_id >= marker
496+
start_index = len(all_events)
497+
for i, e in enumerate(all_events):
498+
is_valid_page_start: bool = (
499+
e.event_id < marker_event_id
500+
if reverse_order
501+
else e.event_id >= marker_event_id
502+
)
503+
if is_valid_page_start:
504+
start_index = i
505+
break
433506
except ValueError:
434507
start_index = 0
435508

436-
end_index = start_index + max_items
437-
paginated_events = events[start_index:end_index]
438-
439-
next_marker = None
440-
if end_index < len(events):
441-
next_marker = str(end_index)
509+
# Get paginated events
510+
end_index: int = start_index + max_items
511+
paginated_events: list[HistoryEvent] = all_events[start_index:end_index]
512+
513+
# Generate next marker
514+
next_marker: str | None = None
515+
if end_index < len(all_events):
516+
if reverse_order:
517+
# Next marker is the event_id of the last returned event
518+
next_marker = (
519+
str(paginated_events[-1].event_id) if paginated_events else None
520+
)
521+
else:
522+
# Next marker is the event_id of the next event after the last returned
523+
next_marker = (
524+
str(all_events[end_index].event_id)
525+
if end_index < len(all_events)
526+
else None
527+
)
442528

443529
return GetDurableExecutionHistoryResponse(
444530
events=paginated_events, next_marker=next_marker

0 commit comments

Comments
 (0)