Skip to content

Commit e539d40

Browse files
authored
Add automation name filter, ability to read automation by name (#12850)
1 parent 4dae0da commit e539d40

File tree

9 files changed

+256
-13
lines changed

9 files changed

+256
-13
lines changed

src/prefect/client/orchestration.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
handle_deprecated_infra_overrides_parameter,
2626
)
2727
from prefect._internal.pydantic import HAS_PYDANTIC_V2
28+
from prefect.client.schemas import sorting
29+
from prefect.events import filters
2830
from prefect.settings import (
2931
PREFECT_API_SERVICES_TRIGGERS_ENABLED,
3032
PREFECT_EXPERIMENTAL_EVENTS,
@@ -3178,6 +3180,45 @@ async def read_automation(self, automation_id: UUID) -> Optional[Automation]:
31783180
response.raise_for_status()
31793181
return Automation.parse_obj(response.json())
31803182

3183+
async def read_automation_by_name(self, name: str) -> Optional[Automation]:
3184+
"""
3185+
Query the Prefect API for an automation by name. Only automations matching the provided name will be returned.
3186+
3187+
If more than one automation matches the name, the most recently updated automation will be returned.
3188+
3189+
Args:
3190+
name: the name of the automation to query
3191+
3192+
Returns:
3193+
an Automation model representation of the automation, or None if not found. If more than one automation
3194+
matches the name, the most recently updated automation will be returned.
3195+
"""
3196+
if not self.server_type.supports_automations():
3197+
self._raise_for_unsupported_automations()
3198+
automation_filter = filters.AutomationFilter(name=dict(any_=[name]))
3199+
3200+
response = await self._client.post(
3201+
"/automations/filter",
3202+
json={
3203+
"limit": 1,
3204+
"sort": sorting.AutomationSort.UPDATED_DESC,
3205+
"automations": automation_filter.dict(json_compatible=True)
3206+
if automation_filter
3207+
else None,
3208+
},
3209+
)
3210+
3211+
response.raise_for_status()
3212+
3213+
if not response.json():
3214+
return None
3215+
3216+
else:
3217+
# normally a `/filter` endpoint would return a list of objects, but read_x_by_name
3218+
# methods return a single object in all other methods in the client, so
3219+
# we're ensuring parity there
3220+
return Automation.parse_obj(response.json()[0])
3221+
31813222
async def pause_automation(self, automation_id: UUID):
31823223
if not self.server_type.supports_automations():
31833224
self._raise_for_unsupported_automations()

src/prefect/client/schemas/sorting.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ class TaskRunSort(AutoEnum):
2727
END_TIME_DESC = AutoEnum.auto()
2828

2929

30+
class AutomationSort(AutoEnum):
31+
"""Defines automation sorting options."""
32+
33+
CREATED_DESC = AutoEnum.auto()
34+
UPDATED_DESC = AutoEnum.auto()
35+
NAME_ASC = AutoEnum.auto()
36+
NAME_DESC = AutoEnum.auto()
37+
38+
3039
class LogSort(AutoEnum):
3140
"""Defines log sorting options."""
3241

src/prefect/events/filters.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,33 @@
1616
from pydantic import Field, PrivateAttr # type: ignore
1717

1818

19+
class AutomationFilterCreated(PrefectBaseModel):
20+
"""Filter by `Automation.created`."""
21+
22+
before_: Optional[DateTimeTZ] = Field(
23+
default=None,
24+
description="Only include automations created before this datetime",
25+
)
26+
27+
28+
class AutomationFilterName(PrefectBaseModel):
29+
"""Filter by `Automation.created`."""
30+
31+
any_: Optional[List[str]] = Field(
32+
default=None,
33+
description="Only include automations with names that match any of these strings",
34+
)
35+
36+
37+
class AutomationFilter(PrefectBaseModel):
38+
name: Optional[AutomationFilterName] = Field(
39+
default=None, description="Filter criteria for `Automation.name`"
40+
)
41+
created: Optional[AutomationFilterCreated] = Field(
42+
default=None, description="Filter criteria for `Automation.created`"
43+
)
44+
45+
1946
class EventDataFilter(PrefectBaseModel, extra="forbid"):
2047
"""A base class for filtering event data."""
2148

src/prefect/server/api/automations.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Sequence
1+
from typing import Optional, Sequence
22
from uuid import UUID
33

44
import pendulum
@@ -22,7 +22,7 @@
2222
from prefect.server.database.dependencies import provide_database_interface
2323
from prefect.server.database.interface import PrefectDBInterface
2424
from prefect.server.events import actions
25-
from prefect.server.events.models import automations
25+
from prefect.server.events.models import automations as automations_models
2626
from prefect.server.events.schemas.automations import (
2727
Automation,
2828
AutomationCreate,
@@ -133,15 +133,15 @@ async def create_automation(
133133
owner_resource = automation_dict.pop("owner_resource", None)
134134

135135
async with db.session_context(begin_transaction=True) as session:
136-
created_automation = await automations.create_automation(
136+
created_automation = await automations_models.create_automation(
137137
session=session,
138138
automation=Automation(
139139
**automation_dict,
140140
),
141141
)
142142

143143
if owner_resource:
144-
await automations.relate_automation_to_resource(
144+
await automations_models.relate_automation_to_resource(
145145
session,
146146
automation_id=created_automation.id,
147147
resource_id=owner_resource,
@@ -185,7 +185,7 @@ async def update_automation(
185185
)
186186

187187
async with db.session_context(begin_transaction=True) as session:
188-
updated = await automations.update_automation(
188+
updated = await automations_models.update_automation(
189189
session=session,
190190
automation_update=automation,
191191
automation_id=automation_id,
@@ -206,7 +206,7 @@ async def patch_automation(
206206
):
207207
try:
208208
async with db.session_context(begin_transaction=True) as session:
209-
updated = await automations.update_automation(
209+
updated = await automations_models.update_automation(
210210
session=session,
211211
automation_update=automation,
212212
automation_id=automation_id,
@@ -230,7 +230,7 @@ async def delete_automation(
230230
db: PrefectDBInterface = Depends(provide_database_interface),
231231
):
232232
async with db.session_context(begin_transaction=True) as session:
233-
deleted = await automations.delete_automation(
233+
deleted = await automations_models.delete_automation(
234234
session=session,
235235
automation_id=automation_id,
236236
)
@@ -244,14 +244,16 @@ async def read_automations(
244244
sort: AutomationSort = Body(AutomationSort.NAME_ASC),
245245
limit: int = LimitBody(),
246246
offset: int = Body(0, ge=0),
247+
automations: Optional[AutomationFilter] = None,
247248
db: PrefectDBInterface = Depends(provide_database_interface),
248249
) -> Sequence[Automation]:
249250
async with db.session_context() as session:
250-
return await automations.read_automations_for_workspace(
251+
return await automations_models.read_automations_for_workspace(
251252
session=session,
252253
sort=sort,
253254
limit=limit,
254255
offset=offset,
256+
automation_filter=automations,
255257
)
256258

257259

@@ -260,7 +262,7 @@ async def count_automations(
260262
db: PrefectDBInterface = Depends(provide_database_interface),
261263
) -> int:
262264
async with db.session_context() as session:
263-
return await automations.count_automations_for_workspace(session=session)
265+
return await automations_models.count_automations_for_workspace(session=session)
264266

265267

266268
@router.get("/{id:uuid}")
@@ -269,7 +271,7 @@ async def read_automation(
269271
db: PrefectDBInterface = Depends(provide_database_interface),
270272
) -> Automation:
271273
async with db.session_context() as session:
272-
automation = await automations.read_automation(
274+
automation = await automations_models.read_automation(
273275
session=session,
274276
automation_id=automation_id,
275277
)
@@ -285,7 +287,7 @@ async def read_automations_related_to_resource(
285287
db: PrefectDBInterface = Depends(provide_database_interface),
286288
) -> Sequence[Automation]:
287289
async with db.session_context() as session:
288-
return await automations.read_automations_related_to_resource(
290+
return await automations_models.read_automations_related_to_resource(
289291
session=session,
290292
resource_id=resource_id,
291293
)
@@ -297,7 +299,7 @@ async def delete_automations_owned_by_resource(
297299
db: PrefectDBInterface = Depends(provide_database_interface),
298300
):
299301
async with db.session_context(begin_transaction=True) as session:
300-
await automations.delete_automations_owned_by_resource(
302+
await automations_models.delete_automations_owned_by_resource(
301303
session,
302304
resource_id=resource_id,
303305
automation_filter=AutomationFilter(

src/prefect/server/events/filters.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,34 @@ def _get_filter_list(self, db: PrefectDBInterface) -> list:
4444
return filters
4545

4646

47+
class AutomationFilterName(PrefectFilterBaseModel):
48+
"""Filter by `Automation.created`."""
49+
50+
any_: Optional[List[str]] = Field(
51+
default=None,
52+
description="Only include automations with names that match any of these strings",
53+
)
54+
55+
def _get_filter_list(self, db: PrefectDBInterface) -> List:
56+
filters = []
57+
if self.any_ is not None:
58+
filters.append(db.Automation.name.in_(self.any_))
59+
return filters
60+
61+
4762
class AutomationFilter(PrefectOperatorFilterBaseModel):
63+
name: Optional[AutomationFilterName] = Field(
64+
default=None, description="Filter criteria for `Automation.name`"
65+
)
4866
created: Optional[AutomationFilterCreated] = Field(
4967
default=None, description="Filter criteria for `Automation.created`"
5068
)
5169

5270
def _get_filter_list(self, db: PrefectDBInterface) -> List:
5371
filters = []
5472

73+
if self.name is not None:
74+
filters.append(self.name.as_sql_filter(db))
5575
if self.created is not None:
5676
filters.append(self.created.as_sql_filter(db))
5777

src/prefect/server/events/models/automations.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ async def read_automations_for_workspace(
3535
sort: AutomationSort = AutomationSort.NAME_ASC,
3636
limit: Optional[int] = None,
3737
offset: Optional[int] = None,
38+
automation_filter: Optional[filters.AutomationFilter] = None,
3839
) -> Sequence[Automation]:
3940
query = sa.select(db.Automation)
4041

4142
query = query.order_by(db.Automation.sort_expression(sort))
4243

44+
if automation_filter:
45+
query = query.where(automation_filter.as_sql_filter(db))
4346
if limit is not None:
4447
query = query.limit(limit)
4548
if offset is not None:

tests/client/test_prefect_client.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2090,6 +2090,77 @@ async def test_create_automation(self, cloud_client, automation: AutomationCore)
20902090
)
20912091
assert automation_id == UUID(created_automation["id"])
20922092

2093+
async def test_read_automation(self, cloud_client, automation: AutomationCore):
2094+
with respx.mock(base_url=PREFECT_CLOUD_API_URL.value()) as router:
2095+
created_automation = automation.dict(json_compatible=True)
2096+
created_automation["id"] = str(uuid4())
2097+
2098+
created_automation_id = created_automation["id"]
2099+
2100+
read_route = router.get(f"/automations/{created_automation_id}").mock(
2101+
return_value=httpx.Response(200, json=created_automation)
2102+
)
2103+
2104+
read_automation = await cloud_client.read_automation(created_automation_id)
2105+
2106+
assert read_route.called
2107+
assert read_automation.id == UUID(created_automation["id"])
2108+
2109+
async def test_read_automation_not_found(
2110+
self, cloud_client, automation: AutomationCore
2111+
):
2112+
with respx.mock(base_url=PREFECT_CLOUD_API_URL.value()) as router:
2113+
created_automation = automation.dict(json_compatible=True)
2114+
created_automation["id"] = str(uuid4())
2115+
2116+
created_automation_id = created_automation["id"]
2117+
2118+
read_route = router.get(f"/automations/{created_automation_id}").mock(
2119+
return_value=httpx.Response(404)
2120+
)
2121+
2122+
with pytest.raises(prefect.exceptions.PrefectHTTPStatusError, match="404"):
2123+
await cloud_client.read_automation(created_automation_id)
2124+
2125+
assert read_route.called
2126+
2127+
async def test_read_automation_by_name(
2128+
self, cloud_client, automation: AutomationCore
2129+
):
2130+
with respx.mock(base_url=PREFECT_CLOUD_API_URL.value()) as router:
2131+
created_automation = automation.dict(json_compatible=True)
2132+
created_automation["id"] = str(uuid4())
2133+
read_route = router.post("/automations/filter").mock(
2134+
return_value=httpx.Response(200, json=[created_automation])
2135+
)
2136+
read_automation = await cloud_client.read_automation_by_name(
2137+
automation.name
2138+
)
2139+
2140+
assert read_route.called
2141+
assert isinstance(read_automation, AutomationCore)
2142+
assert read_automation.id == UUID(created_automation["id"])
2143+
assert read_automation.name == automation.name == created_automation["name"]
2144+
2145+
async def test_read_automation_by_name_not_found(
2146+
self, cloud_client, automation: AutomationCore
2147+
):
2148+
with respx.mock(base_url=PREFECT_CLOUD_API_URL.value()) as router:
2149+
created_automation = automation.dict(json_compatible=True)
2150+
created_automation["id"] = str(uuid4())
2151+
created_automation["name"] = "nonexistent"
2152+
read_route = router.post("/automations/filter").mock(
2153+
return_value=httpx.Response(200, json=[])
2154+
)
2155+
2156+
nonexistent_automation = await cloud_client.read_automation_by_name(
2157+
name="nonexistent"
2158+
)
2159+
2160+
assert read_route.called
2161+
2162+
assert nonexistent_automation is None
2163+
20932164
async def test_delete_owned_automations_not_cloud_runtime_error(
20942165
self, prefect_client
20952166
):

tests/events/server/models/test_automations.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,34 @@ async def test_reading_automations_by_workspace_by_name(
5959
]
6060

6161

62+
async def test_reading_automation_by_workspace_by_name_filtered_match(
63+
automations_session: AsyncSession,
64+
some_workspace_automations: Sequence[Automation],
65+
):
66+
existing = await automations.read_automations_for_workspace(
67+
session=automations_session,
68+
sort=AutomationSort.NAME_ASC,
69+
automation_filter=filters.AutomationFilter(name={"any_": ["automation 2"]}),
70+
)
71+
72+
assert len(existing) == 1
73+
74+
assert existing[0].name == "automation 2"
75+
76+
77+
async def test_reading_automation_by_workspace_by_name_filtered_mismatch(
78+
automations_session: AsyncSession,
79+
some_workspace_automations: Sequence[Automation],
80+
):
81+
existing = await automations.read_automations_for_workspace(
82+
session=automations_session,
83+
sort=AutomationSort.NAME_ASC,
84+
automation_filter=filters.AutomationFilter(name={"any_": ["automation 5"]}),
85+
)
86+
87+
assert len(existing) == 0
88+
89+
6290
async def test_reading_automations_by_workspace_paging(
6391
automations_session: AsyncSession,
6492
some_workspace_automations: Sequence[Automation],

0 commit comments

Comments
 (0)