Skip to content

Commit 7ef2cef

Browse files
authored
Add support for using the "normal" Trigger classes for flow.serve and .deploy (#12789)
1 parent 7f864ef commit 7ef2cef

File tree

9 files changed

+120
-288
lines changed

9 files changed

+120
-288
lines changed

src/prefect/cli/deploy.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,23 @@
77
from datetime import timedelta
88
from getpass import GetPassWarning
99
from pathlib import Path
10-
from typing import Any, Dict, List, Optional, Tuple
10+
from typing import Any, Dict, List, Optional, Tuple, Union
1111
from uuid import UUID
1212

13+
import typer
14+
import yaml
15+
from rich.console import Console
16+
from rich.panel import Panel
17+
from rich.table import Table
18+
from yaml.error import YAMLError
19+
1320
from prefect._internal.pydantic import HAS_PYDANTIC_V2
1421

1522
if HAS_PYDANTIC_V2:
1623
import pydantic.v1 as pydantic
1724
else:
1825
import pydantic
1926

20-
import typer
21-
import yaml
22-
from rich.console import Console
23-
from rich.panel import Panel
24-
from rich.table import Table
25-
from yaml.error import YAMLError
2627

2728
import prefect
2829
from prefect._internal.compatibility.deprecated import (
@@ -62,7 +63,7 @@
6263
_save_deployment_to_prefect_file,
6364
)
6465
from prefect.deployments.steps.core import run_steps
65-
from prefect.events import DeploymentTriggerTypes
66+
from prefect.events import DeploymentTriggerTypes, TriggerTypes
6667
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError
6768
from prefect.flows import load_flow_from_entrypoint
6869
from prefect.settings import (
@@ -1602,7 +1603,9 @@ def _initialize_deployment_triggers(
16021603

16031604

16041605
async def _create_deployment_triggers(
1605-
client: PrefectClient, deployment_id: UUID, triggers: List[DeploymentTriggerTypes]
1606+
client: PrefectClient,
1607+
deployment_id: UUID,
1608+
triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]],
16061609
):
16071610
if client.server_type.supports_automations():
16081611
try:

src/prefect/deployments/deployments.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,20 @@
1616
import pendulum
1717
import yaml
1818

19+
from prefect._internal.pydantic import HAS_PYDANTIC_V2
20+
21+
if HAS_PYDANTIC_V2:
22+
from pydantic.v1 import BaseModel, Field, parse_obj_as, root_validator, validator
23+
else:
24+
from pydantic import BaseModel, Field, parse_obj_as, root_validator, validator
25+
1926
from prefect._internal.compatibility.deprecated import (
2027
DeprecatedInfraOverridesField,
2128
deprecated_callable,
2229
deprecated_class,
2330
deprecated_parameter,
2431
handle_deprecated_infra_overrides_parameter,
2532
)
26-
from prefect._internal.pydantic import HAS_PYDANTIC_V2
2733
from prefect._internal.schemas.validators import (
2834
handle_openapi_schema,
2935
infrastructure_must_have_capabilities,
@@ -32,16 +38,10 @@
3238
validate_automation_names,
3339
validate_deprecated_schedule_fields,
3440
)
35-
from prefect.client.schemas.actions import DeploymentScheduleCreate
36-
37-
if HAS_PYDANTIC_V2:
38-
from pydantic.v1 import BaseModel, Field, parse_obj_as, root_validator, validator
39-
else:
40-
from pydantic import BaseModel, Field, parse_obj_as, root_validator, validator
41-
4241
from prefect.blocks.core import Block
4342
from prefect.blocks.fields import SecretDict
4443
from prefect.client.orchestration import PrefectClient, get_client
44+
from prefect.client.schemas.actions import DeploymentScheduleCreate
4545
from prefect.client.schemas.objects import (
4646
FlowRun,
4747
MinimalDeploymentSchedule,
@@ -53,7 +53,7 @@
5353
FlexibleScheduleList,
5454
)
5555
from prefect.deployments.steps.core import run_steps
56-
from prefect.events import DeploymentTriggerTypes
56+
from prefect.events import DeploymentTriggerTypes, TriggerTypes
5757
from prefect.exceptions import (
5858
BlockMissingCapabilities,
5959
ObjectAlreadyExists,
@@ -610,7 +610,7 @@ def _validate_schedule(cls, value):
610610
description="The parameter schema of the flow, including defaults.",
611611
)
612612
timestamp: datetime = Field(default_factory=partial(pendulum.now, "UTC"))
613-
triggers: List[DeploymentTriggerTypes] = Field(
613+
triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]] = Field(
614614
default_factory=list,
615615
description="The triggers that should cause this deployment to run.",
616616
)

src/prefect/deployments/runner.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,19 @@ def fast_flow():
4242
from rich.progress import Progress, SpinnerColumn, TextColumn, track
4343
from rich.table import Table
4444

45-
from prefect._internal.concurrency.api import create_call, from_async
4645
from prefect._internal.pydantic import HAS_PYDANTIC_V2
47-
from prefect._internal.schemas.validators import (
48-
reconcile_paused_deployment,
49-
reconcile_schedules_runner,
50-
validate_automation_names,
51-
)
52-
from prefect.runner.storage import RunnerStorage
53-
from prefect.settings import (
54-
PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE,
55-
PREFECT_DEFAULT_WORK_POOL_NAME,
56-
PREFECT_UI_URL,
57-
)
58-
from prefect.utilities.collections import get_from_dict, isiterable
5946

6047
if HAS_PYDANTIC_V2:
6148
from pydantic.v1 import BaseModel, Field, PrivateAttr, root_validator, validator
6249
else:
6350
from pydantic import BaseModel, Field, PrivateAttr, root_validator, validator
6451

