Skip to content

Commit bb1f2e9

Browse files
committed
feat: add invoke operation
Add new invoke operation for calling other Durable Functions from a within a Durable Function. - Add invoke on DurableContext and an operation handler. - Add OperationSubType.INVOKE for invoke operation tracking - Amend InvokeOptions dataclass with update svc fields - Create InvokeConfig with timeout and payload + result serdes - Add new exception static factory for TimeSuspendExecution - Remove tests from coverage report - Update .gitignore to exclude additional cache/IDE directories
1 parent 1f81b84 commit bb1f2e9

File tree

12 files changed

+1042
-67
lines changed

12 files changed

+1042
-67
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ __pycache__/
2626

2727
dist/
2828

29-
.idea
29+
.idea
30+
31+
.kiro/

pyproject.toml

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,15 @@ readme = "README.md"
1010
requires-python = ">=3.13"
1111
license = "Apache-2.0"
1212
keywords = []
13-
authors = [
14-
{ name = "yaythomas", email = "tgaigher@amazon.com" },
15-
]
13+
authors = [{ name = "yaythomas", email = "tgaigher@amazon.com" }]
1614
classifiers = [
1715
"Development Status :: 4 - Beta",
1816
"Programming Language :: Python",
1917
"Programming Language :: Python :: 3.13",
2018
"Programming Language :: Python :: Implementation :: CPython",
2119
"Programming Language :: Python :: Implementation :: PyPy",
2220
]
23-
dependencies = [
24-
"boto3>=1.40.30"
25-
]
21+
dependencies = ["boto3>=1.40.30"]
2622

2723
[project.urls]
2824
Documentation = "https://github.com/aws/aws-durable-execution-sdk-python#readme"
@@ -38,48 +34,31 @@ packages = ["src/aws_durable_execution_sdk_python"]
3834
[tool.hatch.version]
3935
path = "src/aws_durable_execution_sdk_python/__about__.py"
4036

41-
# [tool.hatch.envs.default]
42-
# dependencies=["pytest"]
43-
44-
# [tool.hatch.envs.default.scripts]
45-
# test="pytest"
46-
4737
[tool.hatch.envs.test]
48-
dependencies = [
49-
"coverage[toml]",
50-
"pytest",
51-
"pytest-cov",
52-
]
38+
dependencies = ["coverage[toml]", "pytest", "pytest-cov"]
5339

5440
[tool.hatch.envs.test.scripts]
55-
cov="pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/aws_durable_execution_sdk_python --cov=tests --cov-fail-under=98"
41+
cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/aws_durable_execution_sdk_python --cov-fail-under=98"
5642

5743
[tool.hatch.envs.types]
58-
extra-dependencies = [
59-
"mypy>=1.0.0",
60-
"pytest"
61-
]
44+
extra-dependencies = ["mypy>=1.0.0", "pytest"]
6245
[tool.hatch.envs.types.scripts]
6346
check = "mypy --install-types --non-interactive {args:src/aws_durable_execution_sdk_python tests}"
6447

6548
[tool.coverage.run]
66-
source_pkgs = ["aws_durable_execution_sdk_python", "tests"]
49+
source_pkgs = ["aws_durable_execution_sdk_python"]
6750
branch = true
6851
parallel = true
69-
omit = [
70-
"src/aws_durable_execution_sdk_python/__about__.py",
71-
]
52+
omit = ["src/aws_durable_execution_sdk_python/__about__.py"]
7253

7354
[tool.coverage.paths]
74-
aws_durable_execution_sdk_python = ["src/aws_durable_execution_sdk_python", "*/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python"]
75-
tests = ["tests", "*/aws-durable-execution-sdk-python/tests"]
55+
aws_durable_execution_sdk_python = [
56+
"src/aws_durable_execution_sdk_python",
57+
"*/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python",
58+
]
7659

