Skip to content

Commit f30b353

Browse files
authored
remove sync_compatible from build_server (#16314)
1 parent d5229df commit f30b353

File tree

8 files changed

+177
-179
lines changed

8 files changed

+177
-179
lines changed

src/prefect/flow_runs.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ async def suspend_flow_run(
307307
flow_run_id: Optional[UUID] = None,
308308
timeout: Optional[int] = 3600,
309309
key: Optional[str] = None,
310-
client: PrefectClient = None,
310+
client: Optional[PrefectClient] = None,
311311
) -> None:
312312
...
313313

@@ -318,7 +318,7 @@ async def suspend_flow_run(
318318
flow_run_id: Optional[UUID] = None,
319319
timeout: Optional[int] = 3600,
320320
key: Optional[str] = None,
321-
client: PrefectClient = None,
321+
client: Optional[PrefectClient] = None,
322322
) -> T:
323323
...
324324

@@ -330,7 +330,7 @@ async def suspend_flow_run(
330330
flow_run_id: Optional[UUID] = None,
331331
timeout: Optional[int] = 3600,
332332
key: Optional[str] = None,
333-
client: PrefectClient = None,
333+
client: Optional[PrefectClient] = None,
334334
) -> Optional[T]:
335335
"""
336336
Suspends a flow run by stopping code execution until resumed.

src/prefect/flows.py

+77-102
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
Callable,
2525
Coroutine,
2626
Generic,
27-
Hashable,
2827
Iterable,
2928
NoReturn,
3029
Optional,
@@ -43,7 +42,7 @@
4342
from pydantic.v1.decorator import ValidatedFunction as V1ValidatedFunction
4443
from pydantic.v1.errors import ConfigError # TODO
4544
from rich.console import Console
46-
from typing_extensions import Literal, ParamSpec, Self
45+
from typing_extensions import Literal, ParamSpec, TypeAlias
4746

4847
from prefect._internal.concurrency.api import create_call, from_async
4948
from prefect.blocks.core import Block
@@ -105,7 +104,11 @@
105104
T = TypeVar("T") # Generic type var for capturing the inner return type of async funcs
106105
R = TypeVar("R") # The return type of the user's function
107106
P = ParamSpec("P") # The parameters of the flow
108-
F = TypeVar("F", bound="Flow") # The type of the flow
107+
F = TypeVar("F", bound="Flow[Any, Any]") # The type of the flow
108+
109+
StateHookCallable: TypeAlias = Callable[
110+
[FlowSchema, FlowRun, State], Union[Awaitable[None], None]
111+
]
109112

110113
logger = get_logger("flows")
111114

@@ -195,15 +198,11 @@ def __init__(
195198
result_serializer: Optional[ResultSerializer] = None,
196199
cache_result_in_memory: bool = True,
197200
log_prints: Optional[bool] = None,
198-
on_completion: Optional[
199-
list[Callable[[FlowSchema, FlowRun, State], None]]
200-
] = None,
201-
on_failure: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
202-
on_cancellation: Optional[
203-
list[Callable[[FlowSchema, FlowRun, State], None]]
204-
] = None,
205-
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
206-
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
201+
on_completion: Optional[list[StateHookCallable]] = None,
202+
on_failure: Optional[list[StateHookCallable]] = None,
203+
on_cancellation: Optional[list[StateHookCallable]] = None,
204+
on_crashed: Optional[list[StateHookCallable]] = None,
205+
on_running: Optional[list[StateHookCallable]] = None,
207206
):
208207
if name is not None and not isinstance(name, str):
209208
raise TypeError(
@@ -375,7 +374,7 @@ def __init__(
375374
def ismethod(self) -> bool:
376375
return hasattr(self.fn, "__prefect_self__")
377376

378-
def __get__(self, instance, owner):
377+
def __get__(self, instance: Any, owner: Any):
379378
"""
380379
Implement the descriptor protocol so that the flow can be used as an instance method.
381380
When an instance method is loaded, this method is called with the "self" instance as
@@ -402,24 +401,22 @@ def with_options(
402401
retry_delay_seconds: Optional[Union[int, float]] = None,
403402
description: Optional[str] = None,
404403
flow_run_name: Optional[Union[Callable[[], str], str]] = None,
405-
task_runner: Union[Type[TaskRunner], TaskRunner, None] = None,
404+
task_runner: Union[
405+
Type[TaskRunner[PrefectFuture[R]]], TaskRunner[PrefectFuture[R]], None
406+
] = None,
406407
timeout_seconds: Union[int, float, None] = None,
407408
validate_parameters: Optional[bool] = None,
408409
persist_result: Optional[bool] = NotSet, # type: ignore
409410
result_storage: Optional[ResultStorage] = NotSet, # type: ignore
410411
result_serializer: Optional[ResultSerializer] = NotSet, # type: ignore
411412
cache_result_in_memory: Optional[bool] = None,
412413
log_prints: Optional[bool] = NotSet, # type: ignore
413-
on_completion: Optional[
414-
list[Callable[[FlowSchema, FlowRun, State], None]]
415-
] = None,
416-
on_failure: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
417-
on_cancellation: Optional[
418-
list[Callable[[FlowSchema, FlowRun, State], None]]
419-
] = None,
420-
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
421-
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
422-
) -> Self:
414+
on_completion: Optional[list[StateHookCallable]] = None,
415+
on_failure: Optional[list[StateHookCallable]] = None,
416+
on_cancellation: Optional[list[StateHookCallable]] = None,
417+
on_crashed: Optional[list[StateHookCallable]] = None,
418+
on_running: Optional[list[StateHookCallable]] = None,
419+
) -> "Flow[P, R]":
423420
"""
424421
Create a new flow from the current object, updating provided options.
425422
@@ -645,7 +642,7 @@ async def to_deployment(
645642
paused: Optional[bool] = None,
646643
schedules: Optional["FlexibleScheduleList"] = None,
647644
concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
648-
parameters: Optional[dict] = None,
645+
parameters: Optional[dict[str, Any]] = None,
649646
triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
650647
description: Optional[str] = None,
651648
tags: Optional[list[str]] = None,
@@ -755,33 +752,23 @@ def my_other_flow(name):
755752
entrypoint_type=entrypoint_type,
756753
)
757754

758-
def on_completion(
759-
self, fn: Callable[[FlowSchema, FlowRun, State], None]
760-
) -> Callable[[FlowSchema, FlowRun, State], None]:
755+
def on_completion(self, fn: StateHookCallable) -> StateHookCallable:
761756
self.on_completion_hooks.append(fn)
762757
return fn
763758

764-
def on_cancellation(
765-
self, fn: Callable[[FlowSchema, FlowRun, State], None]
766-
) -> Callable[[FlowSchema, FlowRun, State], None]:
759+
def on_cancellation(self, fn: StateHookCallable) -> StateHookCallable:
767760
self.on_cancellation_hooks.append(fn)
768761
return fn
769762

770-
def on_crashed(
771-
self, fn: Callable[[FlowSchema, FlowRun, State], None]
772-
) -> Callable[[FlowSchema, FlowRun, State], None]:
763+
def on_crashed(self, fn: StateHookCallable) -> StateHookCallable:
773764
self.on_crashed_hooks.append(fn)
774765
return fn
775766

776-
def on_running(
777-
self, fn: Callable[[FlowSchema, FlowRun, State], None]
778-
) -> Callable[[FlowSchema, FlowRun, State], None]:
767+
def on_running(self, fn: StateHookCallable) -> StateHookCallable:
779768
self.on_running_hooks.append(fn)
780769
return fn
781770

782-
def on_failure(
783-
self, fn: Callable[[FlowSchema, FlowRun, State], None]
784-
) -> Callable[[FlowSchema, FlowRun, State], None]:
771+
def on_failure(self, fn: StateHookCallable) -> StateHookCallable:
785772
self.on_failure_hooks.append(fn)
786773
return fn
787774

@@ -1039,8 +1026,11 @@ def my_flow(name: str = "world"):
10391026
await storage.pull_code()
10401027

10411028
full_entrypoint = str(storage.destination / entrypoint)
1042-
flow: Flow = await from_async.wait_for_call_in_new_thread(
1043-
create_call(load_flow_from_entrypoint, full_entrypoint)
1029+
flow = cast(
1030+
Flow[P, R],
1031+
await from_async.wait_for_call_in_new_thread(
1032+
create_call(load_flow_from_entrypoint, full_entrypoint)
1033+
),
10441034
)
10451035
flow._storage = storage
10461036
flow._entrypoint = entrypoint
@@ -1442,17 +1432,11 @@ def flow(
14421432
result_serializer: Optional[ResultSerializer] = None,
14431433
cache_result_in_memory: bool = True,
14441434
log_prints: Optional[bool] = None,
1445-
on_completion: Optional[
1446-
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
1447-
] = None,
1448-
on_failure: Optional[
1449-
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
1450-
] = None,
1451-
on_cancellation: Optional[
1452-
list[Callable[[FlowSchema, FlowRun, State], None]]
1453-
] = None,
1454-
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
1455-
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
1435+
on_completion: Optional[list[StateHookCallable]] = None,
1436+
on_failure: Optional[list[StateHookCallable]] = None,
1437+
on_cancellation: Optional[list[StateHookCallable]] = None,
1438+
on_crashed: Optional[list[StateHookCallable]] = None,
1439+
on_running: Optional[list[StateHookCallable]] = None,
14561440
) -> Callable[[Callable[P, R]], Flow[P, R]]:
14571441
...
14581442

@@ -1474,17 +1458,11 @@ def flow(
14741458
result_serializer: Optional[ResultSerializer] = None,
14751459
cache_result_in_memory: bool = True,
14761460
log_prints: Optional[bool] = None,
1477-
on_completion: Optional[
1478-
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
1479-
] = None,
1480-
on_failure: Optional[
1481-
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
1482-
] = None,
1483-
on_cancellation: Optional[
1484-
list[Callable[[FlowSchema, FlowRun, State], None]]
1485-
] = None,
1486-
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
1487-
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
1461+
on_completion: Optional[list[StateHookCallable]] = None,
1462+
on_failure: Optional[list[StateHookCallable]] = None,
1463+
on_cancellation: Optional[list[StateHookCallable]] = None,
1464+
on_crashed: Optional[list[StateHookCallable]] = None,
1465+
on_running: Optional[list[StateHookCallable]] = None,
14881466
):
14891467
"""
14901468
Decorator to designate a function as a Prefect workflow.
@@ -1593,30 +1571,27 @@ def flow(
15931571
if isinstance(__fn, (classmethod, staticmethod)):
15941572
method_decorator = type(__fn).__name__
15951573
raise TypeError(f"@{method_decorator} should be applied on top of @flow")
1596-
return cast(
1597-
Flow[P, R],
1598-
Flow(
1599-
fn=__fn,
1600-
name=name,
1601-
version=version,
1602-
flow_run_name=flow_run_name,
1603-
task_runner=task_runner,
1604-
description=description,
1605-
timeout_seconds=timeout_seconds,
1606-
validate_parameters=validate_parameters,
1607-
retries=retries,
1608-
retry_delay_seconds=retry_delay_seconds,
1609-
persist_result=persist_result,
1610-
result_storage=result_storage,
1611-
result_serializer=result_serializer,
1612-
cache_result_in_memory=cache_result_in_memory,
1613-
log_prints=log_prints,
1614-
on_completion=on_completion,
1615-
on_failure=on_failure,
1616-
on_cancellation=on_cancellation,
1617-
on_crashed=on_crashed,
1618-
on_running=on_running,
1619-
),
1574+
return Flow(
1575+
fn=__fn,
1576+
name=name,
1577+
version=version,
1578+
flow_run_name=flow_run_name,
1579+
task_runner=task_runner,
1580+
description=description,
1581+
timeout_seconds=timeout_seconds,
1582+
validate_parameters=validate_parameters,
1583+
retries=retries,
1584+
retry_delay_seconds=retry_delay_seconds,
1585+
persist_result=persist_result,
1586+
result_storage=result_storage,
1587+
result_serializer=result_serializer,
1588+
cache_result_in_memory=cache_result_in_memory,
1589+
log_prints=log_prints,
1590+
on_completion=on_completion,
1591+
on_failure=on_failure,
1592+
on_cancellation=on_cancellation,
1593+
on_crashed=on_crashed,
1594+
on_running=on_running,
16201595
)
16211596
else:
16221597
return cast(
@@ -1668,10 +1643,10 @@ def _raise_on_name_with_banned_characters(name: Optional[str]) -> Optional[str]:
16681643

16691644

16701645
def select_flow(
1671-
flows: Iterable[Flow],
1646+
flows: Iterable[Flow[P, R]],
16721647
flow_name: Optional[str] = None,
16731648
from_message: Optional[str] = None,
1674-
) -> Flow:
1649+
) -> Flow[P, R]:
16751650
"""
16761651
Select the only flow in an iterable or a flow specified by name.
16771652
@@ -1716,7 +1691,7 @@ def select_flow(
17161691
def load_flow_from_entrypoint(
17171692
entrypoint: str,
17181693
use_placeholder_flow: bool = True,
1719-
) -> Flow:
1694+
) -> Flow[P, Any]:
17201695
"""
17211696
Extract a flow object from a script at an entrypoint by running all of the code in the file.
17221697
@@ -1740,7 +1715,7 @@ def load_flow_from_entrypoint(
17401715
else:
17411716
path, func_name = entrypoint.rsplit(".", maxsplit=1)
17421717
try:
1743-
flow = import_object(entrypoint)
1718+
flow: Flow[P, Any] = import_object(entrypoint) # pyright: ignore[reportRedeclaration]
17441719
except AttributeError as exc:
17451720
raise MissingFlowError(
17461721
f"Flow function with name {func_name!r} not found in {path!r}. "
@@ -1749,13 +1724,13 @@ def load_flow_from_entrypoint(
17491724
# If the flow has dependencies that are not installed in the current
17501725
# environment, fallback to loading the flow via AST parsing.
17511726
if use_placeholder_flow:
1752-
flow = safe_load_flow_from_entrypoint(entrypoint)
1727+
flow: Optional[Flow[P, Any]] = safe_load_flow_from_entrypoint(entrypoint)
17531728
if flow is None:
17541729
raise
17551730
else:
17561731
raise
17571732

1758-
if not isinstance(flow, Flow):
1733+
if not isinstance(flow, Flow): # pyright: ignore[reportUnnecessaryIsInstance]
17591734
raise MissingFlowError(
17601735
f"Function with name {func_name!r} is not a flow. Make sure that it is "
17611736
"decorated with '@flow'."
@@ -1770,7 +1745,7 @@ def serve(
17701745
print_starting_message: bool = True,
17711746
limit: Optional[int] = None,
17721747
**kwargs: Any,
1773-
):
1748+
) -> None:
17741749
"""
17751750
Serve the provided list of deployments.
17761751
@@ -1840,7 +1815,7 @@ async def aserve(
18401815
print_starting_message: bool = True,
18411816
limit: Optional[int] = None,
18421817
**kwargs: Any,
1843-
):
1818+
) -> None:
18441819
"""
18451820
Asynchronously serve the provided list of deployments.
18461821
@@ -1945,7 +1920,7 @@ async def load_flow_from_flow_run(
19451920
ignore_storage: bool = False,
19461921
storage_base_path: Optional[str] = None,
19471922
use_placeholder_flow: bool = True,
1948-
) -> Flow:
1923+
) -> Flow[P, Any]:
19491924
"""
19501925
Load a flow from the location/script provided in a deployment's storage document.
19511926
@@ -2024,7 +1999,7 @@ async def load_flow_from_flow_run(
20241999
return flow
20252000

20262001

2027-
def load_placeholder_flow(entrypoint: str, raises: Exception):
2002+
def load_placeholder_flow(entrypoint: str, raises: Exception) -> Flow[P, Any]:
20282003
"""
20292004
Load a placeholder flow that is initialized with the same arguments as the
20302005
flow specified in the entrypoint. If called the flow will raise `raises`.
@@ -2202,7 +2177,7 @@ def _sanitize_and_load_flow(
22022177

22032178
def load_flow_arguments_from_entrypoint(
22042179
entrypoint: str, arguments: Optional[Union[list[str], set[str]]] = None
2205-
) -> dict[Hashable, Any]:
2180+
) -> dict[str, Any]:
22062181
"""
22072182
Extract flow arguments from an entrypoint string.
22082183
@@ -2235,7 +2210,7 @@ def load_flow_arguments_from_entrypoint(
22352210
"log_prints",
22362211
}
22372212

2238-
result: dict[Hashable, Any] = {}
2213+
result: dict[str, Any] = {}
22392214

22402215
for decorator in func_def.decorator_list:
22412216
if (
@@ -2248,7 +2223,7 @@ def load_flow_arguments_from_entrypoint(
22482223

22492224
if isinstance(keyword.value, ast.Constant):
22502225
# Use the string value of the argument
2251-
result[keyword.arg] = str(keyword.value.value)
2226+
result[cast(str, keyword.arg)] = str(keyword.value.value)
22522227
continue
22532228

22542229
# if the arg value is not a raw str (i.e. a variable or expression),
@@ -2261,7 +2236,7 @@ def load_flow_arguments_from_entrypoint(
22612236

22622237
try:
22632238
evaluated_value = eval(cleaned_value, namespace) # type: ignore
2264-
result[keyword.arg] = str(evaluated_value)
2239+
result[cast(str, keyword.arg)] = str(evaluated_value)
22652240
except Exception as e:
22662241
logger.info(
22672242
"Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",

0 commit comments

Comments
 (0)