Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2206,3 +2206,100 @@ scrape_configs:
- Bump version: update `pyproject.toml` and add a new entry in `CHANGELOG.md`
- Tag release: `git tag vX.Y.Z`
- Use the repo's default branch name (e.g., `main`); do not assume a specific remote.

---

## Production telemetry

This deployment exposes public, aggregate metrics at `/api/stats`. The
endpoint is consumed by the Production Telemetry panel on
https://eleventh.dev. The schema is documented at
https://github.com/IgnazioDS/IgnazioDS/blob/main/TELEMETRY_SCHEMA.md.

NexusRAG runs in **Tier A** (`mode: "live"`) — counters reflect real
end-to-end RAG queries written to the `query_log` table by the FastAPI
middleware (`nexusrag/apps/api/main.py:request_context_middleware`).
Persistence on the existing pgvector Postgres means the counters survive
Vercel cold starts.

### Sample response

```bash
$ curl -i https://nexusrag-lyart.vercel.app/api/stats
HTTP/1.1 200 OK
Content-Type: application/json
Cache-Control: public, max-age=30, stale-while-revalidate=60
Access-Control-Allow-Origin: *

{
"system": "nexusrag",
"mode": "live",
"status": "operational",
"uptime_pct_30d": 100.0,
"last_deployed_at": "2026-04-28T01:30:00Z",
"last_active_at": "2026-04-28T02:00:00Z",
"metrics": {
"queries_total": 1234,
"queries_24h": 42,
"queries_7d": 300,
"p50_latency_ms": 480,
"p95_latency_ms": 910,
"avg_retrieval_size": 6,
"indexed_chunks": 12034
},
"schema_version": 1,
"generated_at": "2026-04-28T02:11:42Z"
}
```

### How the counters are populated

The FastAPI middleware fires for every request. When the request path
matches a query prefix (`/v1/run`, `/run`), the middleware schedules a
fire-and-forget task that writes one row to `query_log` with
`(query_id, started_at, completed_at, latency_ms, retrieved_chunks,
status)`. Telemetry failures never propagate to the request path; if
the DB is unreachable, the insert is logged and dropped.

Routes can populate `request.state.retrieved_chunks` to surface the
actual chunk count for the row; otherwise the field is recorded as 0.

### Aggregation

`/api/stats` runs a small set of windowed queries on `query_log`,
indexed on `completed_at DESC`:

- `queries_total` — `COUNT(*) FROM query_log`
- `queries_24h` / `queries_7d` — windowed `COUNT(*)` on `completed_at`
- `p50_latency_ms` / `p95_latency_ms` — `percentile_cont(0.50)` /
  `percentile_cont(0.95)` `WITHIN GROUP (ORDER BY latency_ms)` over the 24h window
- `avg_retrieval_size` — `AVG(retrieved_chunks)` over the 24h window
- `indexed_chunks` — `COUNT(*) FROM chunks` (the pgvector table)
- `last_active_at` — `MAX(completed_at) WHERE status = 'ok'`

All counters are clamped by `SAFETY_CAPS` in
`nexusrag/persistence/repos/query_log.py` to prevent runaway exposure.

### Privacy

`query_log` stores **only** `id`, `query_id`, `started_at`,
`completed_at`, `latency_ms`, `retrieved_chunks`, and `status`. It does
not store prompt text, model output, tenant identifiers, or user
identifiers. The `/api/stats` endpoint never returns row-level data —
only counts and percentiles.

### Failure handling

`/api/stats` never returns HTTP 5xx. If the aggregator raises (DB
unreachable, schema drift, etc.), the response status flips to
`"degraded"` and metrics zero out, while the JSON envelope stays
contract-compliant. Internal error messages never appear in the
response body.

### Migration

```bash
alembic upgrade head
```

Adds the `query_log` table (revision `0031_query_log`).
7 changes: 5 additions & 2 deletions nexusrag/agent/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import time
from typing import Callable

from langgraph.graph import END, StateGraph