7760
[tool.coverage.report]
78-
exclude_lines = [
79-
"no cov",
80-
"if __name__ == .__main__.:",
81-
"if TYPE_CHECKING:",
82-
]
61+
exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"]
8362

8463
[tool.ruff]
8564
line-length = 88
@@ -88,4 +67,12 @@ line-length = 88
8867
preview = false
8968

9069
[tool.ruff.lint.per-file-ignores]
91-
"tests/**" = ["ARG001", "ARG002", "ARG005", "S101", "PLR2004", "SIM117", "TRY301"]
70+
"tests/**" = [
71+
"ARG001",
72+
"ARG002",
73+
"ARG005",
74+
"S101",
75+
"PLR2004",
76+
"SIM117",
77+
"TRY301",
78+
]

src/aws_durable_execution_sdk_python/config.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88

99
from aws_durable_execution_sdk_python.retries import RetryDecision # noqa: TCH001
1010

11-
R = TypeVar("R")
11+
P = TypeVar("P") # Payload type
12+
R = TypeVar("R") # Result type
1213
T = TypeVar("T")
1314
U = TypeVar("U")
1415

@@ -133,6 +134,14 @@ class MapConfig:
133134
serdes: SerDes | None = None
134135

135136

137+
@dataclass
138+
class InvokeConfig(Generic[P, R]):
139+
# retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
140+
timeout_seconds: int = 0
141+
serdes_payload: SerDes[P] | None = None
142+
serdes_result: SerDes[R] | None = None
143+
144+
136145
@dataclass(frozen=True)
137146
class CallbackConfig:
138147
"""Configuration for callbacks."""

src/aws_durable_execution_sdk_python/context.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
BatchedInput,
88
CallbackConfig,
99
ChildConfig,
10+
InvokeConfig,
1011
MapConfig,
1112
ParallelConfig,
1213
StepConfig,
@@ -30,6 +31,7 @@
3031
wait_for_callback_handler,
3132
)
3233
from aws_durable_execution_sdk_python.operation.child import child_handler
34+
from aws_durable_execution_sdk_python.operation.invoke import invoke_handler
3335
from aws_durable_execution_sdk_python.operation.map import map_handler
3436
from aws_durable_execution_sdk_python.operation.parallel import parallel_handler
3537
from aws_durable_execution_sdk_python.operation.step import step_handler
@@ -56,17 +58,19 @@
5658

5759
from aws_durable_execution_sdk_python.state import CheckpointedResult
5860

59-
R = TypeVar("R")
61+
P = TypeVar("P") # Payload type
62+
R = TypeVar("R") # Result type
6063
T = TypeVar("T")
6164
U = TypeVar("U")
62-
P = ParamSpec("P")
65+
Params = ParamSpec("Params")
66+
6367

6468
logger = logging.getLogger(__name__)
6569

6670

6771
def durable_step(
68-
func: Callable[Concatenate[StepContext, P], T],
69-
) -> Callable[P, Callable[[StepContext], T]]:
72+
func: Callable[Concatenate[StepContext, Params], T],
73+
) -> Callable[Params, Callable[[StepContext], T]]:
7074
"""Wrap your callable into a named function that a Durable step can run."""
7175

7276
def wrapper(*args, **kwargs):
@@ -80,8 +84,8 @@ def function_with_arguments(context: StepContext):
8084

8185

8286
def durable_with_child_context(
83-
func: Callable[Concatenate[DurableContext, P], T],
84-
) -> Callable[P, Callable[[DurableContext], T]]:
87+
func: Callable[Concatenate[DurableContext, Params], T],
88+
) -> Callable[Params, Callable[[DurableContext], T]]:
8589
"""Wrap your callable into a Durable child context."""
8690

8791
def wrapper(*args, **kwargs):
@@ -291,6 +295,36 @@ def create_callback(
291295
serdes=config.serdes,
292296
)
293297

