Skip to content

Commit c7a4dd0

Browse files
authored
fix: invoke lambda after callback success/failure (#119)
1 parent dbb5903 commit c7a4dd0

File tree

3 files changed

+29
-13
lines changed

3 files changed

+29
-13
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ def process_checkpoint(
7171
execution_arn=token.execution_arn,
7272
)
7373

74-
# 5. Save update
74+
# 5. Generate a new checkpoint token and save updated operations
75+
new_checkpoint_token = execution.get_new_checkpoint_token()
7576
execution.operations = updated_operations
7677
execution.updates.extend(all_updates)
77-
7878
self._store.update(execution)
7979

8080
# 6. Return checkpoint result
8181
return CheckpointOutput(
82-
checkpoint_token=execution.get_new_checkpoint_token(),
82+
checkpoint_token=new_checkpoint_token,
8383
new_execution_state=CheckpointUpdatedExecutionState(
8484
operations=execution.get_navigable_operations(), next_marker=None
8585
),

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
ResourceNotFoundException,
2929
)
3030
from aws_durable_execution_sdk_python_testing.execution import Execution
31+
from aws_durable_execution_sdk_python_testing.exceptions import IllegalStateException
3132
from aws_durable_execution_sdk_python_testing.model import (
3233
CheckpointDurableExecutionResponse,
3334
CheckpointUpdatedExecutionState,
@@ -611,8 +612,12 @@ def checkpoint_execution(
611612
new_execution_state=new_execution_state,
612613
)
613614

615+
# Save execution state after generating new token
616+
new_checkpoint_token = execution.get_new_checkpoint_token()
617+
self._store.update(execution)
618+
614619
return CheckpointDurableExecutionResponse(
615-
checkpoint_token=execution.get_new_checkpoint_token(),
620+
checkpoint_token=new_checkpoint_token,
616621
new_execution_state=None,
617622
)
618623

@@ -644,6 +649,7 @@ def send_callback_success(
644649
execution.complete_callback_success(callback_id, result)
645650
self._store.update(execution)
646651
self._cleanup_callback_timeouts(callback_id)
652+
self._invoke_execution(callback_token.execution_arn)
647653
logger.info("Callback success completed for callback_id: %s", callback_id)
648654
except Exception as e:
649655
msg = f"Failed to process callback success: {e}"
@@ -681,6 +687,7 @@ def send_callback_failure(
681687
execution.complete_callback_failure(callback_id, callback_error)
682688
self._store.update(execution)
683689
self._cleanup_callback_timeouts(callback_id)
690+
self._invoke_execution(callback_token.execution_arn)
684691
logger.info("Callback failure completed for callback_id: %s", callback_id)
685692
except Exception as e:
686693
msg = f"Failed to process callback failure: {e}"
@@ -944,7 +951,7 @@ def complete_execution(self, execution_arn: str, result: str | None = None) -> N
944951

945952
def fail_execution(self, execution_arn: str, error: ErrorObject) -> None:
946953
"""Fail execution with error."""
947-
logger.exception("[%s] Completing execution with error.", execution_arn)
954+
logger.error("[%s] Completing execution with error: %s", execution_arn, error)
948955
execution: Execution = self._store.load(execution_arn=execution_arn)
949956
execution.complete_fail(error=error)
950957
self._store.update(execution)
@@ -1190,9 +1197,8 @@ def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None:
11901197
f"Callback timed out: {CallbackTimeoutType.TIMEOUT.value}"
11911198
)
11921199
execution.complete_callback_failure(callback_id, timeout_error)
1200+
execution.complete_fail(timeout_error)
11931201
self._store.update(execution)
1194-
self._invoke_execution(execution_arn)
1195-
11961202
logger.warning("[%s] Callback %s timed out", execution_arn, callback_id)
11971203
except Exception:
11981204
logger.exception(
@@ -1218,9 +1224,8 @@ def _on_callback_heartbeat_timeout(
12181224
f"Callback heartbeat timed out: {CallbackTimeoutType.HEARTBEAT.value}"
12191225
)
12201226
execution.complete_callback_failure(callback_id, heartbeat_error)
1227+
execution.complete_fail(heartbeat_error)
12211228
self._store.update(execution)
1222-
self._invoke_execution(execution_arn)
1223-
12241229
logger.warning(
12251230
"[%s] Callback %s heartbeat timed out", execution_arn, callback_id
12261231
)

tests/executor_test.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,7 +2216,7 @@ def test_send_callback_success(executor, mock_store):
22162216
mock_execution.complete_callback_success.return_value = Mock()
22172217
mock_store.load.return_value = mock_execution
22182218

2219-
with patch.object(executor, "_invoke_execution"):
2219+
with patch.object(executor, "_invoke_execution") as mock_invoke:
22202220
result = executor.send_callback_success(callback_id, b"success-result")
22212221

22222222
assert isinstance(result, SendDurableExecutionCallbackSuccessResponse)
@@ -2225,6 +2225,8 @@ def test_send_callback_success(executor, mock_store):
22252225
callback_id, b"success-result"
22262226
)
22272227
mock_store.update.assert_called_once_with(mock_execution)
2228+
# Verify execution is invoked after callback success
2229+
mock_invoke.assert_called_once_with("test-arn")
22282230

22292231

22302232
def test_send_callback_success_empty_callback_id(executor):
@@ -2253,10 +2255,15 @@ def test_send_callback_success_with_result(executor, mock_store):
22532255
mock_execution.complete_callback_success.return_value = Mock()
22542256
mock_store.load.return_value = mock_execution
22552257

2256-
with patch.object(executor, "_invoke_execution"):
2258+
with patch.object(executor, "_invoke_execution") as mock_invoke:
22572259
result = executor.send_callback_success(callback_id, b"test-result")
22582260

22592261
assert isinstance(result, SendDurableExecutionCallbackSuccessResponse)
2262+
mock_execution.complete_callback_success.assert_called_once_with(
2263+
callback_id, b"test-result"
2264+
)
2265+
# Verify execution is invoked after callback success
2266+
mock_invoke.assert_called_once_with("test-arn")
22602267

22612268

22622269
def test_send_callback_failure(executor, mock_store):
@@ -2273,12 +2280,14 @@ def test_send_callback_failure(executor, mock_store):
22732280
mock_execution.complete_callback_failure.return_value = Mock()
22742281
mock_store.load.return_value = mock_execution
22752282

2276-
with patch.object(executor, "_invoke_execution"):
2283+
with patch.object(executor, "_invoke_execution") as mock_invoke:
22772284
result = executor.send_callback_failure(callback_id)
22782285

22792286
assert isinstance(result, SendDurableExecutionCallbackFailureResponse)
22802287
mock_store.load.assert_called_once_with("test-arn")
22812288
mock_store.update.assert_called_once_with(mock_execution)
2289+
# Verify execution is invoked after callback failure
2290+
mock_invoke.assert_called_once_with("test-arn")
22822291

22832292

22842293
def test_send_callback_failure_empty_callback_id(executor):
@@ -2306,11 +2315,13 @@ def test_send_callback_failure_with_error(executor, mock_store):
23062315
mock_store.load.return_value = mock_execution
23072316

23082317
error = ErrorObject.from_message("Test callback error")
2309-
with patch.object(executor, "_invoke_execution"):
2318+
with patch.object(executor, "_invoke_execution") as mock_invoke:
23102319
result = executor.send_callback_failure(callback_id, error)
23112320

23122321
assert isinstance(result, SendDurableExecutionCallbackFailureResponse)
23132322
mock_execution.complete_callback_failure.assert_called_once_with(callback_id, error)
2323+
# Verify execution is invoked after callback failure
2324+
mock_invoke.assert_called_once_with("test-arn")
23142325

23152326

23162327
def test_send_callback_heartbeat(executor, mock_store):

0 commit comments

Comments
 (0)