
Commit c5025e3

CRUD API calls for scheduler (#1214)
* add linker table for workflow apscheduler
* refactor for apschedule
* Add crud API for creating schedule jobs
* Update after review
* remove export
* undo psql
* fix linting
* remove unused
* working example
* Refactor and update Graphql
* update settings
* testing added
* Added migrations for existing schedules
* fixing unit tests
* update get
* update settings
* update docs
* Fix linting and test
* fix unit tests and mypy
* fix pre-commit
* add more tests
* update after review
* refactor after review
* restore validate_products file
* fix minor issues
* add upgrading guide for schedules
* fix linting
1 parent d7faa16 commit c5025e3

File tree

25 files changed, +1382 -108 lines


.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 4.6.5
+current_version = 4.7.0rc1
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?

docs/guides/tasks.md

Lines changed: 92 additions & 28 deletions
@@ -112,7 +112,9 @@ Even if the task does not have any form input, an entry will still need to be ma
}
```

-## The schedule file
+## The schedule file (DEPRECATED)
+
+> This section is deprecated and will be removed in version 5.0.0; please refer to the [new scheduling system](#the-schedule-api) below.

> from `4.3.0` we switched from [schedule] package to [apscheduler] to allow schedules to be stored in the DB and schedule tasks from the API.
@@ -152,11 +154,97 @@ To keep things organized and consistent (similar to how workflows are handled),
> In previous versions, schedules needed to be explicitly listed in an ALL_SCHEDULERS variable.
> This is no longer required, but ALL_SCHEDULERS is still supported for backwards compatibility.

+
+## The schedule API
+
+> from `4.3.0` we switched from [schedule] package to [apscheduler] to allow schedules to be stored in the DB and schedule tasks from the API.
+
+> from `4.7.0` we deprecated `@scheduler.scheduled_job()` provided by [apscheduler] in favor of a more dynamic, API-based system.
+> Although we no longer support the `@scheduler.scheduled_job()` decorator, it is still available because it is part of [apscheduler].
+> We do NOT recommend using it for new schedules, because schedules created this way miss the linker table join between schedules and workflows/tasks.
+
+Schedules can be created, updated, and deleted via the REST API, and retrieved via the existing GraphQL API. It
+will become possible to manage schedules through the
+UI ([development ticket](https://github.com/workfloworchestrator/orchestrator-ui-library/issues/2215)), but you may also
+use the API directly to automate the configuration of your schedules.
+
+*Example POST*
+
+To create a schedule, send a `POST` request to the `/api/schedules` endpoint with a JSON body containing the schedule details.
+An example body to create a nightly sync schedule would look like this:
+
+```json
+{
+  "name": "Nightly Product Validation",
+  "workflow_name": "validate_products",
+  "workflow_id": "e96cc6bb-9494-4ac1-a572-050988487ee1",
+  "trigger": "interval",
+  "trigger_kwargs": {
+    "hours": 12
+  }
+}
+```
+
+Schedules can likewise be updated or deleted via `PUT` and `DELETE` requests to the same endpoint.
+
+*Example PUT*
+
+With `PUT` you can only update the `name`, `trigger`, and `trigger_kwargs` of an existing schedule.
+For example, to update the above schedule to run every 24 hours instead of every 12 hours:
+```json
+{
+  "schedule_id": "c1b6e5e3-d9f0-48f2-bc65-3c9c33fcf561",
+  "name": "Updated Nightly Cleanup",
+  "trigger": "interval",
+  "trigger_kwargs": {
+    "hours": 24
+  }
+}
+```
+
+*Example DELETE*
+
+To delete a schedule, you only need to provide the `schedule_id` in the `DELETE` call:
+```json
+{
+  "workflow_id": "b67d4ca7-19fb-4b83-a022-34c6322fb5f1",
+  "schedule_id": "1fe43a96-b0f4-4c89-b9b7-87db14bbd8d3"
+}
+```
+
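The request bodies above are everything the endpoint needs. As an illustration only (not part of this commit), the same calls can be scripted with Python's `requests`; the base URL, port, and auth header below are assumptions about a particular deployment.

```python
# Minimal sketch of driving the new schedules endpoint from a script.
# Assumptions (not from this commit): the orchestrator API is reachable on
# localhost:8080 and expects a bearer token; adjust both for your deployment.
import requests

BASE_URL = "http://localhost:8080/api/schedules/"
HEADERS = {"Authorization": "Bearer <token>"}

create_payload = {
    "name": "Nightly Product Validation",
    "workflow_name": "validate_products",
    "workflow_id": "e96cc6bb-9494-4ac1-a572-050988487ee1",
    "trigger": "interval",
    "trigger_kwargs": {"hours": 12},
}

# POST creates the schedule; the endpoint answers 201 with a confirmation message.
response = requests.post(BASE_URL, json=create_payload, headers=HEADERS, timeout=10)
response.raise_for_status()
print(response.json())  # e.g. {"message": "Added to Create Queue", "status": "CREATED"}

# DELETE removes a schedule again; the body mirrors the documented DELETE example.
delete_payload = {
    "workflow_id": "b67d4ca7-19fb-4b83-a022-34c6322fb5f1",
    "schedule_id": "1fe43a96-b0f4-4c89-b9b7-87db14bbd8d3",
}
requests.delete(BASE_URL, json=delete_payload, headers=HEADERS, timeout=10).raise_for_status()
```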
+There are multiple triggers that can be used ([trigger docs]):
+
+- [IntervalTrigger]: use when you want to run the task at fixed intervals of time.
+- [CronTrigger]: use when you want to run the task periodically at certain time(s) of day.
+- [DateTrigger]: use when you want to run the task just once at a certain point of time.
+- [CalendarIntervalTrigger]: use when you want to run the task on calendar-based intervals, at a specific time of day.
+- [AndTrigger]: use when you want to combine multiple triggers so the task only runs when **all** of them would fire at the same time.
+- [OrTrigger]: use when you want to combine multiple triggers so the task runs when **any one** of them would fire.
+
+For detailed configuration options, see the [APScheduler scheduling docs].
+
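The trigger names listed above presumably correspond to the [apscheduler] trigger classes, with `trigger_kwargs` being the keyword arguments those constructors accept. A short illustrative sketch (not code from this commit):

```python
# Illustration only: how the documented trigger/trigger_kwargs pairs line up
# with APScheduler trigger constructors. Not code from this commit.
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

# {"trigger": "interval", "trigger_kwargs": {"hours": 12}}
every_twelve_hours = IntervalTrigger(hours=12)

# {"trigger": "cron", "trigger_kwargs": {"hour": 0, "minute": 10}}
nightly_at_00_10 = CronTrigger(hour=0, minute=10)
```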
+The scheduler automatically loads any schedules that are imported before the scheduler starts.
+
+> In previous versions, schedules needed to be explicitly listed in an ALL_SCHEDULERS variable.
+> This is no longer required; ALL_SCHEDULERS is deprecated as of orchestrator-core 4.7.0 and will be removed in 5.0.0.
+> Follow-up ticket to remove deprecated code: [#1276](https://github.com/workfloworchestrator/orchestrator-core/issues/1276)
+
## The scheduler

The scheduler is invoked via `python main.py scheduler`.
Try `--help` or review the [CLI docs][cli-docs] to learn more.

+### Initial schedules
+From version orchestrator-core >= `4.7.0`, the scheduler uses the database to store schedules instead of hard-coded schedule files.
+Previous versions (orchestrator-core < `4.7.0`) had hard-coded schedules; these can be ported to the new system by creating them via the API or CLI.
+Run the following CLI command to import the previously built-in orchestrator-core schedules; they can then be changed via the API if needed.
+
+```shell
+python main.py scheduler load-initial-schedule
+```
+
+> Remember: if you do not explicitly import these, they will not be available to the scheduler.
### Manually executing tasks

When doing development, it is possible to manually make the scheduler run your task even if your Orchestrator instance is not in "scheduler mode."
@@ -165,11 +253,11 @@ Shell into your running instance and run the following:

```shell
docker exec -it backend /bin/bash
-python main.py scheduler force run_nightly_sync
+python main.py scheduler force "c1b6e5e3-d9f0-48f2-bc65-3c9c33fcf561"
```

-...where `run_nightly_sync` is the job defined in the schedule file - not the name of the task.
-This doesn't depend on the UI being up, and you can get the logging output.
+...where `c1b6e5e3-d9f0-48f2-bc65-3c9c33fcf561` is the job id of the schedule you want to run.
+The job id can be found via the GraphQL API or directly in the database.

### Starting the scheduler

@@ -213,30 +301,6 @@ fi
```


-## Developer notes
-
-### Executing multiple tasks
-
-If one needs to execute multiple tasks in concert with each other, one can not call a task from another task. Which is to say, calling `start_process` is a "top level" call. Trying to call it inside an already invoked task does not work.
-
-But the schedule (ie: crontab) files are also code modules so one can achieve the same thing there:
-
-```python
-@scheduler(name="Nightly sync", time_unit="day", at="00:10")
-def run_nightly_sync() -> None:
-    subs = Subscription.query.filter(
-        Subscription.description.like("Node%Provisioned")
-    ).all()
-    logger.info("Node schedule subs", subs=subs)
-
-    for sub in subs:
-        sub_id = sub.subscription_id
-        logger.info("Validate node enrollment", sub_id=sub_id)
-        start_process("validate_node_enrollment", [{"subscription_id": sub_id}])
-
-    start_process("task_sync_from")
-```
-
[schedule]: https://pypi.org/project/schedule/
[apscheduler]: https://pypi.org/project/APScheduler/
[IntervalTrigger]: https://apscheduler.readthedocs.io/en/master/api.html#apscheduler.triggers.interval.IntervalTrigger

docs/guides/upgrading/4.0.md

Lines changed: 23 additions & 0 deletions
@@ -71,3 +71,26 @@ This will update the `target` for all workflows that are `SYSTEM` or `VALIDATE`

This is a breaking change, so you will need to test your workflows after making this change to ensure that they are
working as expected.
+
+## Scheduling flows
+In 4.4.0 we introduced a new way to schedule workflows via the decorator [@scheduler.scheduled_job(...)](../tasks.md#the-schedule-file-deprecated).
+In 4.7.0 we introduce a new way to schedule workflows via the [Scheduler API](../tasks.md#the-schedule-api).
+
+The 4.7.0 migration will create the `apscheduler_jobs` table if missing, so upgrades from older versions still succeed, but the table will initially be empty.
+
+### Who needs to take action?
+
+**Users upgrading from 4.4–4.6:**
+- Nothing special required; your scheduler data already exists.
+
+**Users upgrading from <4.4 who used the old decorator:**
+- Your schedules will not automatically reappear. Restore them by running:
+
+```shell
+python main.py scheduler load-initial-schedule
+```
+
+(More information: [Scheduler API](../tasks.md#the-schedule-api).)
+Make sure your schedule definitions are imported so the scheduler can discover them.
+
+**Users who never used the decorator:**
+- No action required; the migration creates the table and you can add schedules normally.

docs/reference-docs/websockets.md

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ The main component is the `WebSocketManager` (WSM) which has the following respo
3. Provide an interface to pass messages from a backend process (workflow/task)

In a setup with multiple isolated Orchestrator instances the WSM is initialized multiple times as well, therefore clients can be connected to any arbitrary WSM instance.
-Letting a backend process broadcast messages to all clients thus requires a message broker, for which we use [Redis Pub/Sub](https://redis.io/docs/manual/pubsub).
+Letting a backend process broadcast messages to all clients thus requires a message broker, for which we use [Redis Pub/Sub](https://redis.io/docs/latest/develop/pubsub/).

There are 2 WSM implementations: a `MemoryWebsocketManager` for development/testing, and a `BroadcastWebsocketManager` that connects to Redis. We'll continue to discuss the latter.

orchestrator/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@

"""This is the orchestrator workflow engine."""

-__version__ = "4.6.5"
+__version__ = "4.7.0rc1"


from structlog import get_logger

orchestrator/api/api_v1/api.py

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,7 @@
    product_blocks,
    products,
    resource_types,
+    schedules,
    settings,
    subscription_customer_descriptions,
    subscriptions,
@@ -88,6 +89,9 @@
api_router.include_router(
    ws.router, prefix="/ws", tags=["Core", "Events"]
)  # Auth on the websocket is handled in the Websocket Manager
+api_router.include_router(
+    schedules.router, prefix="/schedules", tags=["Core", "Schedules"], dependencies=[Depends(authorize)]
+)

if llm_settings.SEARCH_ENABLED:
    from orchestrator.api.api_v1.endpoints import search
orchestrator/api/api_v1/endpoints/schedules.py

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
# Copyright 2019-2025 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http import HTTPStatus

import structlog
from fastapi.routing import APIRouter

from orchestrator.schedules.service import add_scheduled_task_to_queue
from orchestrator.schemas.schedules import APSchedulerJobCreate, APSchedulerJobDelete, APSchedulerJobUpdate

logger = structlog.get_logger(__name__)

router: APIRouter = APIRouter()


@router.post("/", status_code=HTTPStatus.CREATED)
def create_scheduled_task(payload: APSchedulerJobCreate) -> dict[str, str]:
    """Create a scheduled task."""
    add_scheduled_task_to_queue(payload)
    return {"message": "Added to Create Queue", "status": "CREATED"}


@router.put("/", status_code=HTTPStatus.OK)
async def update_scheduled_task(payload: APSchedulerJobUpdate) -> dict[str, str]:
    """Update a scheduled task."""
    add_scheduled_task_to_queue(payload)
    return {"message": "Added to Update Queue", "status": "UPDATED"}


@router.delete("/", status_code=HTTPStatus.OK)
async def delete_scheduled_task(payload: APSchedulerJobDelete) -> dict[str, str]:
    """Delete a scheduled task."""
    add_scheduled_task_to_queue(payload)
    return {"message": "Added to Delete Queue", "status": "DELETED"}

orchestrator/cli/scheduler.py

Lines changed: 74 additions & 4 deletions
@@ -10,28 +10,56 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
import time
+from typing import cast

import typer
+from redis import Redis

from orchestrator.schedules.scheduler import (
    get_all_scheduler_tasks,
    get_scheduler,
    get_scheduler_task,
)
+from orchestrator.schedules.service import (
+    SCHEDULER_QUEUE,
+    add_scheduled_task_to_queue,
+    workflow_scheduler_queue,
+)
+from orchestrator.schemas.schedules import APSchedulerJobCreate
+from orchestrator.services.workflows import get_workflow_by_name
+from orchestrator.settings import app_settings
+from orchestrator.utils.redis_client import create_redis_client

app: typer.Typer = typer.Typer()


@app.command()
def run() -> None:
    """Start scheduler and loop eternally to keep thread alive."""
-    with get_scheduler():
-        while True:
+
+    def _get_scheduled_task_item_from_queue(redis_conn: Redis) -> tuple[str, bytes] | None:
+        """Get an item from the Redis Queue for scheduler tasks."""
+        try:
+            return redis_conn.brpop(SCHEDULER_QUEUE, timeout=1)
+        except ConnectionError as e:
+            typer.echo(f"There was a connection error with Redis. Retrying in 3 seconds... {e}")
+            time.sleep(3)
+        except Exception as e:
+            typer.echo(f"There was an unexpected error with Redis. Retrying in 1 second... {e}")
            time.sleep(1)

+        return None
+
+    with get_scheduler() as scheduler_connection:
+        redis_connection = create_redis_client(app_settings.CACHE_URI)
+        while True:
+            item = _get_scheduled_task_item_from_queue(redis_connection)
+            if not item:
+                continue
+
+            workflow_scheduler_queue(item, scheduler_connection)
+

@app.command()
def show_schedule() -> None:
@@ -59,3 +87,45 @@ def force(task_id: str) -> None:
    except Exception as e:
        typer.echo(f"Task execution failed: {e}")
        raise typer.Exit(code=1)
+
+
+@app.command()
+def load_initial_schedule() -> None:
+    """Load the initial schedule into the scheduler."""
+    initial_schedules = [
+        {
+            "name": "Task Resume Workflows",
+            "workflow_name": "task_resume_workflows",
+            "workflow_id": "",
+            "trigger": "interval",
+            "trigger_kwargs": {"hours": 1},
+        },
+        {
+            "name": "Task Clean Up Tasks",
+            "workflow_name": "task_clean_up_tasks",
+            "workflow_id": "",
+            "trigger": "interval",
+            "trigger_kwargs": {"hours": 6},
+        },
+        {
+            "name": "Task Validate Subscriptions",
+            "workflow_name": "task_validate_subscriptions",
+            "workflow_id": "",
+            "trigger": "cron",
+            "trigger_kwargs": {"hour": 0, "minute": 10},
+        },
+    ]
+
+    for schedule in initial_schedules:
+        # enrich with workflow id
+        workflow_name = cast(str, schedule.get("workflow_name"))
+        workflow = get_workflow_by_name(workflow_name)
+
+        if not workflow:
+            typer.echo(f"Workflow '{schedule['workflow_name']}' not found. Skipping schedule.")
+            continue
+
+        schedule["workflow_id"] = workflow.workflow_id
+
+        typer.echo(f"Initial Schedule: {schedule}")
+        add_scheduled_task_to_queue(APSchedulerJobCreate(**schedule))  # type: ignore
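As a usage note, the new `load-initial-schedule` command can also be exercised programmatically, for example from a test, via Typer's runner. A small sketch (it assumes a reachable database and Redis, since the command looks up workflows and pushes jobs onto the scheduler queue):

```python
# Sketch: invoking the new CLI command from a test using Typer's CliRunner.
# Assumes a configured database and Redis connection, because the command
# resolves workflow ids and enqueues APSchedulerJobCreate payloads.
from typer.testing import CliRunner

from orchestrator.cli.scheduler import app

runner = CliRunner()
result = runner.invoke(app, ["load-initial-schedule"])
print(result.output)
assert result.exit_code == 0
```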
