Handle processing_service_name parameters in requests from workers #1117

Open

carlosgjs wants to merge 21 commits into RolnickLab:main from uw-ssec:carlosg/servname2

Conversation

@carlosgjs (Collaborator) commented Feb 5, 2026

Summary

This pull request introduces a new optional query parameter, processing_service_name, to several job-related API endpoints. The parameter lets clients identify the processing service making the request, and the value is logged for auditing and debugging. The change also removes unused OpenAPI parameter definitions from ami/utils/requests.py (they had been moved to ami/jobs/schemas.py).

The parameter is optional so that this change and the corresponding worker change can be deployed independently.
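For illustration, a minimal sketch of how a worker could pass the parameter; the base URL and service name are made up for the example, and only the /jobs endpoint and parameter name come from this PR:

import requests

BASE_URL = "https://antenna.example.org/api/v2"  # assumed host, for illustration only
SERVICE_NAME = "AMI Data Companion (worker-01)"  # hypothetical worker name

# The parameter is optional: requests that omit it keep working, which is
# what lets the platform and worker changes roll out independently.
response = requests.get(
    f"{BASE_URL}/jobs/",
    params={"processing_service_name": SERVICE_NAME},
    timeout=30,
)
response.raise_for_status()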

Companion PR

Worker-side changes: RolnickLab/ami-data-companion#108

Related Issues

  • Closes #1112
  • Supports #1087

Testing Logs

Logs:

[2026-02-04 17:49:15] INFO Job 60 result received from processing service: AMI Data Companion (Carloss-MacBook-Pro.local)
[2026-02-04 17:49:13] INFO Job 60 tasks (4) requested by processing service: AMI Data Companion (Carloss-MacBook-Pro.local)

Deployment Notes

Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • API endpoints now accept an optional processing_service_name parameter; it is documented and logged to improve request traceability.
  • Behavior Changes

    • Classification default resolution can now honor request-scoped settings, which may change the default threshold selection.
  • Tests

    • Added tests for the processing_service_name parameter across job endpoints; test helpers updated to allow extra job fields.

@netlify bot commented Feb 5, 2026

👷 Deploy request for antenna-ssec pending review.

Visit the deploys page to approve it.

🔨 Latest commit: 9cb886b

@netlify bot commented Feb 5, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: 9cb886b
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/699603213f152f0008b9fde7

@coderabbitai bot (Contributor) commented Feb 5, 2026

📝 Walkthrough

Adds an optional processing_service_name query parameter and logging to Job endpoints (list, tasks, result); makes default classification threshold resolution request-aware by adding a request parameter; removes four module-level OpenAPI parameter exports; adds tests and updates a test helper signature.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Processing service parameter: ami/jobs/schemas.py, ami/jobs/views.py, ami/jobs/tests.py | Add the processing_service_name_param OpenApiParameter; import and document it on the list, tasks, and result endpoints; add the non-exported helper _log_processing_service_name and log calls; add test_processing_service_name_parameter; update the test helper to _create_ml_job(..., **kwargs). |
| Request-aware threshold & cleanup: ami/utils/requests.py | Change get_default_classification_threshold(project=None, request=None) to be request-aware (uses get_apply_default_filters_flag / a request-scoped threshold); remove the module-level OpenAPI parameter exports (project_id_doc_param, ids_only_param, incomplete_only_param, batch_param). |
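A rough sketch of the request-aware resolution described above. Only the signature and the get_apply_default_filters_flag name come from this walkthrough; the body, field name, and fallback value are assumptions:

def get_default_classification_threshold(project=None, request=None) -> float:
    """Resolve the default classification threshold, honoring request-scoped settings."""
    if request is not None and not get_apply_default_filters_flag(request):
        # Request opted out of default filters (assumed semantics).
        return 0.0
    if project is not None and project.default_classification_threshold is not None:
        # Project-level override; the field name is assumed for illustration.
        return project.default_classification_threshold
    return 0.6  # assumed module-level fallback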

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant API as Django API
    participant JobView as JobViewSet
    participant Logger as Logger

    Client->>API: GET /jobs?processing_service_name=worker-A
    API->>JobView: route to list/tasks/result
    JobView->>JobView: extract `processing_service_name` from query
    JobView->>Logger: _log_processing_service_name("worker-A", endpoint)
    Logger-->>JobView: logged
    JobView-->>API: response (200)
    API-->>Client: 200 OK

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested labels

backend, ml

Suggested reviewers

  • mihow

Poem

🐰 I hopped along the HTTP vine,
A worker's name now tagged in line,
I logged the footsteps, soft and neat,
Jobs now know which paws they meet.
Carrot cheers for every logged heartbeat.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 70.00%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the main change: introducing support for handling processing_service_name parameters from worker requests. |
| Linked Issues check | ✅ Passed | The PR implements the core objective of providing visibility into processing services by adding logging of processing_service_name across the job endpoints (list, tasks, result), directly addressing issue #1112. |
| Out of Scope Changes check | ✅ Passed | The removal of unused OpenAPI parameter definitions from ami/utils/requests.py is documented as part of this change and supports the overall refactoring; all changes remain focused on processing_service_name handling. |
| Description check | ✅ Passed | The description covers the summary, the list of changes, related issues, and testing logs, but is missing deployment-notes detail and has incomplete checklist items. |


@carlosgjs carlosgjs marked this pull request as ready for review February 5, 2026 01:29
Copilot AI review requested due to automatic review settings February 5, 2026 01:29
Copilot AI (Contributor) left a comment

Pull request overview

This pull request adds logging capabilities to track which processing services are making requests to job-related API endpoints. The changes introduce an optional processing_service_name query parameter to the list, tasks, and result endpoints, which is logged when provided. The PR also consolidates OpenAPI parameter definitions by moving them from ami/utils/requests.py to ami/jobs/schemas.py.

Changes:

  • Added processing_service_name query parameter to job list, tasks, and result endpoints
  • Created _log_processing_service_name() helper function to log service names from requests
  • Removed unused OpenAPI parameter imports and definitions from ami/utils/requests.py

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| ami/utils/requests.py | Removed unused OpenAPI imports and parameter definitions that were moved to ami/jobs/schemas.py |
| ami/jobs/schemas.py | Added the processing_service_name_param OpenAPI parameter definition |
| ami/jobs/views.py | Imported the new parameter, added logging calls in the list/tasks/result endpoints, and implemented the _log_processing_service_name() helper function |
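Based on the descriptions above, the parameter definition in ami/jobs/schemas.py plausibly takes this drf-spectacular form; the description text is an assumption:

from drf_spectacular.utils import OpenApiParameter

processing_service_name_param = OpenApiParameter(
    name="processing_service_name",
    type=str,
    location=OpenApiParameter.QUERY,
    required=False,
    description=(
        "Optional name of the processing service making the request; "
        "logged for auditing and debugging."  # assumed wording
    ),
)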


@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@ami/jobs/views.py`:
- Around line 335-351: The function _log_processing_service_name can return None
when the "processing_service_name" query param is missing, so update its return
type annotation from -> str to include None (either -> str | None for Python
3.10+ or -> Optional[str] with "from typing import Optional" for older
versions); ensure you add the Optional import if used and run a quick type-check
to confirm no other callers require changes.
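Sketched against the helper quoted later on this page, the suggested fix amounts to the following (the context parameter is inferred from the log messages):

def _log_processing_service_name(request, context: str) -> str | None:
    processing_service_name = request.query_params.get("processing_service_name", None)
    if processing_service_name:
        logger.info(f"Jobs {context} by processing service: {processing_service_name}")
    else:
        logger.debug(f"Jobs {context} without processing service name")
    # May be None when the query parameter is absent, hence `str | None`.
    return processing_service_name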

@carlosgjs carlosgjs requested a review from mihow February 5, 2026 01:37
mihow added a commit that referenced this pull request Feb 17, 2026
Resolve conflicts in ami/jobs/views.py and ami/jobs/tests.py:
- views.py: keep both dispatch_mode guard and service name logging
- tests.py: keep both test classes, fix service name test to set async dispatch_mode
carlosgjs and others added 8 commits February 17, 2026 10:30
* merge

* fix: Properly handle async job state with celery tasks

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* merge

* feat: PSv2 - Queue/redis clean-up upon job completion

* fix: catch specific exception

* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
fix: PSv2: Workers should not try to fetch tasks from v1 jobs (RolnickLab#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload.

This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):

internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
sync_api — worker calls an external processing service API synchronously and waits for each response.
async_api — worker publishes items to NATS for external processing service workers to pick up independently.
Database and Model Changes:

Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).
API and Filtering:

dispatch_mode is exposed (read-only) in job list and detail serializers.
Filterable via query parameter: ?dispatch_mode=async_api
The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.
Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
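As a sketch only, the enum and field described in this commit could be declared with standard Django TextChoices; the max_length and human-readable labels here are assumptions:

from django.db import models

class JobDispatchMode(models.TextChoices):
    # Work handled entirely within the platform (Celery worker, no external calls).
    INTERNAL = "internal", "Internal"
    # Worker calls an external processing service API synchronously.
    SYNC_API = "sync_api", "Synchronous API"
    # Worker publishes items to NATS for external workers to pick up independently.
    ASYNC_API = "async_api", "Asynchronous API"

class Job(models.Model):
    dispatch_mode = models.CharField(
        max_length=16,  # assumed
        choices=JobDispatchMode.choices,
        default=JobDispatchMode.INTERNAL,
    )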
PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`
which verifies ALL stages are done, making it work for any job type.

Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API`
which is immutable for the job's lifetime and more correct than re-reading
a mutable flag that could change between job creation and completion.

Co-Authored-By: Claude <noreply@anthropic.com>
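Reduced to a sketch, the guard change this commit describes is roughly the following; cleanup_job_queues is a hypothetical stand-in for the actual cleanup code:

def on_progress_update(job):
    # Before: stage == "results" plus a re-read of the mutable feature flag.
    # After: is_complete() covers all stages, and dispatch_mode is immutable
    # for the job's lifetime.
    if job.progress.is_complete() and job.dispatch_mode == JobDispatchMode.ASYNC_API:
        cleanup_job_queues(job)  # hypothetical helper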

* test: update cleanup tests for is_complete() and dispatch_mode checks

Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard.
Complete all stages (collect, process, results) in the completion test
since is_complete() correctly requires all stages to be done.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
* merge

* Tests for async result processing

* fix formatting

* CR feedback

* refactor: add public get_progress() to TaskStateManager

Add a read-only get_progress(stage) method that returns a progress
snapshot without acquiring a lock or mutating state. Use it in
test_tasks.py instead of calling the private _get_progress() directly.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: clarify Celery .delay() vs .apply_async() calling convention

Three reviewers were confused by how mock.call_args works here.
.delay(**kw) passes ((), kw) as two positional args to apply_async,
which is different from apply_async(kwargs=kw).

Co-Authored-By: Claude <noreply@anthropic.com>
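A small self-contained illustration of that convention, using a Mock in place of the Celery task:

from unittest import mock

task = mock.Mock()
kw = {"job_id": 60}

# Celery's .delay(**kw) is shorthand for .apply_async(args, kwargs): the args
# tuple and kwargs dict arrive as two positional arguments.
task.apply_async((), kw)
assert task.apply_async.call_args == mock.call((), kw)

# That is a different call shape from passing kwargs= explicitly.
task.reset_mock()
task.apply_async(kwargs=kw)
assert task.apply_async.call_args == mock.call(kwargs=kw)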

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
* merge

* Update ML job counts in async case

* Update date picker version and tweak layout logic (RolnickLab#1105)

* fix: update date picker version and tweak layout logic

* feat: set start month based on selected date

* fix: Properly handle async job state with celery tasks  (RolnickLab#1114)

* merge

* fix: Properly handle async job state with celery tasks

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Delete implemented plan

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* PSv2: Implement queue clean-up upon job completion (RolnickLab#1113)

* merge

* feat: PSv2 - Queue/redis clean-up upon job completion

* fix: catch specific exception

* chore: move tests to a subdir

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>

* fix: PSv2: Workers should not try to fetch tasks from v1 jobs (RolnickLab#1118)

Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. 


This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs.

JobDispatchMode enum (ami/jobs/models.py):

internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs.
sync_api — worker calls an external processing service API synchronously and waits for each response.
async_api — worker publishes items to NATS for external processing service workers to pick up independently.
Database and Model Changes:

Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py.
ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled.
ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset).
API and Filtering:

dispatch_mode is exposed (read-only) in job list and detail serializers.
Filterable via query parameter: ?dispatch_mode=async_api
The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch.
Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping.

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>

* PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (RolnickLab#1125)

* refactor: use is_complete() and dispatch_mode in job progress handler

Replace hardcoded `stage == "results"` check with `job.progress.is_complete()`
which verifies ALL stages are done, making it work for any job type.

Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API`
which is immutable for the job's lifetime and more correct than re-reading
a mutable flag that could change between job creation and completion.

Co-Authored-By: Claude <noreply@anthropic.com>

* test: update cleanup tests for is_complete() and dispatch_mode checks

Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard.
Complete all stages (collect, process, results) in the completion test
since is_complete() correctly requires all stages to be done.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

* track captures and failures

* Update tests, CR feedback, log error images

* CR feedback

* fix type checking

* refactor: rename _get_progress to _commit_update in TaskStateManager

Clarify naming to distinguish mutating vs read-only methods:
- _commit_update(): private, writes mutations to Redis, returns progress
- get_progress(): public, read-only snapshot (added in RolnickLab#1129)
- update_state(): public API, acquires lock, calls _commit_update()

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: unify FAILURE_THRESHOLD and convert TaskProgress to dataclass

- Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py
- Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match
  the sync path's boundary behavior at exactly 50%
- Convert TaskProgress from namedtuple to dataclass with defaults,
  so new fields don't break existing callers

Co-Authored-By: Claude <noreply@anthropic.com>
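Sketched with illustrative names (the real fields and the failure-ratio calculation are not shown on this page), the conversion and the boundary fix look roughly like:

from dataclasses import dataclass

FAILURE_THRESHOLD = 0.5  # single shared constant; value assumed from "exactly 50%"

@dataclass
class TaskProgress:  # later renamed JobStateProgress
    processed: int = 0
    failed: int = 0
    # New fields can be added with defaults without breaking existing callers,
    # unlike a namedtuple, where construction is positional.

def should_fail(progress: TaskProgress) -> bool:
    total = progress.processed + progress.failed
    # `>` rather than `>=`, so a job at exactly 50% failures is NOT failed,
    # matching the sync path's boundary behavior.
    return total > 0 and progress.failed / total > FAILURE_THRESHOLD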

* refactor: rename TaskProgress to JobStateProgress

Clarify that this dataclass tracks job-level progress in Redis,
not individual task/image progress. Aligns with the naming of
JobProgress (the Django/Pydantic model equivalent).

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: update NATS todo and planning docs with session learnings

Mark connection handling as done (PR RolnickLab#1130), add worktree/remote
mapping and docker testing notes for future sessions.

Co-Authored-By: Claude <noreply@anthropic.com>

* Rename TaskStateManager to AsyncJobStateManager

* Track results counts in the job itself vs Redis

* small simplification

* Reset counts to 0 on reset

* chore: remove local planning docs from PR branch

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: clarify three-layer job state architecture in docstrings

Explain the relationship between AsyncJobStateManager (Redis),
JobProgress (JSONB), and JobState (enum). Clarify that all counts
in JobStateProgress refer to source images (captures).

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Anna Viklund <annamariaviklund@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
…ines (RolnickLab#1076)

* RFC: V2 endpoint to register pipelines

* merge

* Allow null endpoint_url for processing services

* Add tests

* Add processing_service_name

* Tests for pipeline registration

* Add default (None) for endpoint_url

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Better assert

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* CR feedback

* Simplify registration payload

* CR feedback

* fix test

* chore: remove old comment

* chore: update display name

* feat: filter with db query instead of python, update docstring

* docs: add plan for migrating to an existing DRF pattern

* Refactor pipeline registration to nested DRF route (#10)

* feat: add ProjectNestedPermission for nested project routes

Reusable permission class for nested routes under /projects/{pk}/.
Allows read access to any user, write access to project owners,
superusers, and ProjectManagers. Designed for pipelines, tags,
taxa lists, and similar nested resources.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: move pipeline registration to nested DRF route

Replace the pipelines action on ProjectViewSet with a proper nested
ViewSet at /api/v2/projects/{project_pk}/pipelines/. Adds GET (list)
and POST (register) using standard DRF patterns with SchemaField for
pydantic validation, transaction.atomic() for DB ops, and idempotent
re-registration.

- Add PipelineRegistrationSerializer in ami/ml/serializers.py
- Add ProjectPipelineViewSet in ami/ml/views.py
- Register nested route in config/api_router.py
- Remove old pipelines action + unused imports from ProjectViewSet

Co-Authored-By: Claude <noreply@anthropic.com>

* test: update pipeline registration tests for nested route

- Payload uses flat {processing_service_name, pipelines} format
- Success returns 201 instead of 200
- Re-registration is now idempotent (no longer returns 400)
- Add test_list_pipelines for GET endpoint

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add guardian permissions for ProjectPipelineConfig

Add CREATE/UPDATE/DELETE_PROJECT_PIPELINE_CONFIG to Project.Permissions
and Meta.permissions. Assign create permission to MLDataManager and all
three to ProjectManager, enabling granular access control for pipeline
registration instead of the coarse ProjectManager.has_role() check.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: replace ProjectNestedPermission with ProjectPipelineConfigPermission

Replace the generic ProjectNestedPermission with
ProjectPipelineConfigPermission following the UserMembershipPermission
pattern. The new class extends ObjectPermission and creates a temporary
ProjectPipelineConfig instance to leverage BaseModel.check_permission(),
which handles draft project visibility and guardian permission checks.

Update ProjectPipelineViewSet to use ProjectMixin with
require_project=True instead of manual kwargs lookups.

Co-Authored-By: Claude <noreply@anthropic.com>

* test: add permission tests for project pipelines endpoint

Update setUp to use create_roles_for_project and guardian permissions
instead of is_staff=True. Add tests for draft project access (403 for
non-members), unauthenticated writes (401/403), and public project
reads (200 for non-members).

Co-Authored-By: Claude <noreply@anthropic.com>

* test: verify list pipelines response contains project pipelines

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

* chore: remove planning doc from PR branch

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/jobs/tests.py (1)

417-418: Redundant save after creation.

_create_ml_job now passes dispatch_mode directly to Job.objects.create, which persists it on INSERT. The subsequent job.save(update_fields=["dispatch_mode"]) issues an unnecessary UPDATE to the same value and can be removed.

♻️ Proposed fix
-        job = self._create_ml_job("Job for batch test", pipeline, dispatch_mode=JobDispatchMode.ASYNC_API)
-        job.save(update_fields=["dispatch_mode"])
+        job = self._create_ml_job("Job for batch test", pipeline, dispatch_mode=JobDispatchMode.ASYNC_API)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tests.py` around lines 417 - 418, The test creates a job with
dispatch_mode already persisted by _create_ml_job (which calls
Job.objects.create with dispatch_mode), so the subsequent
job.save(update_fields=["dispatch_mode"]) is redundant; remove the job.save(...)
call (the line containing job.save(update_fields=["dispatch_mode"])) so the test
relies on the initial INSERT and avoids an unnecessary UPDATE when using
JobDispatchMode in _create_ml_job.

Comment on lines +350 to +355

    processing_service_name = request.query_params.get("processing_service_name", None)

    if processing_service_name:
        logger.info(f"Jobs {context} by processing service: {processing_service_name}")
    else:
        logger.debug(f"Jobs {context} without processing service name")

⚠️ Potential issue | 🟡 Minor

Sanitize user-supplied processing_service_name before embedding in log messages.

processing_service_name is an unvalidated, user-controlled query parameter. Embedding it verbatim in log messages enables log injection: a compromised worker can supply a value containing \n followed by fabricated log records (e.g., "svc\nCRITICAL:ami.jobs:Fake security event"), which can poison structured log aggregators, SIEM pipelines, or flat log files. While callers must be authenticated, any valid processing service can exploit this.

Additionally, f-strings force eager string evaluation regardless of the effective log level; use %s lazy formatting instead.

🛡️ Proposed fix – sanitize input and use lazy log formatting
-    if processing_service_name:
-        logger.info(f"Jobs {context} by processing service: {processing_service_name}")
-    else:
-        logger.debug(f"Jobs {context} without processing service name")
+    if processing_service_name:
+        # Sanitize to prevent log injection via newlines / carriage returns
+        sanitized_name = processing_service_name.replace("\n", "\\n").replace("\r", "\\r")
+        logger.info("Jobs %s by processing service: %s", context, sanitized_name)
+    else:
+        logger.debug("Jobs %s without processing service name", context)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 350 - 355, The log embeds the untrusted query
param processing_service_name via f-strings (logger.info/logger.debug) which
allows log injection and forces eager formatting; instead sanitize
processing_service_name to remove control/newline characters (e.g., strip or
regex-remove \r, \n and other non-printables) and then pass it to the logger
using lazy formatting (logger.info("Jobs %s by processing service: %s", context,
safe_processing_service_name) and logger.debug("Jobs %s without processing
service name", context)) so replace the f-strings around processing_service_name
with sanitized value and percent-style/argument logging in the
processing_service_name handling code.

@mihow (Collaborator) left a comment

Review: processing_service_name approach

Thanks for adding service identification to the job endpoints — this is useful for debugging and auditing. I have some suggestions on how to make this fit better with existing patterns in the codebase.

Current pattern issues

1. Raw query_params.get() instead of validated field

The codebase uses Django form fields or SingleParamSerializer to clean query params:

  • IntegerField(required=True, min_value=1).clean(...) for batch (views.py:231)
  • CharField(required=False).clean(...) for q (main/api/views.py:1370)

But the new param reads it raw with no validation or length limit (views.py:350). An unbounded string goes directly into f-string log messages.
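For reference, the existing pattern applied to the new parameter would be a one-liner along these lines; the 200-character cap is an assumption, mirroring the sanitizer proposed below:

from django import forms

# Raises ValidationError on over-long input instead of passing it through to the logs.
processing_service_name = forms.CharField(required=False, max_length=200).clean(
    request.query_params.get("processing_service_name", "")
)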

2. Hand-crafted dict responses

The result endpoint returns ad-hoc dicts (views.py:304-324, 330-335) and the tasks endpoint declares responses={200: dict} — no useful OpenAPI schema for the ADC worker to consume.

3. Query param on a POST endpoint

On the result action, processing_service_name is a query parameter on a POST request. Metadata about the caller doesn't naturally belong in the query string of a POST.

Proposal: use a request header instead

The service name is metadata about the caller, not about the resource — exactly what HTTP headers are for. A header like X-Antenna-Processing-Service would:

  1. Work uniformly across GET and POST — no query-param-on-POST awkwardness
  2. Be set once on the HTTP client — the ADC worker sets it on the requests.Session and every request carries it automatically, no need to thread it through each API call
  3. Need no body schema changes for PipelineTaskResult or any existing schemas

Shared sanitizer in ami/utils/requests.py

import re

HEADER_NAME = "HTTP_X_ANTENNA_PROCESSING_SERVICE"
MAX_SERVICE_NAME_LENGTH = 200

def get_processing_service_name(request) -> str:
    """
    Extract and sanitize the processing service name from the request.

    Sources (in priority order):
    1. X-Antenna-Processing-Service header
    2. processing_service_name in request body (for registration endpoint)
    3. Fallback to caller IP
    """
    name = request.META.get(HEADER_NAME, "").strip()

    if not name and isinstance(request.data, dict):
        name = (request.data.get("processing_service_name") or "").strip()

    if not name:
        name = _get_client_ip(request)

    return _sanitize_service_name(name)


def _sanitize_service_name(name: str) -> str:
    """Strip control characters/newlines and truncate."""
    name = re.sub(r"[\x00-\x1f\x7f]", "", name)
    return name[:MAX_SERVICE_NAME_LENGTH]


def _get_client_ip(request) -> str:
    forwarded = request.META.get("HTTP_X_FORWARDED_FOR")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.META.get("REMOTE_ADDR", "unknown")

This always returns a usable string (never None). The registration endpoint keeps its body field for creating the ProcessingService model, but logging goes through the same sanitizer.

A ViewSet mixin for centralized logging

Instead of per-action _log_processing_service_name() calls, a mixin on JobViewSet:

class ProcessingServiceMixin:
    """Mixin that extracts and logs the calling processing service."""

    def initial(self, request, *args, **kwargs):
        super().initial(request, *args, **kwargs)
        request.processing_service_name = get_processing_service_name(request)

    def finalize_response(self, request, response, *args, **kwargs):
        response = super().finalize_response(request, response, *args, **kwargs)
        name = getattr(request, "processing_service_name", None)
        if name:
            logger.info(f"{request.method} {request.path} by service: {name}")
        return response

Then JobViewSet(DefaultViewSet, ProjectMixin, ProcessingServiceMixin) — zero per-action boilerplate.

OpenAPI via header parameter

processing_service_header = OpenApiParameter(
    name="X-Antenna-Processing-Service",
    location=OpenApiParameter.HEADER,
    description="Name of the calling processing service. Falls back to caller IP.",
    required=False,
    type=str,
)

ADC worker side

Instead of threading processing_service_name through each API call:

session.headers["X-Antenna-Processing-Service"] = settings.ANTENNA_SERVICE_NAME

Summary

| Current | Proposed |
| --- | --- |
| Query param OpenApiParameter | Header OpenApiParameter |
| _log_processing_service_name() called 3× | ProcessingServiceMixin logs once per request |
| Raw query_params.get(), no validation | get_processing_service_name() with sanitization + IP fallback |
| Missing name → debug noise | Missing name → uses caller IP, always has a value |
| Registration body field standalone | Registration keeps body field, logging shares same sanitizer |


Development

Successfully merging this pull request may close these issues:

  • feat: PSv2 - Have visibility into which workers are subscribed to a job