52+
from prefect._internal.concurrency.api import create_call, from_async
53+
from prefect._internal.schemas.validators import (
54+
reconcile_paused_deployment,
55+
reconcile_schedules_runner,
56+
validate_automation_names,
57+
)
6558
from prefect.client.orchestration import get_client
6659
from prefect.client.schemas.objects import MinimalDeploymentSchedule
6760
from prefect.client.schemas.schedules import (
@@ -72,13 +65,20 @@ def fast_flow():
7265
FlexibleScheduleList,
7366
create_minimal_deployment_schedule,
7467
)
75-
from prefect.events import DeploymentTriggerTypes
68+
from prefect.events import DeploymentTriggerTypes, TriggerTypes
7669
from prefect.exceptions import (
7770
ObjectNotFound,
7871
PrefectHTTPStatusError,
7972
)
73+
from prefect.runner.storage import RunnerStorage
74+
from prefect.settings import (
75+
PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE,
76+
PREFECT_DEFAULT_WORK_POOL_NAME,
77+
PREFECT_UI_URL,
78+
)
8079
from prefect.utilities.asyncutils import sync_compatible
8180
from prefect.utilities.callables import ParameterSchema, parameter_schema
81+
from prefect.utilities.collections import get_from_dict, isiterable
8282
from prefect.utilities.dockerutils import (
8383
PushError,
8484
build_image,
@@ -179,7 +179,7 @@ class Config:
179179
"The path to the entrypoint for the workflow, relative to the `path`."
180180
),
181181
)
182-
triggers: List[DeploymentTriggerTypes] = Field(
182+
triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]] = Field(
183183
default_factory=list,
184184
description="The triggers that should cause this deployment to run.",
185185
)
@@ -454,7 +454,7 @@ def from_flow(
454454
schedule: Optional[SCHEDULE_TYPES] = None,
455455
is_schedule_active: Optional[bool] = None,
456456
parameters: Optional[dict] = None,
457-
triggers: Optional[List[DeploymentTriggerTypes]] = None,
457+
triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
458458
description: Optional[str] = None,
459459
tags: Optional[List[str]] = None,
460460
version: Optional[str] = None,
@@ -590,7 +590,7 @@ def from_entrypoint(
590590
schedule: Optional[SCHEDULE_TYPES] = None,
591591
is_schedule_active: Optional[bool] = None,
592592
parameters: Optional[dict] = None,
593-
triggers: Optional[List[DeploymentTriggerTypes]] = None,
593+
triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
594594
description: Optional[str] = None,
595595
tags: Optional[List[str]] = None,
596596
version: Optional[str] = None,
@@ -688,7 +688,7 @@ async def from_storage(
688688
schedule: Optional[SCHEDULE_TYPES] = None,
689689
is_schedule_active: Optional[bool] = None,
690690
parameters: Optional[dict] = None,
691-
triggers: Optional[List[DeploymentTriggerTypes]] = None,
691+
triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
692692
description: Optional[str] = None,
693693
tags: Optional[List[str]] = None,
694694
version: Optional[str] = None,

src/prefect/events/schemas/automations.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
from prefect._internal.schemas.validators import validate_trigger_within
2020

2121
if HAS_PYDANTIC_V2:
22-
from pydantic.v1 import Field, root_validator, validator
22+
from pydantic.v1 import Field, PrivateAttr, root_validator, validator
2323
from pydantic.v1.fields import ModelField
2424
else:
25-
from pydantic import Field, root_validator, validator
25+
from pydantic import Field, PrivateAttr, root_validator, validator
2626
from pydantic.fields import ModelField
2727

2828
from prefect._internal.schemas.bases import PrefectBaseModel
29-
from prefect.events.actions import ActionTypes
29+
from prefect.events.actions import ActionTypes, RunDeployment
3030
from prefect.utilities.collections import AutoEnum
3131

3232
from .events import ResourceSpecification
@@ -50,6 +50,48 @@ class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"):
5050
def describe_for_cli(self, indent: int = 0) -> str:
5151
"""Return a human-readable description of this trigger for the CLI"""
5252

53+
# The following allows the regular Trigger class to be used when serving or
54+
# deploying flows, analogous to how the Deployment*Trigger classes work
55+
56+
_deployment_id: Optional[UUID] = PrivateAttr(default=None)
57+
58+
def set_deployment_id(self, deployment_id: UUID):
59+
self._deployment_id = deployment_id
60+
61+
def owner_resource(self) -> Optional[str]:
62+
return f"prefect.deployment.{self._deployment_id}"
63+
64+
def actions(self) -> List[RunDeployment]:
65+
assert self._deployment_id
66+
return [
67+
RunDeployment(
68+
deployment_id=self._deployment_id,
69+
parameters=getattr(self, "parameters", None),
70+
job_variables=getattr(self, "job_variables", None),
71+
)
72+
]
73+
74+
def as_automation(self) -> "AutomationCore":
75+
assert self._deployment_id
76+
77+
trigger = self
78+
79+
# This is one of the Deployment*Trigger classes, so translate it over to a
80+
# plain Trigger
81+
if hasattr(self, "trigger_type"):
82+
trigger = self.trigger_type(**self.dict())
83+
84+
return AutomationCore(
85+
name=(
86+
getattr(self, "name", None)
87+
or f"Automation for deployment {self._deployment_id}"
88+
),
89+
enabled=getattr(self, "enabled", True),
90+
trigger=trigger,
91+
actions=self.actions(),
92+
owner_resource=self.owner_resource(),
93+
)
94+
5395

5496
class ResourceTrigger(Trigger, abc.ABC):
5597
"""

0 commit comments

Comments
 (0)