Skip to content

Commit 337ef77

Browse files
author
Rares Polenciuc
committed
feat: add execution history event generation and pagination
- Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination
1 parent a6bdb18 commit 337ef77

File tree

6 files changed

+3114
-158
lines changed

6 files changed

+3114
-158
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 108 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@
1212
DurableExecutionInvocationOutput,
1313
InvocationStatus,
1414
)
15-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
15+
from aws_durable_execution_sdk_python.lambda_service import (
16+
ErrorObject,
17+
Operation,
18+
OperationUpdate,
19+
OperationStatus,
20+
OperationType,
21+
)
1622

1723
from aws_durable_execution_sdk_python_testing.exceptions import (
1824
ExecutionAlreadyStartedException,
@@ -35,6 +41,8 @@
3541
StartDurableExecutionInput,
3642
StartDurableExecutionOutput,
3743
StopDurableExecutionResponse,
44+
TERMINAL_STATUSES,
45+
EventCreationContext,
3846
)
3947
from aws_durable_execution_sdk_python_testing.model import (
4048
Event as HistoryEvent,
@@ -59,8 +67,8 @@
5967

6068

6169
class Executor(ExecutionObserver):
62-
MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
63-
RETRY_BACKOFF_SECONDS = 5
70+
MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5
71+
RETRY_BACKOFF_SECONDS: int = 5
6472

6573
def __init__(
6674
self,
@@ -420,20 +428,18 @@ def get_execution_state(
420428
def get_execution_history(
421429
self,
422430
execution_arn: str,
423-
include_execution_data: bool = False, # noqa: FBT001, FBT002, ARG002
424-
reverse_order: bool = False, # noqa: FBT001, FBT002, ARG002
431+
include_execution_data: bool = False, # noqa: FBT001, FBT002
432+
reverse_order: bool = False, # noqa: FBT001, FBT002
425433
marker: str | None = None,
426434
max_items: int | None = None,
427435
) -> GetDurableExecutionHistoryResponse:
428436
"""Get execution history with events.
429437
430-
TODO: incomplete
431-
432438
Args:
433439
execution_arn: The execution ARN
434440
include_execution_data: Whether to include execution data in events
435441
reverse_order: Return events in reverse chronological order
436-
marker: Pagination marker
442+
marker: Pagination marker (event_id)
437443
max_items: Maximum items to return
438444
439445
Returns:
@@ -442,30 +448,110 @@ def get_execution_history(
442448
Raises:
443449
ResourceNotFoundException: If execution does not exist
444450
"""
445-
execution = self.get_execution(execution_arn) # noqa: F841
451+
execution: Execution = self.get_execution(execution_arn)
452+
453+
# Generate events
454+
all_events: list[HistoryEvent] = []
455+
event_id: int = 1
456+
ops: list[Operation] = execution.operations
457+
updates: list[OperationUpdate] = execution.updates
458+
updates_dict: dict[str, OperationUpdate] = {u.operation_id: u for u in updates}
459+
durable_execution_arn: str = execution.durable_execution_arn
460+
for op in ops:
461+
# Step Operation can have PENDING status -> not included in History
462+
operation_update: OperationUpdate | None = updates_dict.get(
463+
op.operation_id, None
464+
)
446465

447-
# Convert operations to events
448-
# This is a simplified implementation - real implementation would need
449-
# to generate proper event history from operations
450-
events: list[HistoryEvent] = []
466+
if op.status is OperationStatus.PENDING:
467+
if (
468+
op.operation_type is not OperationType.CHAINED_INVOKE
469+
or op.start_timestamp is None
470+
):
471+
continue
472+
context: EventCreationContext = EventCreationContext(
473+
op,
474+
event_id,
475+
durable_execution_arn,
476+
execution.start_input,
477+
execution.result,
478+
operation_update,
479+
include_execution_data,
480+
)
481+
pending = HistoryEvent.create_chained_invoke_event_pending(context)
482+
all_events.append(pending)
483+
event_id += 1
484+
if op.start_timestamp is not None:
485+
context = EventCreationContext(
486+
op,
487+
event_id,
488+
durable_execution_arn,
489+
execution.start_input,
490+
execution.result,
491+
operation_update,
492+
include_execution_data,
493+
)
494+
started = HistoryEvent.create_event_started(context)
495+
all_events.append(started)
496+
event_id += 1
497+
if op.end_timestamp is not None and op.status in TERMINAL_STATUSES:
498+
context = EventCreationContext(
499+
op,
500+
event_id,
501+
durable_execution_arn,
502+
execution.start_input,
503+
execution.result,
504+
operation_update,
505+
include_execution_data,
506+
)
507+
finished = HistoryEvent.create_event_terminated(context)
508+
all_events.append(finished)
509+
event_id += 1
451510

452-
# Apply pagination
511+
# Apply cursor-based pagination
453512
if max_items is None:
454513
max_items = 100
455514

456-
start_index = 0
515+
# Handle pagination marker
516+
if reverse_order:
517+
all_events.reverse()
518+
start_index: int = 0
457519
if marker:
458520
try:
459-
start_index = int(marker)
521+
marker_event_id: int = int(marker)
522+
# Find the index of the first event with event_id >= marker
523+
start_index = len(all_events)
524+
for i, e in enumerate(all_events):
525+
is_valid_page_start: bool = (
526+
e.event_id < marker_event_id
527+
if reverse_order
528+
else e.event_id >= marker_event_id
529+
)
530+
if is_valid_page_start:
531+
start_index = i
532+
break
460533
except ValueError:
461534
start_index = 0
462535

463-
end_index = start_index + max_items
464-
paginated_events = events[start_index:end_index]
465-
466-
next_marker = None
467-
if end_index < len(events):
468-
next_marker = str(end_index)
536+
# Get paginated events
537+
end_index: int = start_index + max_items
538+
paginated_events: list[HistoryEvent] = all_events[start_index:end_index]
539+
540+
# Generate next marker
541+
next_marker: str | None = None
542+
if end_index < len(all_events):
543+
if reverse_order:
544+
# Next marker is the event_id of the last returned event
545+
next_marker = (
546+
str(paginated_events[-1].event_id) if paginated_events else None
547+
)
548+
else:
549+
# Next marker is the event_id of the next event after the last returned
550+
next_marker = (
551+
str(all_events[end_index].event_id)
552+
if end_index < len(all_events)
553+
else None
554+
)
469555

470556
return GetDurableExecutionHistoryResponse(
471557
events=paginated_events, next_marker=next_marker

0 commit comments

Comments
 (0)