from nexusrag.agent.prompts import build_messages
from nexusrag.domain.state import AgentState
from nexusrag.persistence.repos.messages import list_messages
Expand All @@ -23,6 +21,11 @@ def build_graph(
max_output_tokens: int | None = None,
token_estimator_ratio: float | None = None,
):
# Lazy import so module load on Vercel cold-start does not require the
# full langgraph dependency tree (~30 MB). Heavy deps live in the
# `agent` optional install group.
from langgraph.graph import END, StateGraph

graph = StateGraph(AgentState)

async def load_history(state: AgentState) -> dict:
Expand Down
65 changes: 64 additions & 1 deletion nexusrag/apps/api/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime
import json
import logging
import time
from uuid import uuid4
from uuid import UUID, uuid4

from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
Expand Down Expand Up @@ -40,6 +42,7 @@
from nexusrag.apps.api.routes.self_serve import router as self_serve_router
from nexusrag.apps.api.routes.sla_admin import router as sla_admin_router
from nexusrag.apps.api.routes.sso import router as sso_router
from nexusrag.apps.api.routes.stats import router as stats_router
from nexusrag.apps.api.routes.ui import router as ui_router
from nexusrag.core.logging import configure_logging
from nexusrag.apps.api.errors import (
Expand All @@ -52,6 +55,8 @@
from nexusrag.apps.api.response import API_VERSION, is_versioned_request
from nexusrag.apps.api.rate_limit import route_class_for_request
from nexusrag.services.telemetry import record_request
from nexusrag.persistence.db import SessionLocal
from nexusrag.persistence.repos.query_log import record_query
from nexusrag.persistence.guards import TenantPredicateError


Expand All @@ -61,13 +66,44 @@
"/docs",
"/openapi.json",
"/redoc",
"/api/stats",
)
_ENVELOPE_EXEMPT_PREFIXES = (
"/v1/openapi.json",
"/v1/docs",
"/v1/redoc",
)

# Path prefixes whose requests count as "queries" for the public /api/stats
# aggregator. Each request to one of these paths produces one query_log row.
_QUERY_PATH_PREFIXES = ("/v1/run", "/run")

_telemetry_log = logging.getLogger("nexusrag.telemetry.query_log")


async def _record_query_async(
    *,
    query_id: UUID,
    started_at: datetime,
    completed_at: datetime,
    retrieved_chunks: int,
    status: str,
) -> None:
    """Persist one ``query_log`` row on a best-effort basis.

    This is the fire-and-forget half of the telemetry path: it opens its
    own session, and any failure is logged and dropped so that telemetry
    can never break the request that triggered it.
    """
    row = {
        "query_id": query_id,
        "started_at": started_at,
        "completed_at": completed_at,
        "retrieved_chunks": retrieved_chunks,
        "status": status,
    }
    try:
        async with SessionLocal() as session:
            await record_query(session, **row)
    except Exception as exc:  # noqa: BLE001 - telemetry failure must not propagate
        _telemetry_log.warning("query_log insert failed", exc_info=exc)


def create_app() -> FastAPI:
configure_logging()
Expand All @@ -79,7 +115,9 @@ async def request_context_middleware(request: Request, call_next): # type: igno
request_id = request.headers.get("X-Request-Id") or str(uuid4())
request.state.request_id = request_id
start = time.monotonic()
started_at_dt = datetime.now(timezone.utc)
response = await call_next(request)
completed_at_dt = datetime.now(timezone.utc)
latency_ms = (time.monotonic() - start) * 1000.0
Comment on lines 119 to 121
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Measure query completion after SSE stream ends

The query log timestamps are captured immediately after call_next, but /run returns a StreamingResponse whose generation continues after this point, so completed_at and latency_ms are recorded before the actual query finishes. This makes the published p50/p95 latency metrics systematically too low for real streamed runs and breaks the intended end-to-end timing semantics.

Useful? React with 👍 / 👎.

