Skip to content

Commit fc4155b

Browse files
author
Rares Polenciuc
committed
feat: implement callback token generation and processing
- Add CallbackToken generation in callback processor with observer integration - Implement SendCallbackSuccess, SendCallbackFailure, and SendCallbackHeartbeat - Add callback operation lookup and completion methods to execution - Ensure unique token generation across executions
1 parent a6bdb18 commit fc4155b

File tree

13 files changed

+1164
-109
lines changed

13 files changed

+1164
-109
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,12 @@ def _create_step_details(
9494
return None
9595

9696
def _create_callback_details(
97-
self, update: OperationUpdate
97+
self, update: OperationUpdate, callback_id: str | None = None
9898
) -> CallbackDetails | None:
9999
"""Create CallbackDetails from OperationUpdate."""
100100
return (
101101
CallbackDetails(
102-
callback_id="placeholder", result=update.payload, error=update.error
102+
callback_id=callback_id or "", result=update.payload, error=update.error
103103
)
104104
if update.operation_type == OperationType.CALLBACK
105105
else None
@@ -137,6 +137,7 @@ def _translate_update_to_operation(
137137
update: OperationUpdate,
138138
current_operation: Operation | None,
139139
status: OperationStatus,
140+
callback_id: str | None = None,
140141
) -> Operation:
141142
"""Transform OperationUpdate to Operation, always creating new Operation."""
142143
start_time = (
@@ -149,7 +150,7 @@ def _translate_update_to_operation(
149150
execution_details = self._create_execution_details(update)
150151
context_details = self._create_context_details(update)
151152
step_details = self._create_step_details(update, current_operation)
152-
callback_details = self._create_callback_details(update)
153+
callback_details = self._create_callback_details(update, callback_id)
153154
invoke_details = self._create_invoke_details(update)
154155
wait_details = self._create_wait_details(update, current_operation)
155156

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/callback.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
from __future__ import annotations
44

5+
from dataclasses import replace
56
from typing import TYPE_CHECKING
67

78
from aws_durable_execution_sdk_python.lambda_service import (
89
Operation,
910
OperationAction,
1011
OperationStatus,
1112
OperationUpdate,
13+
CallbackDetails,
1214
)
1315

1416
from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import (
@@ -17,6 +19,7 @@
1719
from aws_durable_execution_sdk_python_testing.exceptions import (
1820
InvalidParameterValueException,
1921
)
22+
from aws_durable_execution_sdk_python_testing.token import CallbackToken
2023

2124

2225
if TYPE_CHECKING:
@@ -36,14 +39,26 @@ def process(
3639
"""Process CALLBACK operation update with scheduler integration for activities."""
3740
match update.action:
3841
case OperationAction.START:
39-
# TODO: create CallbackToken (see token module). Add Observer/Notifier for on_callback_created possibly,
40-
# but token might well have enough so don't need to maintain token list on execution itself
41-
return self._translate_update_to_operation(
42+
callback_token: CallbackToken = CallbackToken(
43+
execution_arn=execution_arn,
44+
operation_id=update.operation_id,
45+
)
46+
callback_id: str = callback_token.to_str()
47+
48+
notifier.notify_callback_created(
49+
execution_arn=execution_arn,
50+
operation_id=update.operation_id,
51+
callback_id=callback_id,
52+
)
53+
54+
operation: Operation = self._translate_update_to_operation(
4255
update=update,
4356
current_operation=current_op,
4457
status=OperationStatus.STARTED,
58+
callback_id=callback_id,
4559
)
60+
61+
return operation
4662
case _:
4763
msg: str = "Invalid action for CALLBACK operation."
48-
4964
raise InvalidParameterValueException(msg)

src/aws_durable_execution_sdk_python_testing/checkpoint/validators/operations/callback.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
VALID_ACTIONS_FOR_CALLBACK = frozenset(
1818
[
1919
OperationAction.START,
20-
OperationAction.CANCEL,
2120
]
2221
)
2322

@@ -41,14 +40,6 @@ def validate(current_state: Operation | None, update: OperationUpdate) -> None:
4140
"Cannot start a CALLBACK that already exist."
4241
)
4342
raise InvalidParameterValueException(msg_callback_exists)
44-
case OperationAction.CANCEL:
45-
if (
46-
current_state is None
47-
or current_state.status
48-
not in CallbackOperationValidator._ALLOWED_STATUS_TO_CANCEL
49-
):
50-
msg_callback_cancel: str = "Cannot cancel a CALLBACK that does not exist or has already completed."
51-
raise InvalidParameterValueException(msg_callback_cancel)
5243
case _:
53-
msg_callback_invalid: str = "Invalid CALLBACK action."
54-
raise InvalidParameterValueException(msg_callback_invalid)
44+
msg: str = "Invalid action for CALLBACK operation."
45+
raise InvalidParameterValueException(msg)

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
from aws_durable_execution_sdk_python_testing.model import (
2929
StartDurableExecutionInput,
3030
)
31-
from aws_durable_execution_sdk_python_testing.token import CheckpointToken
31+
from aws_durable_execution_sdk_python_testing.token import (
32+
CheckpointToken,
33+
CallbackToken,
34+
)
3235

3336

3437
class Execution:
@@ -203,6 +206,18 @@ def find_operation(self, operation_id: str) -> tuple[int, Operation]:
203206
msg: str = f"Attempting to update state of an Operation [{operation_id}] that doesn't exist"
204207
raise IllegalStateException(msg)
205208

209+
def find_callback_operation(self, callback_id: str) -> tuple[int, Operation]:
210+
"""Find callback operation by callback_id, return index and operation."""
211+
for i, operation in enumerate(self.operations):
212+
if (
213+
operation.operation_type == OperationType.CALLBACK
214+
and operation.callback_details
215+
and operation.callback_details.callback_id == callback_id
216+
):
217+
return i, operation
218+
msg: str = f"Callback operation with callback_id [{callback_id}] not found"
219+
raise IllegalStateException(msg)
220+
206221
def complete_wait(self, operation_id: str) -> Operation:
207222
"""Complete WAIT operation when timer fires."""
208223
index, operation = self.find_operation(operation_id)
@@ -260,3 +275,56 @@ def complete_retry(self, operation_id: str) -> Operation:
260275
# Assign
261276
self.operations[index] = updated_operation
262277
return updated_operation
278+
279+
def complete_callback_success(
280+
self, callback_id: str, result: bytes | None = None
281+
) -> Operation:
282+
"""Complete CALLBACK operation with success."""
283+
index, operation = self.find_callback_operation(callback_id)
284+
285+
if operation.status != OperationStatus.STARTED:
286+
msg: str = f"Callback operation [{callback_id}] is not in STARTED state"
287+
raise IllegalStateException(msg)
288+
289+
with self._state_lock:
290+
self._token_sequence += 1
291+
updated_callback_details = None
292+
if operation.callback_details:
293+
updated_callback_details = replace(
294+
operation.callback_details,
295+
result=result.decode() if result else None,
296+
)
297+
298+
self.operations[index] = replace(
299+
operation,
300+
status=OperationStatus.SUCCEEDED,
301+
end_timestamp=datetime.now(UTC),
302+
callback_details=updated_callback_details,
303+
)
304+
return self.operations[index]
305+
306+
def complete_callback_failure(
307+
self, callback_id: str, error: ErrorObject
308+
) -> Operation:
309+
"""Complete CALLBACK operation with failure."""
310+
index, operation = self.find_callback_operation(callback_id)
311+
312+
if operation.status != OperationStatus.STARTED:
313+
msg: str = f"Callback operation [{callback_id}] is not in STARTED state"
314+
raise IllegalStateException(msg)
315+
316+
with self._state_lock:
317+
self._token_sequence += 1
318+
updated_callback_details = None
319+
if operation.callback_details:
320+
updated_callback_details = replace(
321+
operation.callback_details, error=error
322+
)
323+
324+
self.operations[index] = replace(
325+
operation,
326+
status=OperationStatus.FAILED,
327+
end_timestamp=datetime.now(UTC),
328+
callback_details=updated_callback_details,
329+
)
330+
return self.operations[index]

0 commit comments

Comments
 (0)