298+
def invoke(
299+
self,
300+
function_name: str,
301+
payload: P,
302+
name: str | None = None,
303+
config: InvokeConfig[P, R] | None = None,
304+
) -> R:
305+
"""Invoke another Durable Function.
306+
307+
Args:
308+
function_name: Name of the function to invoke
309+
payload: Input payload to send to the function
310+
name: Optional name for the operation
311+
config: Optional configuration for the invoke operation
312+
313+
Returns:
314+
The result of the invoked function
315+
"""
316+
return invoke_handler(
317+
function_name=function_name,
318+
payload=payload,
319+
state=self.state,
320+
operation_identifier=OperationIdentifier(
321+
operation_id=self._create_step_id(),
322+
parent_id=self._parent_id,
323+
name=name,
324+
),
325+
config=config,
326+
)
327+
294328
def map(
295329
self,
296330
inputs: Sequence[U],

src/aws_durable_execution_sdk_python/exceptions.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from __future__ import annotations
77

8+
import time
89
from dataclasses import dataclass
910

1011

@@ -77,6 +78,24 @@ def __init__(self, message: str, scheduled_timestamp: float):
7778
super().__init__(message)
7879
self.scheduled_timestamp = scheduled_timestamp
7980

81+
@classmethod
82+
def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
83+
"""Create a timed suspension with the delay calculated from now.
84+
85+
Args:
86+
message: Descriptive message for the suspension
87+
delay_seconds: Duration to suspend in seconds from current time
88+
89+
Returns:
90+
TimedSuspendExecution: Instance with calculated resume time
91+
92+
Example:
93+
>>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30)
94+
>>> # Will suspend for 30 seconds from now
95+
"""
96+
resume_time = time.time() + delay_seconds
97+
return cls(message, scheduled_timestamp=resume_time)
98+
8099

81100
class OrderedLockError(DurableExecutionsError):
82101
"""An error from OrderedLock.

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class OperationSubType(Enum):
6363
PARALLEL_BRANCH = "ParallelBranch"
6464
WAIT_FOR_CALLBACK = "WaitForCallback"
6565
WAIT_FOR_CONDITION = "WaitForCondition"
66+
INVOKE = "Invoke"
6667

6768

6869
@dataclass(frozen=True)
@@ -241,15 +242,11 @@ def to_dict(self) -> MutableMapping[str, Any]:
241242
@dataclass(frozen=True)
242243
class InvokeOptions:
243244
function_name: str
244-
function_qualifier: str | None = None
245-
durable_execution_name: str | None = None
245+
timeout_seconds: int = 0
246246

247247
def to_dict(self) -> MutableMapping[str, Any]:
248-
result = {"FunctionName": self.function_name}
249-
if self.function_qualifier:
250-
result["FunctionQualifier"] = self.function_qualifier
251-
if self.durable_execution_name:
252-
result["DurableExecutionName"] = self.durable_execution_name
248+
result: MutableMapping[str, Any] = {"FunctionName": self.function_name}
249+
result["TimeoutSeconds"] = self.timeout_seconds
253250
return result
254251

255252

@@ -471,6 +468,28 @@ def create_step_retry(
471468

472469
# endregion step
473470

471+
# region invoke
472+
@classmethod
473+
def create_invoke_start(
474+
cls,
475+
identifier: OperationIdentifier,
476+
payload: str,
477+
invoke_options: InvokeOptions,
478+
) -> OperationUpdate:
479+
"""Create an instance of OperationUpdate for type: INVOKE, action: START."""
480+
return cls(
481+
operation_id=identifier.operation_id,
482+
parent_id=identifier.parent_id,
483+
operation_type=OperationType.INVOKE,
484+
sub_type=OperationSubType.INVOKE,
485+
action=OperationAction.START,
486+
name=identifier.name,
487+
payload=payload,
488+
invoke_options=invoke_options,
489+
)
490+
491+
# endregion invoke
492+
474493
# region wait for condition
475494
@classmethod
476495
def create_wait_for_condition_start(

0 commit comments

Comments
 (0)