route_class, _cost = route_class_for_request(request)
record_request(
Expand All @@ -88,6 +126,25 @@ async def request_context_middleware(request: Request, call_next): # type: igno
status_code=response.status_code,
latency_ms=latency_ms,
)
# Persist a query_log row for every request that hits a query path,
# so the public /api/stats aggregator returns real counters that
# survive cold starts. Routes can populate request.state.retrieved_chunks
# to surface the actual chunk count; otherwise we record 0.
if request.url.path.startswith(_QUERY_PATH_PREFIXES):
chunks_attr = getattr(request.state, "retrieved_chunks", 0)
try:
retrieved_chunks = int(chunks_attr)
except (TypeError, ValueError):
retrieved_chunks = 0
Comment on lines +134 to +138
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Persist actual retrieved chunk counts

The middleware falls back to 0 unless request.state.retrieved_chunks is set, but there is no assignment to that request-state field in the request handlers, so every inserted row records retrieved_chunks=0. As a result, /api/stats will report avg_retrieval_size as zero regardless of real retrieval behavior.

Useful? React with 👍 / 👎.

asyncio.create_task(
_record_query_async(
query_id=uuid4(),
started_at=started_at_dt,
completed_at=completed_at_dt,
retrieved_chunks=retrieved_chunks,
status="ok" if response.status_code < 400 else "error",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Derive query status from run outcome, not HTTP code

For /run, many failures are encoded as SSE error events while the HTTP status remains 200, so this status mapping marks failed runs as ok. That skews last_active_at (which filters on status='ok') and any future success/error reporting from query_log. The status should be set from the stream’s logical outcome, not only response.status_code.

Useful? React with 👍 / 👎.

)
)
# Wrap versioned JSON responses in the standardized success envelope.
if (
is_versioned_request(request)
Expand Down Expand Up @@ -155,6 +212,12 @@ async def _http_exception_handler(request: Request, exc: HTTPException):
async def _tenant_predicate_exception_handler(request: Request, exc: TenantPredicateError):
return await tenant_predicate_exception_handler(request, exc)

# Public, unauthenticated /api/stats endpoint for the Production
# Telemetry panel on https://eleventh.dev. Mounted before the versioned
# v1 routes so the path is canonical and not subject to /v1 routing
# rules. See docs/TELEMETRY_SCHEMA reference.
app.include_router(stats_router)

# Mount versioned v1 API routes.
app.include_router(audio_router, prefix=f"/{API_VERSION}")
# Expose admin endpoints for tenant quota management.
Expand Down
124 changes: 124 additions & 0 deletions nexusrag/apps/api/routes/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Public, unauthenticated /api/stats endpoint.

Implements the Tier-A telemetry contract from
https://github.com/IgnazioDS/IgnazioDS/blob/main/TELEMETRY_SCHEMA.md.

The endpoint is consumed by the Production Telemetry panel on
https://eleventh.dev. Polling cadence is ~30s. Contract guarantees:

- HTTP 200 in all branches, even when the database is unreachable
- Privacy: aggregate counts only, no row-level fields ever leave this route
- CORS: wildcard origin (response is non-PII aggregates)
- Cache: public, max-age=30, stale-while-revalidate=60
"""
from __future__ import annotations

import logging
import os
from datetime import datetime, timezone
from typing import Any

from fastapi import APIRouter, Depends, Response
from sqlalchemy.ext.asyncio import AsyncSession

from nexusrag.apps.api.deps import get_db
from nexusrag.persistence.repos.query_log import (
aggregate,
to_metrics_dict,
zero_metrics,
)


_log = logging.getLogger(__name__)
router = APIRouter()


# Contract version of the JSON payload; bump when the schema changes.
SCHEMA_VERSION = 1
# Stable system identifier the telemetry consumer keys on.
SYSTEM_SLUG = "nexusrag"

# Captured at module import (cold start). Subsequent warm invocations
# return the same value, which is a reasonable proxy for "this lambda
# instance was deployed at this time". A new deploy spawns new lambdas
# with a new value, so the field freshens on every push to main.
_DEPLOYED_AT = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _vercel_deploy_time() -> str | None:
# Prefer the commit author date Vercel injects; fall back to module-load
# capture (set when the lambda cold-started).
raw = os.environ.get("VERCEL_GIT_COMMIT_AUTHOR_DATE")
if not raw:
return _DEPLOYED_AT
if raw.isdigit():
# Vercel sometimes exposes this as unix-seconds.
try:
return (
datetime.fromtimestamp(int(raw), tz=timezone.utc)
.strftime("%Y-%m-%dT%H:%M:%SZ")
)
except (ValueError, OSError):
return _DEPLOYED_AT
return raw


def _set_public_headers(response: Response) -> None:
    """Stamp the 30s public-cache and wildcard-CORS headers onto *response*."""
    public_headers = {
        "Cache-Control": "public, max-age=30, stale-while-revalidate=60",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
    }
    for header_name, header_value in public_headers.items():
        response.headers[header_name] = header_value


@router.options("/api/stats", include_in_schema=False)
async def stats_options(response: Response) -> Response:
    """Answer the CORS preflight: 204 plus the same wildcard public headers."""
    response.status_code = 204
    _set_public_headers(response)
    return response


@router.get("/api/stats")
async def stats(
    response: Response,
    session: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Return the public Tier-A telemetry payload.

    Always answers HTTP 200 with the contract envelope; if the aggregator
    fails, status flips to "degraded" and the metrics zero out.
    """
    _set_public_headers(response)
    last_deployed_at = _vercel_deploy_time()

    try:
        agg = await aggregate(session)
        metrics = to_metrics_dict(agg)
        if agg.last_active_at is None:
            last_active_at = None
        else:
            last_active_at = (
                agg.last_active_at
                .astimezone(timezone.utc)
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            )
        status_value = "operational"
    except Exception as exc:  # noqa: BLE001 - contract forbids 5xx
        # The internal error message must NEVER appear in the response body
        # (it would leak detail to an unauthenticated public endpoint).
        # Log it internally and degrade gracefully.
        _log.warning("stats aggregator failed", exc_info=exc)
        metrics = zero_metrics()
        last_active_at = None
        status_value = "degraded"

    return {
        "system": SYSTEM_SLUG,
        "mode": "live",
        "status": status_value,
        # Vercel deploys are READY whenever the function responds, so
        # 30-day uptime is 100% by definition. The public schema documents
        # this approximation; replace with a self-pinger when the system
        # moves to a real long-lived runtime.
        "uptime_pct_30d": 100.0,
        "last_deployed_at": last_deployed_at,
        "last_active_at": last_active_at,
        "metrics": metrics,
        "schema_version": SCHEMA_VERSION,
        "generated_at": _now_iso(),
    }
18 changes: 18 additions & 0 deletions nexusrag/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,3 +1501,21 @@ class Chunk(Base):
Index("ix_dsar_requests_created_at", DsarRequest.created_at.desc())
Index("ix_policy_rules_rule_priority", PolicyRule.rule_key, PolicyRule.priority.desc())
Index("ix_governance_retention_runs_tenant_started", GovernanceRetentionRun.tenant_id, GovernanceRetentionRun.started_at.desc())


class QueryLog(Base):
    # Persisted record of each end-to-end RAG query. The public /api/stats
    # aggregator reads from this table on every request — counters survive
    # cold starts because they are derived here, not held in memory.
    __tablename__ = "query_log"

    # Surrogate primary key, generated client-side via uuid4.
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    # Correlation id for the query; generated by the middleware, not a FK.
    query_id: Mapped[UUID] = mapped_column(nullable=False)
    # Timezone-aware request start/completion timestamps captured around
    # the request in the FastAPI middleware.
    started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    completed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    # End-to-end latency in whole milliseconds.
    latency_ms: Mapped[int] = mapped_column(Integer, nullable=False)
    # Chunk count reported by the route via request.state; defaults to 0
    # when the route does not populate it.
    retrieved_chunks: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0")
    # "ok" / "error" as derived from the HTTP status by the middleware.
    status: Mapped[str] = mapped_column(String(32), nullable=False)


# Descending index backing the windowed /api/stats aggregation queries.
Index("ix_query_log_completed_at_desc", QueryLog.completed_at.desc())
Loading
Loading