Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
73e20dc
feat: Add FAST_START option to model enums
keiranjprice101 Feb 5, 2026
2b63b22
feat: Add JobType Filters
keiranjprice101 Feb 5, 2026
e368c4d
feat: Handle JobType-specific owner assignment in `create_job` test u…
keiranjprice101 Feb 5, 2026
36e77b7
feat: Add `FastStartJob` model to `job.py`
keiranjprice101 Feb 5, 2026
04b80b7
feat: Add support for `FAST_START` jobs handling and filtering
keiranjprice101 Feb 9, 2026
1d81f55
feat: Rename `exclude_fast_start_jobs` to `include_fast_start_jobs` f…
keiranjprice101 Feb 9, 2026
49c88ef
feat: Add `JobType.FAST_START` filtering to job count tests
keiranjprice101 Feb 9, 2026
3cdb2e1
temp build push
keiranjprice101 Feb 10, 2026
da00e20
Change host env var
keiranjprice101 Feb 10, 2026
b1e0f2b
Update `llsp_url` to include `/execute` endpoint in job creation
keiranjprice101 Feb 10, 2026
09db8d0
Remove repeated connection
keiranjprice101 Feb 12, 2026
a254f39
Merge branch 'main' into llsp_integration
keiranjprice101 Feb 12, 2026
150b80b
Fix broken route
keiranjprice101 Feb 12, 2026
5eb2684
Ruff fixes
keiranjprice101 Feb 12, 2026
04af2c3
Refactor type annotations for clarity and improve `Specification` han…
keiranjprice101 Feb 12, 2026
e8132b1
Fix test assertion to reflect correct broker call count
keiranjprice101 Feb 12, 2026
8da56f8
Add `LLSPSubmissionFailureError` and handler for failed LLSP-API subm…
keiranjprice101 Feb 23, 2026
f9d77e1
Add `LLSPSubmissionFailureError` and corresponding exception handler
keiranjprice101 Feb 23, 2026
fe9876c
Replace `JobRequestError` with `LLSPSubmissionFailureError` for faile…
keiranjprice101 Feb 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build-and-push.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: Build and Push Docker Images

on:
push:
branches:
- main
on: push
# push:
# branches:
# - main

env:
REGISTRY: ghcr.io
Expand Down
4 changes: 4 additions & 0 deletions fia_api/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ class DataIntegrityError(Exception):

class JobOwnerError(DataIntegrityError):
"""Job has no owner"""


class LLSPSubmissionFailureError(Exception):
"""LLSP submission failed"""
64 changes: 58 additions & 6 deletions fia_api/core/job_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import functools
import json
import logging
import os
from collections.abc import Callable
from pathlib import Path
from typing import Any
from typing import Any, Concatenate, ParamSpec, TypeVar, cast

import requests
from pika.adapters.blocking_connection import BlockingConnection # type: ignore[import-untyped]
from pika.connection import ConnectionParameters # type: ignore[import-untyped]
from pika.credentials import PlainCredentials # type: ignore[import-untyped]
from sqlalchemy.orm import Session

from fia_api.core.exceptions import JobRequestError
from fia_api.core.exceptions import JobRequestError, LLSPSubmissionFailureError
from fia_api.core.models import Job, JobOwner, JobType, Script, State
from fia_api.core.repositories import Repo
from fia_api.core.specifications.job import JobSpecification
Expand All @@ -23,7 +25,11 @@
logger = logging.getLogger(__name__)


def require_owner(func: Callable[..., Any]) -> Callable[..., Any]:
P = ParamSpec("P")
R = TypeVar("R")


def require_owner(func: Callable[Concatenate[JobMaker, P], R]) -> Callable[Concatenate[JobMaker, P], R]:
"""
Decorator to ensure that either a user_number or experiment_number is provided to the function. if not, raise a
JobRequestError
Expand All @@ -32,12 +38,12 @@ def require_owner(func: Callable[..., Any]) -> Callable[..., Any]:
"""

@functools.wraps(func)
def wrapper(self: JobMaker, *args: tuple[Any], **kwargs: dict[str, Any]) -> Any:
def wrapper(self: JobMaker, *args: P.args, **kwargs: P.kwargs) -> R:
if kwargs.get("user_number") is None and kwargs.get("experiment_number") is None:
raise JobRequestError("Something needs to own the job, either experiment_number or user_number.")
return func(self, *args, **kwargs)

return wrapper
return cast("Callable[Concatenate[JobMaker, P], R]", wrapper)


