
chore: CI + coverage + type safety + security baseline#110

Draft
MTG-Thomas wants to merge 21 commits into main from chore/python-professionalization-ci-coverage

Conversation

@MTG-Thomas
Owner

@MTG-Thomas MTG-Thomas commented Apr 19, 2026

Summary

Adds a production-grade engineering baseline:

  • Ruff linting + formatting
  • Pre-commit hooks
  • Coverage reporting (Codecov)
  • Mypy (incremental type checking)
  • Dependency vulnerability scanning (pip-audit)
  • Security policy

Why

This elevates the repository from a functional system to a maintainable, production-grade platform with:

  • Enforced code quality
  • Early detection of defects
  • Supply chain awareness
  • Clear security posture

Notes

  • Mypy is configured non-strict to avoid breaking existing code
  • Quality checks run in parallel via separate workflow
  • No changes to runtime behavior

Follow-ups

  • Gradually tighten mypy rules
  • Add branch protection rules
  • Consider SBOM generation (cyclonedx)
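
To illustrate the "gradually tighten mypy rules" follow-up, a non-strict baseline in pyproject.toml might look like the sketch below (the keys shown are common mypy options for illustration, not necessarily this PR's exact configuration):

```toml
[tool.mypy]
python_version = "3.11"
# Non-strict baseline: surface issues without blocking existing code
ignore_missing_imports = true
check_untyped_defs = false
warn_unused_ignores = true

# Tighten incrementally, one package at a time:
# [[tool.mypy.overrides]]
# module = "api.src.observability.*"
# disallow_untyped_defs = true
```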

Summary by CodeRabbit

  • New Features

    • Added OpenTelemetry tracing and observability capabilities for monitoring workflow execution and API performance.
  • Chores

    • Configured automated dependency updates via Dependabot (weekly checks for pip, GitHub Actions, and Docker).
    • Added GitHub Actions workflows for code quality assurance (linting, type checking, and dependency auditing).
    • Added pre-commit hooks for automatic code formatting and linting.
    • Added project configuration for development tooling and testing.
  • Documentation

    • Added security vulnerability reporting policy.

@coderabbitai

coderabbitai Bot commented Apr 19, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: 1a137e64-688d-4d68-a4f6-ffebaacee8cd

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough


This pull request introduces OpenTelemetry-based distributed tracing infrastructure across the project. It adds CI/CD workflows for quality checks, SBOM generation, and dependency management; establishes observability configuration; integrates tracing into the API and worker applications; and instruments the workflow execution consumer with span tracking for execution lifecycle events.

Changes

Cohort / File(s) Summary
GitHub Actions & CI/CD
.github/dependabot.yml, .github/workflows/quality.yml, .github/workflows/sbom.yml
Adds automated dependency updates via Dependabot, quality checks (linting, type checking, dependency audits) on pull requests/main pushes, and SBOM generation with artifact uploads on main branch.
Project Tooling Configuration
.pre-commit-config.yaml, pyproject.toml
Configures pre-commit hooks for ruff linting/formatting and adds ruff, pytest, and mypy configurations in pyproject.toml with Python 3.11 targets and test discovery settings.
Security & Documentation
SECURITY.md
Establishes vulnerability reporting policy directing users to use GitHub security advisories and defining scope/response expectations.
Observability Module
api/src/observability/otel.py
New OpenTelemetry module providing tracing enablement gate, service configuration with resource setup, exporters (OTLP or console-based), and FastAPI instrumentation.
Workflow Execution Tracing
api/src/jobs/consumers/workflow_execution_traced.py
New consumer subclass wrapping workflow execution message processing, route execution, and success/failure persistence in OpenTelemetry spans with contextual attributes.
Application Integration
api/src/main.py, api/src/worker/main.py
Integrates tracing configuration and FastAPI instrumentation in the API; switches worker to traced consumer and wraps startup/shutdown lifecycle with span tracking.
Dependencies
requirements.txt
Adds OpenTelemetry packages: api, sdk, OTLP exporter, and FastAPI instrumentation.

Sequence Diagram

sequenceDiagram
    participant Client as Client/Request
    participant FastAPI as FastAPI App
    participant OTel as OpenTelemetry<br/>Tracer
    participant RabbitMQ as RabbitMQ<br/>Broker
    participant Consumer as Traced<br/>Execution Consumer
    participant Worker as Process<br/>Pool
    participant Exporter as Span<br/>Exporter

    Client->>FastAPI: HTTP Request
    FastAPI->>OTel: Start HTTP span
    FastAPI->>RabbitMQ: Publish execution message

    RabbitMQ->>Consumer: Message delivery
    Consumer->>OTel: Start workflow.execution span
    Consumer->>Consumer: Read execution metadata
    
    Consumer->>Worker: Route execution (traced)
    Worker->>OTel: Start workflow.execute.process_pool span
    Worker->>Worker: Process execution
    Worker->>OTel: End process_pool span
    
    Consumer->>OTel: _process_success/failure span
    Consumer->>OTel: Record duration & status
    Consumer->>OTel: End execution span
    
    OTel->>Exporter: Batch spans
    Exporter->>Exporter: Export traces (OTLP/Console)

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~50 minutes


🐰 Traces now dance through our code with care,
Each span a breadcrumb floating through the air,
From API calls to workers' busy hops,
OpenTelemetry never stops,
Observability springs to life so bright! ✨🔍

🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 40.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2)

  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the title clearly and concisely summarizes the main changes: adding CI, coverage, type safety, and security infrastructure as a baseline for the project.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch chore/python-professionalization-ci-coverage

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment threads on .github/workflows/quality.yml — 4 threads, all Fixed
Owner Author

@coderabbitai review

Focus especially on:

  • OpenTelemetry instrumentation approach
  • consumer wrapper vs direct modification tradeoffs
  • any concurrency or async safety issues

@coderabbitai

coderabbitai Bot commented Apr 19, 2026

@MTG-Thomas: Kicking off a full review with your specified focus areas:

  • OpenTelemetry instrumentation approach — evaluating how OTel is wired in, span lifecycle management, attribute conventions, and propagation correctness.
  • Consumer wrapper vs. direct modification tradeoffs — assessing the architectural decision of wrapping consumers rather than modifying them directly (encapsulation, maintainability, testability, and overhead).
  • Concurrency / async safety — looking for race conditions, improper shared-state access, missing await, unhandled coroutine lifecycles, and thread-safety concerns.

✅ Actions performed

Full review triggered.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 15

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/quality.yml:
- Around line 16-18: The lint job currently installs ruff each run using the
step that runs "pip install ruff" and "ruff check ."; update the workflow to add
a pip cache step before that install by using actions/cache to cache pip's cache
dir (e.g., ~/.cache/pip) with a key that includes runner.os and cache-version or
python-version, and restore-keys for fallback; apply the same caching pattern to
the typecheck and dependency-audit jobs so their pip installs reuse the cache
and speed up CI.

In @.github/workflows/sbom.yml:
- Around line 29-31: The GitHub Actions step named "Generate Python SBOM" is
using an invalid cyclonedx-py CLI flag (--format) which breaks on cyclonedx-py
v4+; update the command invoked in that step (the line starting with
"cyclonedx-py requirements -i requirements.txt -o sbom-python.json") to use the
supported flag --output-format instead of --format so the workflow executes
successfully.

In `@api/src/jobs/consumers/workflow_execution_traced.py`:
- Line 27: process_message is calling
self._redis_client.get_pending_execution(execution_id) multiple times (also
invoked in _process_success and _process_failure and by the parent), so cache
the pending execution result once and reuse it: retrieve pending = await
self._redis_client.get_pending_execution(execution_id) once in process_message
(or earlier in the parent), store it on the instance (e.g.,
self._pending_execution) or pass it as an argument down to _process_success and
_process_failure, and update callers to use that cached value instead of calling
get_pending_execution again; ensure any code paths that mutate or need a fresh
read explicitly refresh the cache by re-calling get_pending_execution via a
single helper method.
- Around line 50-72: The current monkey-patching of self._pool.route_execution
by replacing it with traced_route_execution inside process_message is unsafe for
concurrent message processing; instead remove the dynamic assignment and
propagate the tracing span via a thread-safe mechanism (e.g., set a ContextVar
like _current_execution_span before awaiting super().process_message or pass the
span through the context dict) and update the route_execution implementation to
read that ContextVar or context["trace_span"] rather than relying on
monkey-patching; alternatively, if you guarantee no concurrent processing per
consumer, add a clear comment on process_message documenting that
single-concurrency invariant.
- Around line 43-48: The parsed created_at (from
datetime.fromisoformat(created_at_raw)) may be naive and is being subtracted
from datetime.now(timezone.utc); update the block handling created_at to ensure
created_at is normalized to UTC before computing queue_latency_ms: after parsing
created_at_raw, if created_at.tzinfo is None set its timezone to timezone.utc
(or call created_at = created_at.replace(tzinfo=timezone.utc)), otherwise call
created_at = created_at.astimezone(timezone.utc), then compute now =
datetime.now(timezone.utc) and queue_latency_ms = max(0, int((now -
created_at).total_seconds() * 1000)) and keep
span.set_attribute("queue.latency_ms", queue_latency_ms); preserve the existing
ValueError except handling.

In `@api/src/observability/otel.py`:
- Around line 51-52: The get_tracer function lacks a return type annotation;
update its signature (get_tracer) to declare and return the correct tracer type
from OpenTelemetry (e.g., add -> trace.Tracer) so it reads as returning
trace.Tracer and keep the body returning trace.get_tracer(name); this improves
type safety and IDE support.
- Line 2: The import Optional from typing in otel.py is unused; remove Optional
from the import statement (i.e., change or delete the line "from typing import
Optional") so the file no longer imports an unused symbol and linter warnings
are resolved; ensure any other typing imports remain intact if present.
- Around line 22-26: The Resource.create call currently hardcodes
"service.version": "2.0.0" which can drift; replace the literal with a dynamic
version lookup (e.g., read from package metadata via
importlib.metadata.version(<package-name>) or import a central VERSION constant)
and use that value in the Resource.create dictionary for the "service.version"
key, with a safe fallback like "unknown" if the lookup fails; update the
Resource.create invocation (and any imports at top of the module) so
service.version reflects the real package version instead of the fixed string.
- Around line 32-35: The variable exporter is inferred as OTLPSpanExporter when
assigned conditionally which causes a mypy type error when assigning
ConsoleSpanExporter; update the exporter declaration to use the common base type
SpanExporter (or Annotate as Union[OTLPSpanExporter, ConsoleSpanExporter]) so
both assignments are type-compatible, and ensure you import SpanExporter from
opentelemetry.sdk.trace.export if not already present; change usages that rely
on a more specific type to use the base-class API.
- Around line 37-41: The BatchSpanProcessor instance (processor) is never shut
down, risking lost buffered spans on process exit; add a graceful shutdown that
calls processor.shutdown() (and provider.shutdown() if applicable) and removes
the service from _initialized_services—either by exporting a shutdown function
(e.g., shutdown_otel or stop_tracing) that callers can invoke, or by registering
an atexit handler / signal handler in this module that calls
processor.shutdown() and provider.shutdown() for the given service_name to
ensure buffered spans are flushed before exit.

In `@api/src/worker/main.py`:
- Line 35: The tracer is created at module import with get_tracer(__name__)
before configure_tracing() is called in Worker.start(), which relies on OTel's
proxy behavior; either defer tracer acquisition by moving the get_tracer call
into configure_tracing() or Worker.start(), or keep it at module level but add a
concise comment near get_tracer(__name__) explaining that OpenTelemetry returns
a proxy and spans will use the provider set by configure_tracing(); update
references to tracer accordingly (e.g., trace.get_tracer/__name__ and
configure_tracing()/Worker.start()) so maintainers understand the order.
- Around line 117-119: Mypy can't infer the lambda parameter type for the signal
handler; replace the inline lambda with a typed factory function (e.g. define
make_handler(s: int) -> Callable[[], None] that returns a zero-arg callable
which calls worker.handle_signal(s, None)) and then call
loop.add_signal_handler(sig, make_handler(sig)) for each sig so the handler
parameter s is explicitly typed and mypy can infer types.
- Around line 62-65: The except block calls a missing helper
_cleanup_after_failed_start which raises AttributeError at runtime; replace that
call with the appropriate teardown (either reintroduce and implement async def
_cleanup_after_failed_start(self): ... to perform the original partial-start
cleanup, or remove the call and invoke the existing stop logic by awaiting
self.stop() or inlining the necessary cleanup steps) so the except path performs
a proper async cleanup without referencing the nonexistent
_cleanup_after_failed_start method.

In `@pyproject.toml`:
- Around line 33-37: The pytest ini option currently sets coverage to
"--cov=api" which will include test files under the api tree; update the addopts
entry under [tool.pytest.ini_options] to target only your source package/module
instead of the whole api directory (replace "--cov=api" with
"--cov=<your_source_package>" or "--cov=api.<module_name>" or point to the
specific source subdirectory), or alternatively add an explicit coverage config
to omit tests; make the change in the addopts value so tests under testpaths are
excluded from coverage calculations.

In `@SECURITY.md`:
- Around line 7-8: Replace the vague fallback "Or email the maintainer directly
if available" in the SECURITY.md security contact section by providing a
concrete security contact (e.g., security@example.com) or a clear pointer to
where maintainer contact info lives (e.g., "See CONTRIBUTORS.md or repository
settings for maintainer emails" or a link to a dedicated security page); update
the text near the "Open a GitHub security advisory (preferred)" line so it
explicitly lists the security email address or a URL to the contact location for
security reports.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: eae2cc39-2205-408d-8449-6d0f5669f185

📥 Commits

Reviewing files that changed from the base of the PR and between 298c8fb and d7863a1.

📒 Files selected for processing (12)
  • .github/dependabot.yml
  • .github/workflows/quality.yml
  • .github/workflows/sbom.yml
  • .pre-commit-config.yaml
  • SECURITY.md
  • api/src/jobs/consumers/workflow_execution_traced.py
  • api/src/main.py
  • api/src/observability/__init__.py
  • api/src/observability/otel.py
  • api/src/worker/main.py
  • pyproject.toml
  • requirements.txt

Comment on lines +16 to +18
      - run: |
          pip install ruff
          ruff check .


🧹 Nitpick | 🔵 Trivial

Consider caching pip dependencies.

Adding pip caching reduces CI time significantly for repeated runs.

⚡ Proposed improvement for lint job
       - uses: actions/setup-python@v5
         with:
           python-version: "3.11"
+          cache: 'pip'
       - run: |
           pip install ruff
           ruff check .

Apply similar caching to typecheck and dependency-audit jobs.


Comment thread .github/workflows/sbom.yml Outdated
Comment on lines +29 to +31
      - name: Generate Python SBOM
        run: |
          cyclonedx-py requirements -i requirements.txt -o sbom-python.json --format json


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

cyclonedx-py requirements command syntax output format

💡 Result:

The cyclonedx-py tool (CycloneDX SBOM generator for Python projects) uses a subcommand requirements to build an SBOM from Pip requirements files (e.g., requirements.txt). This requires pinned versions (use pip freeze > requirements.txt first). Command syntax (latest v7.3.0): cyclonedx-py requirements [REQUIREMENTS_FILE] - REQUIREMENTS_FILE: Path to requirements file (default: requirements.txt). Use - for STDIN. Key options (mimic pip CLI): - -i INDEX_URL, --index-url INDEX_URL: Base PyPI URL (default: https://pypi.org/simple). - --extra-index-url EXTRA_URLS: Extra index URLs. - --pyproject PYPROJECT_FILE: Path to pyproject.toml. - --mc-type {application,firmware,library}: Metadata component type (default: application). - -o OUTPUT_FILE, --output OUTPUT_FILE: Output file (- for STDOUT, default: -). - --sv SCHEMA_VERSION, --schema-version SCHEMA_VERSION: CycloneDX schema version. - --of OUTPUT_FORMAT, --output-format {json,xml}: Output format (default: xml). - --short-PURLs: Omit qualifiers from PackageURLs. - --output-reproducible: Make output reproducible. - --validate / --no-validate: Validate BOM (default: True). - -v, --verbose: Increase verbosity. Example usage: cyclonedx-py requirements requirements-prod.txt -o sbom.json --output-format json cat requirements/*.txt | cyclonedx-py requirements - -o sbom.xml Note: Pre-v4 versions used flat flags like cyclonedx-py -r [-i FILE] [--format {json,xml}] [--schema-version {1.4,1.3,...}] -o FILE. Use subcommands in current versions. Requirements parser supports nested -r includes and warns/excludes unpinned versions.

Fix cyclonedx-py CLI syntax causing pipeline failure.

The pipeline fails because --format is not a recognized argument in cyclonedx-py v4+. Use --output-format instead.

🐛 Proposed fix
       - name: Generate Python SBOM
         run: |
-          cyclonedx-py requirements -i requirements.txt -o sbom-python.json --format json
+          cyclonedx-py requirements -i requirements.txt -o sbom-python.json --output-format json

is_sync = bool(message_data.get("sync", False))
is_script = bool(message_data.get("code"))

pending = await self._redis_client.get_pending_execution(execution_id)


🧹 Nitpick | 🔵 Trivial

Redundant Redis calls for pending execution data.

get_pending_execution(execution_id) is called at:

  • Line 27 in process_message
  • Line 75 in _process_success
  • Line 89 in _process_failure

The parent's process_message (see context snippet line 430) also calls it. Consider caching the result as an instance variable or passing it through the call chain to avoid multiple Redis round-trips per message.

Also applies to: 75-75, 89-89
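
A minimal sketch of the fetch-once-and-pass-down pattern the comment suggests; FakeRedisClient and the method bodies below are stand-ins for illustration, not the PR's actual classes:

```python
import asyncio


class FakeRedisClient:
    """Stand-in for the real Redis wrapper; counts round-trips."""

    def __init__(self) -> None:
        self.calls = 0

    async def get_pending_execution(self, execution_id: str) -> dict:
        self.calls += 1
        return {"execution_id": execution_id, "workflow_id": "wf-1"}


class TracedConsumer:
    def __init__(self, redis_client: FakeRedisClient) -> None:
        self._redis_client = redis_client

    async def process_message(self, execution_id: str) -> None:
        # Fetch once, then thread the result through the call chain
        pending = await self._redis_client.get_pending_execution(execution_id)
        await self._process_success(execution_id, pending)
        await self._process_failure(execution_id, pending)

    async def _process_success(self, execution_id: str, pending: dict) -> None:
        assert pending["execution_id"] == execution_id  # cached value reused

    async def _process_failure(self, execution_id: str, pending: dict) -> None:
        assert pending["execution_id"] == execution_id  # no extra round-trip


client = FakeRedisClient()
asyncio.run(TracedConsumer(client).process_message("exec-1"))
print(client.calls)  # 1 round-trip instead of 3
```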


Comment on lines +43 to +48
        created_at = datetime.fromisoformat(created_at_raw)
        now = datetime.now(timezone.utc)
        queue_latency_ms = max(0, int((now - created_at).total_seconds() * 1000))
        span.set_attribute("queue.latency_ms", queue_latency_ms)
    except ValueError:
        pass


⚠️ Potential issue | 🟡 Minor

Queue latency calculation assumes UTC but doesn't enforce it.

If created_at from Redis lacks timezone info (naive datetime), subtracting it from datetime.now(timezone.utc) may produce incorrect results. Consider normalizing the parsed datetime to UTC.

🔧 Proposed defensive fix
                     try:
                         created_at = datetime.fromisoformat(created_at_raw)
+                        if created_at.tzinfo is None:
+                            created_at = created_at.replace(tzinfo=timezone.utc)
                         now = datetime.now(timezone.utc)
                         queue_latency_ms = max(0, int((now - created_at).total_seconds() * 1000))
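
The normalization can be checked in isolation with the standard library (the timestamps below are illustrative):

```python
from datetime import datetime, timezone


def queue_latency_ms(created_at_raw: str, now: datetime) -> int:
    created_at = datetime.fromisoformat(created_at_raw)
    if created_at.tzinfo is None:
        # Assumption: naive timestamps were recorded in UTC
        created_at = created_at.replace(tzinfo=timezone.utc)
    else:
        created_at = created_at.astimezone(timezone.utc)
    return max(0, int((now - created_at).total_seconds() * 1000))


now = datetime(2026, 4, 19, 12, 0, 1, tzinfo=timezone.utc)
print(queue_latency_ms("2026-04-19T12:00:00", now))        # naive input → 1000
print(queue_latency_ms("2026-04-19T14:00:00+02:00", now))  # aware input → 1000
```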

Comment on lines +50 to +72
original_route_execution: Callable[..., Awaitable[None]] = self._pool.route_execution

async def traced_route_execution(*args: Any, **kwargs: Any) -> None:
    routed_execution_id = kwargs.get("execution_id") or (args[0] if args else execution_id)
    routed_workflow_id = None
    context = kwargs.get("context") or (args[1] if len(args) > 1 else None)
    if isinstance(context, dict):
        routed_workflow_id = context.get("workflow_id")
    with tracer.start_as_current_span("workflow.execute.process_pool") as pool_span:
        pool_span.set_attribute("execution.id", routed_execution_id)
        if routed_workflow_id:
            pool_span.set_attribute("workflow.id", routed_workflow_id)
        await original_route_execution(*args, **kwargs)

self._pool.route_execution = traced_route_execution
try:
    await super().process_message(message_data)
except Exception as exc:
    span.set_attribute("error", True)
    span.set_attribute("execution.error_type", type(exc).__name__)
    raise
finally:
    self._pool.route_execution = original_route_execution


⚠️ Potential issue | 🟠 Major

Monkey-patching route_execution is not safe for concurrent message processing.

If multiple messages are processed concurrently (overlapping async tasks on the same consumer instance), one task's finally block could restore original_route_execution while another task's traced wrapper is still in use, causing the second task's spans to be lost or the first task to use an unexpected function.

Consider a thread-safe approach such as passing a tracing context through the call chain or using context variables.

💡 Alternative: Use contextvars for thread-safe tracing context
from contextvars import ContextVar

_current_execution_span: ContextVar[Any] = ContextVar("current_execution_span", default=None)

# In process_message, set the context var instead of monkey-patching:
# _current_execution_span.set(span)
# Then in route_execution (if you control it), read from context var

Alternatively, if concurrent processing is not expected on a single consumer instance, add a comment documenting this assumption.


Comment thread api/src/worker/main.py
logging.getLogger("src.jobs.consumers.workflow_execution").setLevel(logging.DEBUG)

logger = logging.getLogger(__name__)
tracer = get_tracer(__name__)


🧹 Nitpick | 🔵 Trivial

Tracer initialization order appears problematic but works due to OTel's proxy pattern.

get_tracer(__name__) at line 35 executes at module import time, before configure_tracing() is called at line 52 during Worker.start(). OpenTelemetry's trace.get_tracer() returns a proxy that delegates to the globally registered TracerProvider at span-creation time, so spans created after configure_tracing() will use the configured provider.

However, this relies on implicit behavior and could confuse maintainers. Consider adding a clarifying comment or deferring tracer acquisition.

💡 Optional: Add clarifying comment
 logger = logging.getLogger(__name__)
+# Note: get_tracer() returns a proxy; spans will use the provider configured
+# later via configure_tracing() in Worker.start()
 tracer = get_tracer(__name__)

Also applies to: 51-52

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/src/worker/main.py` at line 35, The tracer is created at module import
with get_tracer(__name__) before configure_tracing() is called in
Worker.start(), which relies on OTel's proxy behavior; either defer tracer
acquisition by moving the get_tracer call into configure_tracing() or
Worker.start(), or keep it at module level but add a concise comment near
get_tracer(__name__) explaining that OpenTelemetry returns a proxy and spans
will use the provider set by configure_tracing(); update references to tracer
accordingly (e.g., trace.get_tracer/__name__ and
configure_tracing()/Worker.start()) so maintainers understand the order.
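
The import-time proxy behavior described above can be sketched in plain stdlib Python (all class names here — `ProxyTracer`, `Provider`, and friends — are illustrative stand-ins, not OpenTelemetry's real API):

```python
# Minimal sketch of the lazy-proxy pattern: the object handed out at
# import time delegates to whatever provider is registered later.

class RealTracer:
    def start_span(self, name: str) -> str:
        return f"span:{name}"

class Provider:
    def get_tracer(self) -> RealTracer:
        return RealTracer()

class NoOpProvider:
    def get_tracer(self) -> RealTracer:
        raise RuntimeError("tracing not configured yet")

class ProxyTracer:
    """Resolves the concrete tracer at span-creation time, not at import time."""
    def start_span(self, name: str) -> str:
        return _provider.get_tracer().start_span(name)

_provider = NoOpProvider()

# "Module import time": the tracer is acquired before configuration...
tracer = ProxyTracer()

# "Worker.start()": the real provider is registered afterwards...
_provider = Provider()

# ...yet span creation still works, because resolution is deferred.
print(tracer.start_span("process_message"))  # → span:process_message
```

This is exactly why the module-level `get_tracer(__name__)` works despite running before `configure_tracing()` — and why a comment documenting the ordering is worth having.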

Comment thread api/src/worker/main.py
Comment on lines +62 to +65
except Exception:
logger.error("Startup failed; tearing down partially-started worker")
await self._cleanup_after_failed_start()
raise

⚠️ Potential issue | 🔴 Critical

Missing method _cleanup_after_failed_start causes runtime AttributeError.

The pipeline failure confirms this: "Worker" has no attribute "_cleanup_after_failed_start". The AI summary indicates this helper was removed, but the call remains. This will crash the worker if startup fails.

Either restore the cleanup method or replace it with appropriate cleanup logic (e.g., call self.stop() or inline the necessary teardown).

🐛 Proposed fix
             except Exception:
                 logger.error("Startup failed; tearing down partially-started worker")
-                await self._cleanup_after_failed_start()
+                await self.stop()
                 raise
🧰 Tools
🪛 GitHub Actions: Quality

[error] 64-64: mypy error: "Worker" has no attribute "_cleanup_after_failed_start". (attr-defined) await self._cleanup_after_failed_start()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/src/worker/main.py` around lines 62 - 65, The except block calls a
missing helper _cleanup_after_failed_start which raises AttributeError at
runtime; replace that call with the appropriate teardown (either reintroduce and
implement async def _cleanup_after_failed_start(self): ... to perform the
original partial-start cleanup, or remove the call and invoke the existing stop
logic by awaiting self.stop() or inlining the necessary cleanup steps) so the
except path performs a proper async cleanup without referencing the nonexistent
_cleanup_after_failed_start method.
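
If the fix routes the failure path through `stop()`, the teardown must tolerate resources that never came up. A stdlib-only sketch of that invariant (`FakeResource` and the `broker`/`db` attributes are hypothetical stand-ins for whatever connections the real Worker opens):

```python
import asyncio

class FakeResource:
    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    async def close(self) -> None:
        self.closed = True

class Worker:
    def __init__(self) -> None:
        self.broker: FakeResource | None = None
        self.db: FakeResource | None = None

    async def start(self) -> None:
        try:
            self.broker = FakeResource("broker")
            raise RuntimeError("db connection failed")  # simulate a partial start
        except Exception:
            # Reuse stop() for teardown; safe because it only closes
            # resources that actually came up.
            await self.stop()
            raise

    async def stop(self) -> None:
        # Idempotent: each resource is checked and cleared, so stop() can
        # run after a failed start() or be called twice without error.
        for attr in ("broker", "db"):
            res = getattr(self, attr)
            if res is not None:
                await res.close()
                setattr(self, attr, None)

async def main() -> None:
    w = Worker()
    try:
        await w.start()
    except RuntimeError:
        pass
    print(w.broker is None and w.db is None)  # → True

asyncio.run(main())
```

The guard-and-clear pattern is what makes `await self.stop()` a drop-in replacement for the missing helper.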

Comment thread api/src/worker/main.py
Comment on lines 117 to 119
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda s=sig: worker.handle_signal(s, None))

⚠️ Potential issue | 🟡 Minor

Add type annotation to fix mypy lambda inference error.

The pipeline reports: Cannot infer type of lambda. Provide an explicit type for the lambda parameter.

🔧 Proposed fix
     for sig in (signal.SIGINT, signal.SIGTERM):
-        loop.add_signal_handler(sig, lambda s=sig: worker.handle_signal(s, None))
+        loop.add_signal_handler(sig, functools.partial(worker.handle_signal, sig, None))

(This requires `import functools`; Python lambdas cannot carry parameter annotations, so a fully typed callable sidesteps the inference error.) Alternatively, extract to a named function:

def make_handler(s: int) -> Callable[[], None]:
    return lambda: worker.handle_signal(s, None)

for sig in (signal.SIGINT, signal.SIGTERM):
    loop.add_signal_handler(sig, make_handler(sig))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
    loop.add_signal_handler(sig, lambda s=sig: worker.handle_signal(s, None))

def make_handler(s: int) -> Callable[[], None]:
    return lambda: worker.handle_signal(s, None)

loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
    loop.add_signal_handler(sig, make_handler(sig))
🧰 Tools
🪛 GitHub Actions: Quality

[error] 119-119: mypy error: Cannot infer type of lambda. (misc) loop.add_signal_handler(sig, lambda s=sig: ...)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/src/worker/main.py` around lines 117 - 119, Mypy can't infer the lambda
parameter type for the signal handler; replace the inline lambda with a typed
factory function (e.g. define make_handler(s: int) -> Callable[[], None] that
returns a zero-arg callable which calls worker.handle_signal(s, None)) and then
call loop.add_signal_handler(sig, make_handler(sig)) for each sig so the handler
parameter s is explicitly typed and mypy can infer types.
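
Besides the named-function route, `functools.partial` also produces a fully typed callable that mypy accepts. A runnable sketch of the pattern (POSIX-only, like `loop.add_signal_handler` itself; it uses SIGUSR1 so the demo does not interrupt itself, and `handle_signal` is a stand-in for the worker's method):

```python
import asyncio
import functools
import os
import signal

received: list[int] = []

def handle_signal(sig: int, frame: object) -> None:
    # Stand-in for worker.handle_signal; records which signal arrived.
    received.append(sig)

async def main() -> None:
    loop = asyncio.get_running_loop()
    # functools.partial binds fully typed arguments up front, so mypy never
    # has to infer a default-argument lambda's parameter type.
    loop.add_signal_handler(
        signal.SIGUSR1, functools.partial(handle_signal, signal.SIGUSR1, None)
    )
    os.kill(os.getpid(), signal.SIGUSR1)  # deliver the signal to ourselves
    await asyncio.sleep(0.05)  # let the event loop dispatch the handler

asyncio.run(main())
print(received == [signal.SIGUSR1])  # → True
```

Either approach removes the `Cannot infer type of lambda` error without changing runtime behavior.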

Comment thread pyproject.toml
Comment on lines +33 to +37
[tool.pytest.ini_options]
addopts = "--cov=api --cov-report=term-missing --cov-report=xml --cov-report=html"
testpaths = ["api/tests"]
pythonpath = ["api"]
asyncio_mode = "auto"

🧹 Nitpick | 🔵 Trivial

Coverage target may be too broad.

--cov=api includes test code in coverage metrics. Consider targeting only source code:

♻️ Proposed fix
 [tool.pytest.ini_options]
-addopts = "--cov=api --cov-report=term-missing --cov-report=xml --cov-report=html"
+addopts = "--cov=api/src --cov-report=term-missing --cov-report=xml --cov-report=html"
 testpaths = ["api/tests"]
 pythonpath = ["api"]
 asyncio_mode = "auto"
📝 Committable suggestion


Suggested change
[tool.pytest.ini_options]
addopts = "--cov=api --cov-report=term-missing --cov-report=xml --cov-report=html"
testpaths = ["api/tests"]
pythonpath = ["api"]
asyncio_mode = "auto"

[tool.pytest.ini_options]
addopts = "--cov=api/src --cov-report=term-missing --cov-report=xml --cov-report=html"
testpaths = ["api/tests"]
pythonpath = ["api"]
asyncio_mode = "auto"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pyproject.toml` around lines 33 - 37, The pytest ini option currently sets
coverage to "--cov=api" which will include test files under the api tree; update
the addopts entry under [tool.pytest.ini_options] to target only your source
package/module instead of the whole api directory (replace "--cov=api" with
"--cov=<your_source_package>" or "--cov=api.<module_name>" or point to the
specific source subdirectory), or alternatively add an explicit coverage config
to omit tests; make the change in the addopts value so tests under testpaths are
excluded from coverage calculations.
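
An equivalent approach that keeps `--cov=api` intact is a coverage.py omit list in the same pyproject.toml (paths shown are illustrative and assume the layout in this PR):

```toml
[tool.coverage.run]
omit = [
    "api/tests/*",
    "*/conftest.py",
]
```

coverage.py reads `[tool.coverage.*]` sections from pyproject.toml, so no separate .coveragerc file is needed.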

Comment thread SECURITY.md
Comment on lines +7 to +8
- Open a GitHub security advisory (preferred)
- Or email the maintainer directly if available

🧹 Nitpick | 🔵 Trivial

Consider providing a concrete contact email.

The fallback option "email the maintainer directly if available" lacks actionable information. Consider adding a security-specific email address (e.g., security@example.com) or referencing where to find maintainer contact info.

📝 Suggested improvement
 - Open a GitHub security advisory (preferred)
-- Or email the maintainer directly if available
+- Or email security@yourproject.example (if GitHub advisories are unavailable)
📝 Committable suggestion


Suggested change
- Open a GitHub security advisory (preferred)
- Or email the maintainer directly if available

- Open a GitHub security advisory (preferred)
- Or email security@yourproject.example (if GitHub advisories are unavailable)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@SECURITY.md` around lines 7 - 8, Replace the vague fallback "Or email the
maintainer directly if available" in the SECURITY.md security contact section by
providing a concrete security contact (e.g., security@example.com) or a clear
pointer to where maintainer contact info lives (e.g., "See CONTRIBUTORS.md or
repository settings for maintainer emails" or a link to a dedicated security
page); update the text near the "Open a GitHub security advisory (preferred)"
line so it explicitly lists the security email address or a URL to the contact
location for security reports.

