Skip to content

Commit 755a35e

Browse files
committed
chore: code review
1 parent c442bcd commit 755a35e

File tree

6 files changed

+158
-58
lines changed

6 files changed

+158
-58
lines changed

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
# Import AWS exceptions
3030
from aws_durable_execution_sdk_python_testing.model import (
31+
InvocationCompletedDetails,
3132
StartDurableExecutionInput,
3233
)
3334
from aws_durable_execution_sdk_python_testing.token import (
@@ -60,9 +61,7 @@ def __init__(
6061
self.start_input: StartDurableExecutionInput = start_input
6162
self.operations: list[Operation] = operations
6263
self.updates: list[OperationUpdate] = []
63-
self.invocation_completions: list[
64-
tuple[float, float, str]
65-
] = [] # (start_ts, end_ts, request_id)
64+
self.invocation_completions: list[InvocationCompletedDetails] = []
6665
self.used_tokens: set[str] = set()
6766
# TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store
6867
self._token_sequence: int = 0
@@ -105,8 +104,7 @@ def to_dict(self) -> dict[str, Any]:
105104
"Operations": [op.to_dict() for op in self.operations],
106105
"Updates": [update.to_dict() for update in self.updates],
107106
"InvocationCompletions": [
108-
[start, end, req_id]
109-
for start, end, req_id in self.invocation_completions
107+
completion.to_dict() for completion in self.invocation_completions
110108
],
111109
"UsedTokens": list(self.used_tokens),
112110
"TokenSequence": self._token_sequence,
@@ -137,7 +135,8 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
137135
OperationUpdate.from_dict(update_data) for update_data in data["Updates"]
138136
]
139137
execution.invocation_completions = [
140-
tuple(item) for item in data.get("InvocationCompletions", [])
138+
InvocationCompletedDetails.from_dict(item)
139+
for item in data.get("InvocationCompletions", [])
141140
]
142141
execution.used_tokens = set(data["UsedTokens"])
143142
execution._token_sequence = data["TokenSequence"] # noqa: SLF001
@@ -229,7 +228,13 @@ def record_invocation_completion(
229228
self, start_timestamp: float, end_timestamp: float, request_id: str
230229
) -> None:
231230
"""Record an invocation completion event."""
232-
self.invocation_completions.append((start_timestamp, end_timestamp, request_id))
231+
self.invocation_completions.append(
232+
InvocationCompletedDetails(
233+
start_timestamp=start_timestamp,
234+
end_timestamp=end_timestamp,
235+
request_id=request_id,
236+
)
237+
)
233238

234239
def complete_success(self, result: str | None) -> None:
235240
"""Complete execution successfully (DecisionType.COMPLETE_WORKFLOW_EXECUTION)."""

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
from aws_durable_execution_sdk_python_testing.model import (
3434
CheckpointDurableExecutionResponse,
3535
CheckpointUpdatedExecutionState,
36+
EventCreationContext,
37+
EventType,
3638
GetDurableExecutionHistoryResponse,
3739
GetDurableExecutionResponse,
3840
GetDurableExecutionStateResponse,
@@ -45,7 +47,6 @@
4547
StartDurableExecutionOutput,
4648
StopDurableExecutionResponse,
4749
TERMINAL_STATUSES,
48-
EventCreationContext,
4950
)
5051
from aws_durable_execution_sdk_python_testing.model import (
5152
Event as HistoryEvent,
@@ -415,16 +416,14 @@ def get_execution_history(
415416
durable_execution_arn: str = execution.durable_execution_arn
416417

417418
# Add InvocationCompleted events
418-
for start_ts, end_ts, request_id in execution.invocation_completions:
419+
for completion in execution.invocation_completions:
419420
invocation_event = HistoryEvent(
420421
event_id=0, # Temporary, will be reassigned
421-
event_type="InvocationCompleted",
422-
event_timestamp=datetime.fromtimestamp(end_ts, tz=UTC),
423-
invocation_completed_details={
424-
"StartTimestamp": start_ts,
425-
"EndTimestamp": end_ts,
426-
"RequestId": request_id,
427-
},
422+
event_type=EventType.INVOCATION_COMPLETED.value,
423+
event_timestamp=datetime.fromtimestamp(
424+
completion.end_timestamp, tz=UTC
425+
),
426+
invocation_completed_details=completion,
428427
)
429428
all_events.append(invocation_event)
430429

@@ -785,7 +784,7 @@ async def invoke() -> None:
785784
self._store.save(execution)
786785

787786
invocation_start = time.time()
788-
response, request_id = self._invoker.invoke(
787+
invoke_response = self._invoker.invoke(
789788
execution.start_input.function_name,
790789
invocation_input,
791790
execution.start_input.lambda_endpoint,
@@ -797,7 +796,7 @@ async def invoke() -> None:
797796

798797
# Record invocation completion and save immediately
799798
execution.record_invocation_completion(
800-
invocation_start, invocation_end, request_id
799+
invocation_start, invocation_end, invoke_response.request_id
801800
)
802801
self._store.save(execution)
803802

@@ -809,6 +808,7 @@ async def invoke() -> None:
809808
return
810809

811810
# Process successful received response - validate status and handle accordingly
811+
response = invoke_response.invocation_output
812812
try:
813813
self._validate_invocation_response_and_store(
814814
execution_arn, response, execution

src/aws_durable_execution_sdk_python_testing/invoker.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import json
4+
from dataclasses import dataclass
45
from threading import Lock
56
from typing import TYPE_CHECKING, Any, Protocol
67
from uuid import uuid4
@@ -27,6 +28,14 @@
2728
from aws_durable_execution_sdk_python_testing.execution import Execution
2829

2930

31+
@dataclass(frozen=True)
32+
class InvokeResponse:
33+
"""Response from invoking a durable function."""
34+
35+
invocation_output: DurableExecutionInvocationOutput
36+
request_id: str
37+
38+
3039
def create_test_lambda_context() -> LambdaContext:
3140
# Create client context as a dictionary, not as objects
3241
# LambdaContext.__init__ expects dictionaries and will create the objects internally
@@ -66,7 +75,7 @@ def invoke(
6675
function_name: str,
6776
input: DurableExecutionInvocationInput,
6877
endpoint_url: str | None = None,
69-
) -> tuple[DurableExecutionInvocationOutput, str]: ... # pragma: no cover
78+
) -> InvokeResponse: ... # pragma: no cover
7079

7180
def update_endpoint(
7281
self, endpoint_url: str, region_name: str
@@ -97,15 +106,17 @@ def invoke(
97106
function_name: str, # noqa: ARG002
98107
input: DurableExecutionInvocationInput,
99108
endpoint_url: str | None = None, # noqa: ARG002
100-
) -> tuple[DurableExecutionInvocationOutput, str]:
109+
) -> InvokeResponse:
101110
# TODO: reasses if function_name will be used in future
102111
input_with_client = DurableExecutionInvocationInputWithClient.from_durable_execution_invocation_input(
103112
input, self.service_client
104113
)
105114
context = create_test_lambda_context()
106115
response_dict = self.handler(input_with_client, context)
107116
output = DurableExecutionInvocationOutput.from_dict(response_dict)
108-
return output, context.aws_request_id
117+
return InvokeResponse(
118+
invocation_output=output, request_id=context.aws_request_id
119+
)
109120

110121
def update_endpoint(self, endpoint_url: str, region_name: str) -> None:
111122
"""No-op for in-process invoker."""
@@ -194,7 +205,7 @@ def invoke(
194205
function_name: str,
195206
input: DurableExecutionInvocationInput,
196207
endpoint_url: str | None = None,
197-
) -> tuple[DurableExecutionInvocationOutput, str]:
208+
) -> InvokeResponse:
198209
"""Invoke AWS Lambda function and return durable execution result.
199210
200211
Args:
@@ -203,7 +214,7 @@ def invoke(
203214
endpoint_url: Lambda endpoint url
204215
205216
Returns:
206-
tuple: (DurableExecutionInvocationOutput, request_id)
217+
InvokeResponse: Response containing invocation output and request ID
207218
208219
Raises:
209220
ResourceNotFoundException: If function does not exist
@@ -259,7 +270,7 @@ def invoke(
259270

260271
# Convert to DurableExecutionInvocationOutput
261272
output = DurableExecutionInvocationOutput.from_dict(response_dict)
262-
return output, request_id
273+
return InvokeResponse(invocation_output=output, request_id=request_id)
263274

264275
except client.exceptions.ResourceNotFoundException as e:
265276
msg = f"Function not found: {function_name}"

src/aws_durable_execution_sdk_python_testing/model.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class EventType(Enum):
6868
CALLBACK_SUCCEEDED = "CallbackSucceeded"
6969
CALLBACK_FAILED = "CallbackFailed"
7070
CALLBACK_TIMED_OUT = "CallbackTimedOut"
71+
INVOCATION_COMPLETED = "InvocationCompleted"
7172

7273

7374
TERMINAL_STATUSES: set[OperationStatus] = {
@@ -1222,6 +1223,30 @@ def to_dict(self) -> dict[str, Any]:
12221223
return result
12231224

12241225

1226+
@dataclass(frozen=True)
1227+
class InvocationCompletedDetails:
1228+
"""Invocation completed event details."""
1229+
1230+
start_timestamp: float
1231+
end_timestamp: float
1232+
request_id: str
1233+
1234+
@classmethod
1235+
def from_dict(cls, data: dict) -> InvocationCompletedDetails:
1236+
return cls(
1237+
start_timestamp=data["StartTimestamp"],
1238+
end_timestamp=data["EndTimestamp"],
1239+
request_id=data["RequestId"],
1240+
)
1241+
1242+
def to_dict(self) -> dict[str, Any]:
1243+
return {
1244+
"StartTimestamp": self.start_timestamp,
1245+
"EndTimestamp": self.end_timestamp,
1246+
"RequestId": self.request_id,
1247+
}
1248+
1249+
12251250
# endregion event_structures
12261251

12271252

@@ -1329,7 +1354,7 @@ class Event:
13291354
callback_succeeded_details: CallbackSucceededDetails | None = None
13301355
callback_failed_details: CallbackFailedDetails | None = None
13311356
callback_timed_out_details: CallbackTimedOutDetails | None = None
1332-
invocation_completed_details: dict[str, Any] | None = None
1357+
invocation_completed_details: InvocationCompletedDetails | None = None
13331358

13341359
@classmethod
13351360
def from_dict(cls, data: dict) -> Event:
@@ -1448,7 +1473,11 @@ def from_dict(cls, data: dict) -> Event:
14481473
if details_data := data.get("CallbackTimedOutDetails"):
14491474
callback_timed_out_details = CallbackTimedOutDetails.from_dict(details_data)
14501475

1451-
invocation_completed_details = data.get("InvocationCompletedDetails")
1476+
invocation_completed_details = None
1477+
if details_data := data.get("InvocationCompletedDetails"):
1478+
invocation_completed_details = InvocationCompletedDetails.from_dict(
1479+
details_data
1480+
)
14521481

14531482
return cls(
14541483
event_type=data["EventType"],
@@ -1568,7 +1597,9 @@ def to_dict(self) -> dict[str, Any]:
15681597
self.callback_timed_out_details.to_dict()
15691598
)
15701599
if self.invocation_completed_details is not None:
1571-
result["InvocationCompletedDetails"] = self.invocation_completed_details
1600+
result["InvocationCompletedDetails"] = (
1601+
self.invocation_completed_details.to_dict()
1602+
)
15721603
return result
15731604

15741605
# region execution
@@ -2224,6 +2255,30 @@ def create_callback_event(cls, context: EventCreationContext) -> Event:
22242255

22252256
# endregion callback
22262257

2258+
# region invocation_completed
2259+
@classmethod
2260+
def create_invocation_completed(
2261+
cls,
2262+
event_id: int,
2263+
event_timestamp: datetime.datetime,
2264+
start_timestamp: float,
2265+
end_timestamp: float,
2266+
request_id: str,
2267+
) -> Event:
2268+
"""Create invocation completed event."""
2269+
return cls(
2270+
event_type=EventType.INVOCATION_COMPLETED.value,
2271+
event_timestamp=event_timestamp,
2272+
event_id=event_id,
2273+
invocation_completed_details=InvocationCompletedDetails(
2274+
start_timestamp=start_timestamp,
2275+
end_timestamp=end_timestamp,
2276+
request_id=request_id,
2277+
),
2278+
)
2279+
2280+
# endregion invocation_completed
2281+
22272282
@classmethod
22282283
def create_event_started(cls, context: EventCreationContext) -> Event:
22292284
"""Convert operation to started event."""

0 commit comments

Comments
 (0)