class JobMaker:
Expand All @@ -58,7 +64,6 @@ def __init__(
self.queue_name = queue_name
self.connection = None
self.channel = None
self._connect_to_broker()

def _connect_to_broker(self) -> None:
"""
Expand Down Expand Up @@ -195,6 +200,53 @@ def create_simple_job(
self._send_message(json.dumps(message_dict))
return job.id

@require_owner
def create_fast_start_job(
self, runner_image: str, script: str, experiment_number: int | None = None, user_number: int | None = None
) -> int:
"""
Create a fast start job by calling the external LLSP API.
:param runner_image: The image used as a runner
:param script: The script to be used
:param experiment_number: (unused but required by decorator/signature consistency if reused)
:param user_number: the user number of the owner
:return: created job id
"""

job_owner = self._get_or_create_job_owner(None, user_number) # fast starts are user jobs only

script_object = self._get_or_create_script(script)

job = Job(
owner=job_owner,
job_type=JobType.FAST_START,
runner_image=runner_image,
script_id=script_object.id,
state=State.NOT_STARTED,
inputs={},
)

# Call External API
llsp_url = os.environ.get("LLSP_API_HOST", "http://localhost:8001")
llsp_key = os.environ.get("LLSP_API_KEY", "shh")

try:
response = requests.post(
f"{llsp_url}/execute",
json={"script": script},
headers={"Authorization": f"Bearer {llsp_key}"},
timeout=10,
)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to submit fast start job to LLSP: {e}")
job.state = State.UNSUCCESSFUL
job.outputs = "Job failed to submit to LLSP."
raise LLSPSubmissionFailureError(f"Failed to submit fast start job: {e}") from e
finally:
job = self._job_repo.add_one(job)
return job.id

def _get_or_create_script(self, script: str) -> Script:
script_hash = hash_script(script)
script_object = self._script_repo.find_one(ScriptSpecification().by_script_hash(script_hash))
Expand Down
1 change: 1 addition & 0 deletions fia_api/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class JobType(enum.Enum):
RERUN = "RERUN"
SIMPLE = "SIMPLE"
AUTOREDUCTION = "AUTOREDUCTION"
FAST_START = "FAST_START"


class Base(DeclarativeBase):
Expand Down
41 changes: 34 additions & 7 deletions fia_api/core/services/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from fia_api.core.repositories import Repo
from fia_api.core.request_models import AutoreductionRequest, PartialJobUpdateRequest
from fia_api.core.session import get_db_session
from fia_api.core.specifications.base import Specification
from fia_api.core.specifications.filters import apply_filters_to_spec
from fia_api.core.specifications.instrument import InstrumentSpecification
from fia_api.core.specifications.job import JobSpecification
Expand Down Expand Up @@ -53,6 +54,10 @@ class SimpleJob(BaseModel):
script: str


class FastStartJob(BaseModel):
script: str


class RerunJob(BaseModel):
job_id: int
runner_image: str
Expand Down Expand Up @@ -82,6 +87,7 @@ def get_job_by_instrument(
order_direction: Literal["asc", "desc"] = "desc",
user_number: int | None = None,
filters: Mapping[str, Any] | None = None,
include_fast_start_jobs: bool = False,
) -> Sequence[Job]:
"""
Given an instrument name return a sequence of jobs for that instrument. Optionally providing a limit and
Expand All @@ -94,6 +100,7 @@ def get_job_by_instrument(
:param order_by: (str) Field to order by.
:param user_number: (optional[str]) The user number of who is making the request
:param filters: Optional Mapping[str,Any] the filters to be applied to the query
:param include_fast_start_jobs: (bool) Whether to include fast start jobs
:return: Sequence of Jobs for an instrument
"""
specification = JobSpecification().by_instruments(
Expand All @@ -104,10 +111,13 @@ def get_job_by_instrument(
order_direction=order_direction,
user_number=user_number,
)
spec: Specification[Job] = specification
if filters:
specification = apply_filters_to_spec(filters, specification)
spec = apply_filters_to_spec(filters, spec)
if not include_fast_start_jobs:
spec = apply_filters_to_spec({"job_type_not_in": [JobType.FAST_START]}, spec)
job_repo: Repo[Job] = Repo(session)
return job_repo.find(specification)
return job_repo.find(spec)


def get_all_jobs(
Expand All @@ -118,6 +128,7 @@ def get_all_jobs(
order_direction: Literal["asc", "desc"] = "desc",
user_number: int | None = None,
filters: Mapping[str, Any] | None = None,
include_fast_start_jobs: bool = False,
) -> Sequence[Job]:
"""
Get all jobs, if a user number is provided then only the jobs that user has permission for will be
Expand All @@ -129,6 +140,7 @@ def get_all_jobs(
:param order_direction: (str) Direction to der by "asc" | "desc"
:param order_by: (str) Field to order by.
:param filters: Optional Mapping[str,Any] the filters to be applied
:param include_fast_start_jobs: (bool) Whether to include fast start jobs
:return: A Sequence of Jobs
"""
specification = JobSpecification()
Expand All @@ -141,10 +153,13 @@ def get_all_jobs(
specification = specification.by_experiment_numbers(
experiment_numbers, limit=limit, offset=offset, order_by=order_by, order_direction=order_direction
)
spec: Specification[Job] = specification
if filters:
apply_filters_to_spec(filters, specification)
spec = apply_filters_to_spec(filters, spec)
if not include_fast_start_jobs:
spec = apply_filters_to_spec({"job_type_not_in": [JobType.FAST_START]}, spec)
job_repo: Repo[Job] = Repo(session)
return job_repo.find(specification)
return job_repo.find(spec)


def get_job_by_id(
Expand Down Expand Up @@ -174,33 +189,45 @@ def get_job_by_id(
return job


def count_jobs_by_instrument(instrument: str, session: Session, filters: Mapping[str, Any] | None) -> int:
def count_jobs_by_instrument(
instrument: str,
session: Session,
filters: Mapping[str, Any] | None,
include_fast_start_jobs: bool = False,
) -> int:
"""
Given an instrument name, count the jobs for that instrument
:param instrument: Instruments to count from
:param session: The current session of the request
:param include_fast_start_jobs: (bool) Whether to include fast start jobs
:return: Number of jobs
"""
spec = JobSpecification().by_instruments(instruments=[instrument])
spec: Specification[Job] = JobSpecification().by_instruments(instruments=[instrument])
if filters:
spec = apply_filters_to_spec(filters, spec)
if not include_fast_start_jobs:
spec = apply_filters_to_spec({"job_type_not_in": [JobType.FAST_START]}, spec)
job_repo: Repo[Job] = Repo(session)
return job_repo.count(spec)


def count_jobs(
session: Session,
filters: Mapping[str, Any] | None = None,
include_fast_start_jobs: bool = False,
) -> int:
"""
Count the total number of jobs
:param filters: Optional Mapping[str,Any] the filters to be applied
:param session: The current session of the request
:param include_fast_start_jobs: (bool) Whether to include fast start jobs
:return: (int) number of jobs
"""
spec = JobSpecification().all()
spec: Specification[Job] = JobSpecification().all()
if filters:
spec = apply_filters_to_spec(filters, spec)
if not include_fast_start_jobs:
spec = apply_filters_to_spec({"job_type_not_in": [JobType.FAST_START]}, spec)
job_repo: Repo[Job] = Repo(session)
return job_repo.count(spec)

Expand Down
24 changes: 23 additions & 1 deletion fia_api/core/specifications/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Any

from fia_api.core.exceptions import BadRequestError
from fia_api.core.models import Instrument, Job, JobOwner, Run
from fia_api.core.models import Instrument, Job, JobOwner, JobType, Run
from fia_api.core.specifications.base import Specification, T

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,6 +50,24 @@ def apply(self, specification: Specification[T]) -> Specification[T]:
return specification


class JobTypeInFilter(Filter):
"""Filter implementation that checks if job types are included in the query."""

def apply(self, specification: Specification[T]) -> Specification[T]:
job_types = [JobType(val) for val in self.value]
specification.value = specification.value.where(Job.job_type.in_(job_types))
return specification


class JobTypeNotInFilter(Filter):
"""Filter implementation that checks if job types are NOT included in the query."""

def apply(self, specification: Specification[T]) -> Specification[T]:
job_types = [JobType(val) for val in self.value]
specification.value = specification.value.where(Job.job_type.notin_(job_types))
return specification


class JobStateFilter(Filter):
"""Filter implementation that checks if job states match the specified value in the query."""

Expand Down Expand Up @@ -170,6 +188,10 @@ def get_filter(key: str, value: Any) -> Filter: # noqa: C901, PLR0911, PLR0912
return JobStateFilter(value)
case "experiment_number_in":
return ExperimentNumberInFilter(value)
case "job_type_in":
return JobTypeInFilter(value)
case "job_type_not_in":
return JobTypeNotInFilter(value)
case "title":
return TitleFilter(value)
case "filename":
Expand Down
13 changes: 13 additions & 0 deletions fia_api/exception_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ async def bad_job_request_handler(_: Request, __: Exception) -> JSONResponse:
)


async def llsp_api_request_handler(_: Request, __: Exception) -> JSONResponse:
"""
Automatically return a 424 when a job submission to LLSP-API fails
:param _:
:param __:
:return: JSONResponse with 424
"""
return JSONResponse(
status_code=HTTPStatus.FAILED_DEPENDENCY,
content={"message": "Job Failed to submit to LLSP-API"},
)


async def missing_script_handler(_: Request, __: Exception) -> JSONResponse:
"""
Automatically return a 404 when the script could not be found locally or remote
Expand Down
3 changes: 3 additions & 0 deletions fia_api/fia_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
GithubAPIRequestError,
JobOwnerError,
JobRequestError,
LLSPSubmissionFailureError,
MissingRecordError,
MissingScriptError,
NoFilesAddedError,
Expand All @@ -29,6 +30,7 @@
data_integrity_handler,
github_api_request_handler,
job_owner_err_handler,
llsp_api_request_handler,
missing_record_handler,
missing_script_handler,
no_files_added_handler,
Expand Down Expand Up @@ -98,3 +100,4 @@ def filter(self, record: logging.LogRecord) -> bool:
app.add_exception_handler(BadRequestError, bad_request_handler)
app.add_exception_handler(DataIntegrityError, data_integrity_handler)
app.add_exception_handler(JobOwnerError, job_owner_err_handler)
app.add_exception_handler(LLSPSubmissionFailureError, llsp_api_request_handler)
Loading
Loading