Skip to content

Commit dba8485

Browse files
authored
Flow run state change events (#12825)
1 parent dfe6670 commit dba8485

File tree

12 files changed

+1540
-48
lines changed

12 files changed

+1540
-48
lines changed

src/prefect/client/orchestration.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3143,9 +3143,7 @@ async def read_automations(self) -> List[Automation]:
31433143
response.raise_for_status()
31443144
return pydantic.parse_obj_as(List[Automation], response.json())
31453145

3146-
async def find_automation(
3147-
self, id_or_name: str, exit_if_not_found: bool = True
3148-
) -> Optional[Automation]:
3146+
async def find_automation(self, id_or_name: str) -> Optional[Automation]:
31493147
try:
31503148
id = UUID(id_or_name)
31513149
except ValueError:

src/prefect/server/api/deployments.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ async def create_deployment(
115115
elif deployment.work_queue_name:
116116
# If just a queue name was provided, ensure that the queue exists and
117117
# get its ID.
118-
work_queue = await models.work_queues._ensure_work_queue_exists(
118+
work_queue = await models.work_queues.ensure_work_queue_exists(
119119
session=session, name=deployment.work_queue_name
120120
)
121121
deployment_dict["work_queue_id"] = work_queue.id

src/prefect/server/models/deployments.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ async def update_deployment(
218218
elif deployment.work_queue_name:
219219
# If just a queue name was provided, ensure the queue exists and
220220
# get its ID.
221-
work_queue = await models.work_queues._ensure_work_queue_exists(
222-
session=session, name=update_data["work_queue_name"], db=db
221+
work_queue = await models.work_queues.ensure_work_queue_exists(
222+
session=session, name=update_data["work_queue_name"]
223223
)
224224
update_data["work_queue_id"] = work_queue.id
225225

src/prefect/server/models/events.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
from datetime import datetime, timedelta
2+
from typing import Any, Dict, List, MutableMapping, Optional, Set, Union
3+
from uuid import UUID
4+
5+
from cachetools import TTLCache
6+
from sqlalchemy.ext.asyncio import AsyncSession
7+
8+
from prefect.server import models, schemas
9+
from prefect.server.database.orm_models import (
10+
ORMDeployment,
11+
ORMFlow,
12+
ORMFlowRun,
13+
ORMFlowRunState,
14+
ORMTaskRun,
15+
ORMTaskRunState,
16+
ORMWorkPool,
17+
ORMWorkQueue,
18+
)
19+
from prefect.server.events.schemas.events import Event
20+
from prefect.server.models import deployments
21+
from prefect.settings import PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL
22+
from prefect.utilities.text import truncated_to
23+
24+
ResourceData = Dict[str, Dict[str, Any]]
25+
RelatedResourceList = List[Dict[str, str]]
26+
27+
28+
# Some users use state messages to convey error messages and large results; let's
29+
# truncate them so they don't blow out the size of a message
30+
TRUNCATE_STATE_MESSAGES_AT = 100_000
31+
32+
33+
_flow_run_resource_data_cache: MutableMapping[UUID, ResourceData] = TTLCache(
34+
maxsize=1000,
35+
ttl=PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL.value().total_seconds(),
36+
)
37+
38+
39+
async def flow_run_state_change_event(
40+
session: AsyncSession,
41+
occurred: datetime,
42+
flow_run: ORMFlowRun,
43+
initial_state_id: Optional[UUID],
44+
initial_state: Optional[schemas.states.State],
45+
validated_state_id: Optional[UUID],
46+
validated_state: schemas.states.State,
47+
) -> Event:
48+
return Event(
49+
occurred=occurred,
50+
event=f"prefect.flow-run.{validated_state.name}",
51+
resource={
52+
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
53+
"prefect.resource.name": flow_run.name,
54+
"prefect.state-message": truncated_to(
55+
TRUNCATE_STATE_MESSAGES_AT, validated_state.message
56+
),
57+
"prefect.state-name": validated_state.name or "",
58+
"prefect.state-timestamp": (
59+
validated_state.timestamp.isoformat()
60+
if validated_state.timestamp
61+
else None
62+
),
63+
"prefect.state-type": validated_state.type.value,
64+
},
65+
related=await _flow_run_related_resources_from_orm(
66+
session=session, flow_run=flow_run
67+
),
68+
payload={
69+
"intended": {
70+
"from": _state_type(initial_state),
71+
"to": _state_type(validated_state),
72+
},
73+
"initial_state": state_payload(initial_state),
74+
"validated_state": state_payload(validated_state),
75+
},
76+
# Here we use the state's ID as the ID of the event as well, in order to
77+
# establish the ordering of the state-change events
78+
id=validated_state_id,
79+
follows=initial_state_id if _timing_is_tight(occurred, initial_state) else None,
80+
)
81+
82+
83+
async def _flow_run_related_resources_from_orm(
84+
session: AsyncSession, flow_run: ORMFlowRun
85+
) -> RelatedResourceList:
86+
resource_data = _flow_run_resource_data_cache.get(flow_run.id)
87+
if not resource_data:
88+
flow = await models.flows.read_flow(session=session, flow_id=flow_run.flow_id)
89+
deployment: Optional[ORMDeployment] = None
90+
if flow_run.deployment_id:
91+
deployment = await deployments.read_deployment(
92+
session, deployment_id=flow_run.deployment_id
93+
)
94+
95+
work_queue = None
96+
if flow_run.work_queue_id:
97+
work_queue = await models.work_queues.read_work_queue(
98+
session, work_queue_id=flow_run.work_queue_id
99+
)
100+
101+
work_pool = work_queue.work_pool if work_queue is not None else None
102+
103+
task_run: Optional[ORMTaskRun] = None
104+
if flow_run.parent_task_run_id:
105+
task_run = await models.task_runs.read_task_run(
106+
session,
107+
task_run_id=flow_run.parent_task_run_id,
108+
)
109+
110+
print(task_run)
111+
if task_run:
112+
print("events", task_run.id, task_run.tags)
113+
114+
resource_data = _as_resource_data(
115+
flow_run, flow, deployment, work_queue, work_pool, task_run
116+
)
117+
_flow_run_resource_data_cache[flow_run.id] = resource_data
118+
119+
return _resource_data_as_related_resources(
120+
resource_data,
121+
excluded_kinds=["flow-run"],
122+
) + _provenance_as_related_resources(flow_run.created_by)
123+
124+
125+
def _as_resource_data(
126+
flow_run: ORMFlowRun,
127+
flow: Union[ORMFlow, schemas.core.Flow, None],
128+
deployment: Union[ORMDeployment, schemas.responses.DeploymentResponse, None],
129+
work_queue: Union[ORMWorkQueue, schemas.responses.WorkQueueResponse, None],
130+
work_pool: Union[ORMWorkPool, schemas.core.WorkPool, None],
131+
task_run: Union[ORMTaskRun, schemas.core.TaskRun, None] = None,
132+
) -> ResourceData:
133+
return {
134+
"flow-run": {
135+
"id": str(flow_run.id),
136+
"name": flow_run.name,
137+
"tags": flow_run.tags if flow_run.tags else [],
138+
"role": "flow-run",
139+
},
140+
"flow": (
141+
{
142+
"id": str(flow.id),
143+
"name": flow.name,
144+
"tags": flow.tags if flow.tags else [],
145+
"role": "flow",
146+
}
147+
if flow
148+
else {}
149+
),
150+
"deployment": (
151+
{
152+
"id": str(deployment.id),
153+
"name": deployment.name,
154+
"tags": deployment.tags if deployment.tags else [],
155+
"role": "deployment",
156+
}
157+
if deployment
158+
else {}
159+
),
160+
"work-queue": (
161+
{
162+
"id": str(work_queue.id),
163+
"name": work_queue.name,
164+
"tags": [],
165+
"role": "work-queue",
166+
}
167+
if work_queue
168+
else {}
169+
),
170+
"work-pool": (
171+
{
172+
"id": str(work_pool.id),
173+
"name": work_pool.name,
174+
"tags": [],
175+
"role": "work-pool",
176+
}
177+
if work_pool
178+
else {}
179+
),
180+
"task-run": (
181+
{
182+
"id": str(task_run.id),
183+
"name": task_run.name,
184+
"tags": task_run.tags if task_run.tags else [],
185+
"role": "task-run",
186+
}
187+
if task_run
188+
else {}
189+
),
190+
}
191+
192+
193+
def _resource_data_as_related_resources(
194+
resource_data: ResourceData,
195+
excluded_kinds: Optional[List[str]] = None,
196+
) -> RelatedResourceList:
197+
related = []
198+
tags: Set[str] = set()
199+
200+
if excluded_kinds is None:
201+
excluded_kinds = []
202+
203+
for kind, data in resource_data.items():
204+
tags |= set(data.get("tags", []))
205+
206+
if kind in excluded_kinds or not data:
207+
continue
208+
209+
related.append(
210+
{
211+
"prefect.resource.id": f"prefect.{kind}.{data['id']}",
212+
"prefect.resource.role": data["role"],
213+
"prefect.resource.name": data["name"],
214+
}
215+
)
216+
217+
related += [
218+
{
219+
"prefect.resource.id": f"prefect.tag.{tag}",
220+
"prefect.resource.role": "tag",
221+
}
222+
for tag in sorted(tags)
223+
]
224+
225+
return related
226+
227+
228+
def _provenance_as_related_resources(
229+
created_by: Optional[schemas.core.CreatedBy],
230+
) -> RelatedResourceList:
231+
if not created_by:
232+
return []
233+
234+
resource_id: str
235+
236+
if created_by.type == "DEPLOYMENT":
237+
resource_id = f"prefect.deployment.{created_by.id}"
238+
elif created_by.type == "AUTOMATION":
239+
resource_id = f"prefect.automation.{created_by.id}"
240+
else:
241+
return []
242+
243+
related = {
244+
"prefect.resource.id": resource_id,
245+
"prefect.resource.role": "creator",
246+
}
247+
if created_by.display_value:
248+
related["prefect.resource.name"] = created_by.display_value
249+
return [related]
250+
251+
252+
def _state_type(
253+
state: Union[ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]],
254+
) -> Optional[str]:
255+
return str(state.type.value) if state else None
256+
257+
258+
def state_payload(state: Optional[schemas.states.State]) -> Optional[Dict[str, str]]:
259+
"""Given a State, return the essential string parts of it for use in an
260+
event payload"""
261+
if not state:
262+
return None
263+
payload: Dict[str, str] = {"type": state.type.value}
264+
if state.name:
265+
payload["name"] = state.name
266+
if state.message:
267+
payload["message"] = truncated_to(TRUNCATE_STATE_MESSAGES_AT, state.message)
268+
if state.is_paused():
269+
payload["pause_reschedule"] = str(state.state_details.pause_reschedule).lower()
270+
return payload
271+
272+
273+
def _timing_is_tight(
274+
occurred: datetime,
275+
initial_state: Union[
276+
ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]
277+
],
278+
) -> bool:
279+
# Only connect events with event.follows if the timing here is tight, which will
280+
# help us resolve the order of these events if they happen to be delivered out of
281+
# order. If the preceding state change happened a while back, don't worry about
282+
# it because the order is very likely to be unambiguous.
283+
TIGHT_TIMING = timedelta(minutes=5)
284+
if initial_state and initial_state.timestamp:
285+
return bool(-TIGHT_TIMING < (occurred - initial_state.timestamp) < TIGHT_TIMING)
286+
287+
return False

0 commit comments

Comments
 (0)