diff --git a/README.md b/README.md index 1ec1b5d..0d7138d 100644 --- a/README.md +++ b/README.md @@ -2206,3 +2206,100 @@ scrape_configs: - Bump version: update `pyproject.toml` and add a new entry in `CHANGELOG.md` - Tag release: `git tag vX.Y.Z` - Use the repo's default branch name (e.g., `main`); do not assume a specific remote. + +--- + +## Production telemetry + +This deployment exposes public, aggregate metrics at `/api/stats`. The +endpoint is consumed by the Production Telemetry panel on +https://eleventh.dev. The schema is documented at +https://github.com/IgnazioDS/IgnazioDS/blob/main/TELEMETRY_SCHEMA.md. + +NexusRAG runs in **Tier A** (`mode: "live"`) — counters reflect real +end-to-end RAG queries written to the `query_log` table by the FastAPI +middleware (`nexusrag/apps/api/main.py:request_context_middleware`). +Because rows persist in the existing pgvector Postgres database, the +counters survive Vercel cold starts. + +### Sample response + +```bash +$ curl -i https://nexusrag-lyart.vercel.app/api/stats +HTTP/1.1 200 OK +Content-Type: application/json +Cache-Control: public, max-age=30, stale-while-revalidate=60 +Access-Control-Allow-Origin: * + +{ +  "system": "nexusrag", +  "mode": "live", +  "status": "operational", +  "uptime_pct_30d": 100.0, +  "last_deployed_at": "2026-04-28T01:30:00Z", +  "last_active_at": "2026-04-28T02:00:00Z", +  "metrics": { +    "queries_total": 1234, +    "queries_24h": 42, +    "queries_7d": 300, +    "p50_latency_ms": 480, +    "p95_latency_ms": 910, +    "avg_retrieval_size": 6, +    "indexed_chunks": 12034 +  }, +  "schema_version": 1, +  "generated_at": "2026-04-28T02:11:42Z" +} +``` + +### How the counters are populated + +The FastAPI middleware fires for every request. When the request path +matches a query prefix (`/v1/run`, `/run`), the middleware schedules a +fire-and-forget task that writes one row to `query_log` with +`(query_id, started_at, completed_at, latency_ms, retrieved_chunks, +status)`. Telemetry failures never propagate to the request path; if +the DB is unreachable, the failure is logged and the row is dropped. + +Routes can populate `request.state.retrieved_chunks` to surface the +actual chunk count for the row; otherwise the field is recorded as 0. + +### Aggregation + +`/api/stats` runs a small set of windowed queries on `query_log`, +indexed on `completed_at DESC`: + +- `queries_total` — `COUNT(*) FROM query_log` +- `queries_24h` / `queries_7d` — windowed `COUNT(*)` on `completed_at` +- `p50_latency_ms` / `p95_latency_ms` — `percentile_cont(0.50)` and +  `percentile_cont(0.95)` `WITHIN GROUP (ORDER BY latency_ms)` over the +  24h window +- `avg_retrieval_size` — `AVG(retrieved_chunks)` over the 24h window +- `indexed_chunks` — `COUNT(*) FROM chunks` (the pgvector table) +- `last_active_at` — `MAX(completed_at) WHERE status = 'ok'` + +All counters are clamped by `SAFETY_CAPS` in +`nexusrag/persistence/repos/query_log.py`, so implausible values are +never published. + +### Privacy + +`query_log` stores **only** `id`, `query_id`, `started_at`, +`completed_at`, `latency_ms`, `retrieved_chunks`, and `status`. It does +not store prompt text, model output, tenant identifiers, or user +identifiers. The `/api/stats` endpoint never returns row-level data — +only counts and percentiles. + +### Failure handling + +`/api/stats` never returns HTTP 5xx. If the aggregator raises (DB +unreachable, schema drift, etc.), the response status flips to +`"degraded"` and metrics zero out, while the JSON envelope stays +contract-compliant. Internal error messages never appear in the +response body.
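+
+For reference, a degraded response keeps the exact same envelope; the
+values below are illustrative:
+
+```json
+{
+  "system": "nexusrag",
+  "mode": "live",
+  "status": "degraded",
+  "uptime_pct_30d": 100.0,
+  "last_deployed_at": "2026-04-28T01:30:00Z",
+  "last_active_at": null,
+  "metrics": {
+    "queries_total": 0,
+    "queries_24h": 0,
+    "queries_7d": 0,
+    "p50_latency_ms": 0,
+    "p95_latency_ms": 0,
+    "avg_retrieval_size": 0,
+    "indexed_chunks": 0
+  },
+  "schema_version": 1,
+  "generated_at": "2026-04-28T02:11:42Z"
+}
+```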
+ +### Migration + +```bash +alembic upgrade head +``` + +Adds the `query_log` table (revision `0031_query_log`). diff --git a/nexusrag/agent/graph.py b/nexusrag/agent/graph.py index a9d232f..4e7d156 100644 --- a/nexusrag/agent/graph.py +++ b/nexusrag/agent/graph.py @@ -4,8 +4,6 @@ import time from typing import Callable -from langgraph.graph import END, StateGraph - from nexusrag.agent.prompts import build_messages from nexusrag.domain.state import AgentState from nexusrag.persistence.repos.messages import list_messages @@ -23,6 +21,11 @@ def build_graph( max_output_tokens: int | None = None, token_estimator_ratio: float | None = None, ): + # Lazy import so module load on Vercel cold-start does not require the + # full langgraph dependency tree (~30 MB). Heavy deps live in the + # `agent` optional install group. + from langgraph.graph import END, StateGraph + graph = StateGraph(AgentState) async def load_history(state: AgentState) -> dict: diff --git a/nexusrag/apps/api/main.py b/nexusrag/apps/api/main.py index 5b39afc..eccc483 100644 --- a/nexusrag/apps/api/main.py +++ b/nexusrag/apps/api/main.py @@ -1,10 +1,12 @@ from __future__ import annotations +import asyncio from datetime import datetime, timedelta, timezone from email.utils import format_datetime import json +import logging import time -from uuid import uuid4 +from uuid import UUID, uuid4 from fastapi import FastAPI, HTTPException, Request from fastapi.exceptions import RequestValidationError @@ -40,6 +42,7 @@ from nexusrag.apps.api.routes.self_serve import router as self_serve_router from nexusrag.apps.api.routes.sla_admin import router as sla_admin_router from nexusrag.apps.api.routes.sso import router as sso_router +from nexusrag.apps.api.routes.stats import router as stats_router from nexusrag.apps.api.routes.ui import router as ui_router from nexusrag.core.logging import configure_logging from nexusrag.apps.api.errors import ( @@ -52,6 +55,8 @@ from nexusrag.apps.api.response import API_VERSION, is_versioned_request from nexusrag.apps.api.rate_limit import route_class_for_request from nexusrag.services.telemetry import record_request +from nexusrag.persistence.db import SessionLocal +from nexusrag.persistence.repos.query_log import record_query from nexusrag.persistence.guards import TenantPredicateError @@ -61,6 +66,7 @@ "/docs", "/openapi.json", "/redoc", + "/api/stats", ) _ENVELOPE_EXEMPT_PREFIXES = ( "/v1/openapi.json", @@ -68,6 +74,36 @@ "/v1/redoc", ) +# Path prefixes whose requests count as "queries" for the public /api/stats +# aggregator. Each request to one of these paths produces one query_log row. +_QUERY_PATH_PREFIXES = ("/v1/run", "/run") + +_telemetry_log = logging.getLogger("nexusrag.telemetry.query_log") + + +async def _record_query_async( + *, + query_id: UUID, + started_at: datetime, + completed_at: datetime, + retrieved_chunks: int, + status: str, +) -> None: + # Fire-and-forget DB write. Telemetry must never break the request path, + # so we open our own session and swallow any failure. 
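+    # (Scheduled via asyncio.create_task from the middleware, so this may
+    # run after the HTTP response has already been sent; nothing here can
+    # block or fail the client-visible request.)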
+    try:
+        async with SessionLocal() as session:
+            await record_query(
+                session,
+                query_id=query_id,
+                started_at=started_at,
+                completed_at=completed_at,
+                retrieved_chunks=retrieved_chunks,
+                status=status,
+            )
+    except Exception as exc:  # noqa: BLE001 - telemetry failure must not propagate
+        _telemetry_log.warning("query_log insert failed", exc_info=exc)
+
+
+# Strong references to in-flight telemetry tasks. The event loop keeps only a
+# weak reference to tasks, so a bare create_task() result can be
+# garbage-collected before it runs; the done-callback drops the reference.
+_PENDING_TELEMETRY_TASKS: set[asyncio.Task] = set()
+
 
 def create_app() -> FastAPI:
     configure_logging()
@@ -79,7 +115,9 @@ async def request_context_middleware(request: Request, call_next):  # type: igno
         request_id = request.headers.get("X-Request-Id") or str(uuid4())
         request.state.request_id = request_id
         start = time.monotonic()
+        started_at_dt = datetime.now(timezone.utc)
         response = await call_next(request)
+        completed_at_dt = datetime.now(timezone.utc)
         latency_ms = (time.monotonic() - start) * 1000.0
         route_class, _cost = route_class_for_request(request)
         record_request(
@@ -88,6 +126,25 @@ async def request_context_middleware(request: Request, call_next):  # type: igno
             status_code=response.status_code,
             latency_ms=latency_ms,
         )
+        # Persist a query_log row for every request that hits a query path,
+        # so the public /api/stats aggregator returns real counters that
+        # survive cold starts. Routes can populate request.state.retrieved_chunks
+        # to surface the actual chunk count; otherwise we record 0.
+        if request.url.path.startswith(_QUERY_PATH_PREFIXES):
+            chunks_attr = getattr(request.state, "retrieved_chunks", 0)
+            try:
+                retrieved_chunks = int(chunks_attr)
+            except (TypeError, ValueError):
+                retrieved_chunks = 0
+            task = asyncio.create_task(
+                _record_query_async(
+                    query_id=uuid4(),
+                    started_at=started_at_dt,
+                    completed_at=completed_at_dt,
+                    retrieved_chunks=retrieved_chunks,
+                    status="ok" if response.status_code < 400 else "error",
+                )
+            )
+            _PENDING_TELEMETRY_TASKS.add(task)
+            task.add_done_callback(_PENDING_TELEMETRY_TASKS.discard)
         # Wrap versioned JSON responses in the standardized success envelope.
         if (
             is_versioned_request(request)
@@ -155,6 +212,12 @@ async def _http_exception_handler(request: Request, exc: HTTPException):
     async def _tenant_predicate_exception_handler(request: Request, exc: TenantPredicateError):
         return await tenant_predicate_exception_handler(request, exc)
 
+    # Public, unauthenticated /api/stats endpoint for the Production
+    # Telemetry panel on https://eleventh.dev. Mounted before the versioned
+    # v1 routes so the path is canonical and not subject to /v1 routing
+    # rules. See TELEMETRY_SCHEMA.md for the contract.
+    app.include_router(stats_router)
+
     # Mount versioned v1 API routes.
     app.include_router(audio_router, prefix=f"/{API_VERSION}")
     # Expose admin endpoints for tenant quota management.
diff --git a/nexusrag/apps/api/routes/stats.py b/nexusrag/apps/api/routes/stats.py
new file mode 100644
index 0000000..f90d795
--- /dev/null
+++ b/nexusrag/apps/api/routes/stats.py
@@ -0,0 +1,124 @@
+"""Public, unauthenticated /api/stats endpoint.
+
+Implements the Tier-A telemetry contract from
+https://github.com/IgnazioDS/IgnazioDS/blob/main/TELEMETRY_SCHEMA.md.
+
+The endpoint is consumed by the Production Telemetry panel on
+https://eleventh.dev. Polling cadence is ~30s.
+
Contract guarantees: + +- HTTP 200 in all branches, even when the database is unreachable +- Privacy: aggregate counts only, no row-level fields ever leave this route +- CORS: wildcard origin (response is non-PII aggregates) +- Cache: public, max-age=30, stale-while-revalidate=60 +""" +from __future__ import annotations + +import logging +import os +from datetime import datetime, timezone +from typing import Any + +from fastapi import APIRouter, Depends, Response +from sqlalchemy.ext.asyncio import AsyncSession + +from nexusrag.apps.api.deps import get_db +from nexusrag.persistence.repos.query_log import ( + aggregate, + to_metrics_dict, + zero_metrics, +) + + +_log = logging.getLogger(__name__) +router = APIRouter() + + +SCHEMA_VERSION = 1 +SYSTEM_SLUG = "nexusrag" + +# Captured at module import (cold start). Subsequent warm invocations +# return the same value, which is a reasonable proxy for "this lambda +# instance was deployed at this time". A new deploy spawns new lambdas +# with a new value, so the field freshens on every push to main. +_DEPLOYED_AT = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _vercel_deploy_time() -> str | None: + # Prefer the commit author date Vercel injects; fall back to module-load + # capture (set when the lambda cold-started). + raw = os.environ.get("VERCEL_GIT_COMMIT_AUTHOR_DATE") + if not raw: + return _DEPLOYED_AT + if raw.isdigit(): + # Vercel sometimes exposes this as unix-seconds. + try: + return ( + datetime.fromtimestamp(int(raw), tz=timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%SZ") + ) + except (ValueError, OSError): + return _DEPLOYED_AT + return raw + + +def _set_public_headers(response: Response) -> None: + response.headers["Cache-Control"] = "public, max-age=30, stale-while-revalidate=60" + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS" + response.headers["Access-Control-Allow-Headers"] = "Content-Type" + + +@router.options("/api/stats", include_in_schema=False) +async def stats_options(response: Response) -> Response: + # CORS preflight: 204 with the same wildcard headers. + _set_public_headers(response) + response.status_code = 204 + return response + + +@router.get("/api/stats") +async def stats( + response: Response, + session: AsyncSession = Depends(get_db), +) -> dict[str, Any]: + _set_public_headers(response) + last_deployed_at = _vercel_deploy_time() + + try: + agg = await aggregate(session) + metrics = to_metrics_dict(agg) + last_active_at = ( + agg.last_active_at.astimezone(timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%SZ") + if agg.last_active_at is not None + else None + ) + status_value = "operational" + except Exception as exc: # noqa: BLE001 - contract forbids 5xx + # Internal error message must NEVER appear in the response body + # (it would leak detail to an unauthenticated public endpoint). + # Log internally and degrade gracefully. + _log.warning("stats aggregator failed", exc_info=exc) + metrics = zero_metrics() + last_active_at = None + status_value = "degraded" + + return { + "system": SYSTEM_SLUG, + "mode": "live", + "status": status_value, + # Vercel deploys are READY whenever the function responds; we treat + # 30-day uptime as 100% by definition. The public schema documents + # this approximation; replace with a self-pinger when the system + # moves to a real long-lived runtime. 
+ "uptime_pct_30d": 100.0, + "last_deployed_at": last_deployed_at, + "last_active_at": last_active_at, + "metrics": metrics, + "schema_version": SCHEMA_VERSION, + "generated_at": _now_iso(), + } diff --git a/nexusrag/domain/models.py b/nexusrag/domain/models.py index 15f03cc..9db6f64 100644 --- a/nexusrag/domain/models.py +++ b/nexusrag/domain/models.py @@ -1501,3 +1501,21 @@ class Chunk(Base): Index("ix_dsar_requests_created_at", DsarRequest.created_at.desc()) Index("ix_policy_rules_rule_priority", PolicyRule.rule_key, PolicyRule.priority.desc()) Index("ix_governance_retention_runs_tenant_started", GovernanceRetentionRun.tenant_id, GovernanceRetentionRun.started_at.desc()) + + +class QueryLog(Base): + # Persisted record of each end-to-end RAG query. The public /api/stats + # aggregator reads from this table on every request — counters survive + # cold starts because they are derived here, not held in memory. + __tablename__ = "query_log" + + id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4) + query_id: Mapped[UUID] = mapped_column(nullable=False) + started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + completed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + latency_ms: Mapped[int] = mapped_column(Integer, nullable=False) + retrieved_chunks: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0") + status: Mapped[str] = mapped_column(String(32), nullable=False) + + +Index("ix_query_log_completed_at_desc", QueryLog.completed_at.desc()) diff --git a/nexusrag/persistence/alembic/versions/0031_query_log.py b/nexusrag/persistence/alembic/versions/0031_query_log.py new file mode 100644 index 0000000..dde3ac5 --- /dev/null +++ b/nexusrag/persistence/alembic/versions/0031_query_log.py @@ -0,0 +1,53 @@ +"""add query_log table for public /api/stats aggregation + +Revision ID: 0031_query_log +Revises: 0030_notify_delivery_guarantees +Create Date: 2026-04-28 +""" + +from __future__ import annotations + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +revision = "0031_query_log" +down_revision = "0030_notify_delivery_guarantees" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Persistent record of each end-to-end RAG query for the public /api/stats + # aggregator. Counters survive cold starts because they are derived from + # this table on read rather than from in-memory state. + op.create_table( + "query_log", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("query_id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("started_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("completed_at", sa.DateTime(timezone=True), nullable=False), + # Pre-computed end-to-end latency so the percentile query stays a single + # PERCENTILE_CONT call without extract() arithmetic on every row. + sa.Column("latency_ms", sa.Integer(), nullable=False), + sa.Column("retrieved_chunks", sa.Integer(), nullable=False, server_default="0"), + # Constrained vocabulary keeps the aggregator's WHERE clauses cheap. + sa.Column("status", sa.String(length=32), nullable=False), + sa.CheckConstraint( + "status IN ('ok', 'error', 'cancelled')", + name="ck_query_log_status", + ), + ) + # Descending index on completed_at supports the rolling-window queries + # (queries_24h, queries_7d, p50/p95 over last 24h) without a sequential scan. 
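+    # Illustrative shape of the rollups this index serves (mirrors the
+    # aggregator in nexusrag/persistence/repos/query_log.py), e.g.:
+    #   SELECT count(*) FROM query_log
+    #   WHERE completed_at >= now() - interval '24 hours';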
+    op.create_index(
+        "ix_query_log_completed_at_desc",
+        "query_log",
+        [sa.text("completed_at DESC")],
+    )
+
+
+def downgrade() -> None:
+    op.drop_index("ix_query_log_completed_at_desc", table_name="query_log")
+    op.drop_table("query_log")
diff --git a/nexusrag/persistence/repos/query_log.py b/nexusrag/persistence/repos/query_log.py
new file mode 100644
index 0000000..44c3ada
--- /dev/null
+++ b/nexusrag/persistence/repos/query_log.py
@@ -0,0 +1,185 @@
+"""Read/write helpers for the query_log table.
+
+Insertion happens fire-and-forget from the FastAPI middleware so the
+response path never blocks on the telemetry write. Reads happen on every
+/api/stats request and use windowed aggregations indexed on completed_at.
+
+Privacy invariant: query_log NEVER stores prompt text, model output,
+tenant identifiers, or user identifiers. Only the aggregate-friendly
+fields (id, query_id, started_at, completed_at, latency_ms,
+retrieved_chunks, status) are persisted. The /api/stats endpoint never
+returns row-level data — only counts and percentiles.
+"""
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+from typing import Any
+from uuid import UUID
+
+from sqlalchemy import func, select, text
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from nexusrag.domain.models import Chunk, QueryLog
+
+
+@dataclass(frozen=True)
+class QueryAggregates:
+    queries_total: int
+    queries_24h: int
+    queries_7d: int
+    p50_latency_ms: int | None
+    p95_latency_ms: int | None
+    avg_retrieval_size: int
+    indexed_chunks: int
+    last_active_at: datetime | None
+
+
+# --- safety caps per the public schema ---
+SAFETY_CAPS: dict[str, int] = {
+    "queries_total": 10_000_000,
+    "queries_24h": 1_000_000,
+    "queries_7d": 7_000_000,
+    "p50_latency_ms": 600_000,  # 10 min sanity cap
+    "p95_latency_ms": 600_000,
+    "avg_retrieval_size": 10_000,
+    "indexed_chunks": 100_000_000,
+}
+
+
+def _cap(name: str, value: int) -> int:
+    cap = SAFETY_CAPS.get(name)
+    return min(value, cap) if cap is not None else value
+
+
+async def record_query(
+    session: AsyncSession,
+    *,
+    query_id: UUID,
+    started_at: datetime,
+    completed_at: datetime,
+    retrieved_chunks: int,
+    status: str,
+) -> None:
+    """Insert one query_log row. Latency is computed at insert time."""
+    delta = completed_at - started_at
+    latency_ms = int(delta.total_seconds() * 1000)
+    if latency_ms < 0:
+        # Clock skew on a request straddling worker boundaries; clamp to zero.
+        latency_ms = 0
+    row = QueryLog(
+        query_id=query_id,
+        started_at=started_at,
+        completed_at=completed_at,
+        latency_ms=latency_ms,
+        retrieved_chunks=max(0, retrieved_chunks),
+        status=status if status in {"ok", "error", "cancelled"} else "error",
+    )
+    session.add(row)
+    await session.commit()
+
+
+async def aggregate(session: AsyncSession) -> QueryAggregates:
+    """Compute the public Tier-A aggregates in a small set of queries.
+
+    Each scalar query is cheap given the descending index on completed_at.
+    We issue a handful of small scalar queries rather than one mega-CTE:
+    each rollup stays a simple index scan that is easy to EXPLAIN and
+    verify in isolation.
+    """
+    now = datetime.now(timezone.utc)
+    cutoff_24h = now - timedelta(hours=24)
+    cutoff_7d = now - timedelta(days=7)
+
+    # Total count, all-time.
+    queries_total_result = await session.execute(select(func.count()).select_from(QueryLog))
+    queries_total = int(queries_total_result.scalar() or 0)
+
+    # 24h window.
+    queries_24h_result = await session.execute(
+        select(func.count())
+        .select_from(QueryLog)
+        .where(QueryLog.completed_at >= cutoff_24h)
+    )
+    queries_24h = int(queries_24h_result.scalar() or 0)
+
+    # 7d window.
+    queries_7d_result = await session.execute(
+        select(func.count())
+        .select_from(QueryLog)
+        .where(QueryLog.completed_at >= cutoff_7d)
+    )
+    queries_7d = int(queries_7d_result.scalar() or 0)
+
+    # p50 + p95 latency over the 24h window. Raw percentile_cont SQL returns
+    # both percentiles in a single statement; SQLAlchemy's
+    # func.percentile_cont(...).within_group(...) construct would also work.
+    pct_result = await session.execute(
+        text(
+            """
+            SELECT
+                percentile_cont(0.50) WITHIN GROUP (ORDER BY latency_ms) AS p50,
+                percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms) AS p95
+            FROM query_log
+            WHERE completed_at >= :cutoff
+            """
+        ),
+        {"cutoff": cutoff_24h},
+    )
+    pct_row = pct_result.first()
+    p50_latency_ms = int(pct_row.p50) if pct_row and pct_row.p50 is not None else None
+    p95_latency_ms = int(pct_row.p95) if pct_row and pct_row.p95 is not None else None
+
+    # Average retrieval size, 24h window. Round to int per the public schema.
+    avg_result = await session.execute(
+        select(func.avg(QueryLog.retrieved_chunks))
+        .where(QueryLog.completed_at >= cutoff_24h)
+    )
+    avg_raw = avg_result.scalar()
+    avg_retrieval_size = int(round(float(avg_raw))) if avg_raw is not None else 0
+
+    # Vector index size. Read on the same path so a healthy /api/stats
+    # response confirms both the telemetry table and the vector store.
+    chunks_result = await session.execute(select(func.count()).select_from(Chunk))
+    indexed_chunks = int(chunks_result.scalar() or 0)
+
+    # Most recent successful query (powers Tier-A `last_active_at`).
+    last_active_result = await session.execute(
+        select(func.max(QueryLog.completed_at)).where(QueryLog.status == "ok")
+    )
+    last_active_at = last_active_result.scalar()
+
+    return QueryAggregates(
+        queries_total=_cap("queries_total", queries_total),
+        queries_24h=_cap("queries_24h", queries_24h),
+        queries_7d=_cap("queries_7d", queries_7d),
+        p50_latency_ms=_cap("p50_latency_ms", p50_latency_ms) if p50_latency_ms is not None else None,
+        p95_latency_ms=_cap("p95_latency_ms", p95_latency_ms) if p95_latency_ms is not None else None,
+        avg_retrieval_size=_cap("avg_retrieval_size", avg_retrieval_size),
+        indexed_chunks=_cap("indexed_chunks", indexed_chunks),
+        last_active_at=last_active_at,
+    )
+
+
+def to_metrics_dict(agg: QueryAggregates) -> dict[str, Any]:
+    """Render the aggregates into the public Tier-A `metrics` payload."""
+    return {
+        "queries_total": agg.queries_total,
+        "queries_24h": agg.queries_24h,
+        "queries_7d": agg.queries_7d,
+        "p50_latency_ms": agg.p50_latency_ms if agg.p50_latency_ms is not None else 0,
+        "p95_latency_ms": agg.p95_latency_ms if agg.p95_latency_ms is not None else 0,
+        "avg_retrieval_size": agg.avg_retrieval_size,
+        "indexed_chunks": agg.indexed_chunks,
+    }
+
+
+def zero_metrics() -> dict[str, Any]:
+    """Fallback used when the database is unreachable. Contract stays valid."""
+    return {
+        "queries_total": 0,
+        "queries_24h": 0,
+        "queries_7d": 0,
+        "p50_latency_ms": 0,
+        "p95_latency_ms": 0,
+        "avg_retrieval_size": 0,
+        "indexed_chunks": 0,
+    }
diff --git a/nexusrag/providers/retrieval/bedrock_kb.py b/nexusrag/providers/retrieval/bedrock_kb.py
index a50efa4..5906e60 100644
--- a/nexusrag/providers/retrieval/bedrock_kb.py
+++ b/nexusrag/providers/retrieval/bedrock_kb.py
@@ -4,7 +4,18 @@
 import time
 from typing import Any
 
-from botocore.exceptions import BotoCoreError, ClientError
+# botocore is an optional dependency (installed via the `aws` extra). The
+# Vercel API-skeleton deploy ships without it; on that runtime the AWS code
+# paths are dormant and the exception types are unreachable. Stub them so
+# module load succeeds and the isinstance() guards downstream stay valid.
+try:
+    from botocore.exceptions import BotoCoreError, ClientError
+except ImportError:  # pragma: no cover
+    class BotoCoreError(Exception):  # type: ignore[no-redef]
+        pass
+
+    class ClientError(Exception):  # type: ignore[no-redef]
+        pass
 
 from nexusrag.core.errors import AwsAuthError, AwsConfigMissingError, AwsRetrievalError
 from nexusrag.services.audit import record_system_event
diff --git a/nexusrag/providers/retrieval/vertex_ai.py b/nexusrag/providers/retrieval/vertex_ai.py
index d326f4a..70ef544 100644
--- a/nexusrag/providers/retrieval/vertex_ai.py
+++ b/nexusrag/providers/retrieval/vertex_ai.py
@@ -4,7 +4,18 @@
 import time
 from typing import Any
 
-from google.api_core.exceptions import GoogleAPICallError, RetryError
+# google-api-core is an optional dependency (installed via the `vertex`
+# extra). On the Vercel API-skeleton deploy it isn't present; stub the
+# exception types so module load succeeds and the isinstance() guard
+# below stays valid (and never matches, since no Google call is made on
+# that runtime).
+try:
+    from google.api_core.exceptions import GoogleAPICallError, RetryError
+except ImportError:  # pragma: no cover
+    class GoogleAPICallError(Exception):  # type: ignore[no-redef]
+        pass
+
+    class RetryError(Exception):  # type: ignore[no-redef]
+        pass
 
 from nexusrag.core.errors import (
     VertexRetrievalAuthError,
diff --git a/nexusrag/tests/unit/test_stats_route.py b/nexusrag/tests/unit/test_stats_route.py
new file mode 100644
index 0000000..76a5967
--- /dev/null
+++ b/nexusrag/tests/unit/test_stats_route.py
@@ -0,0 +1,161 @@
+"""Unit tests for the public /api/stats Tier-A route.
+ +Covers the must-hold guarantees from +https://github.com/IgnazioDS/IgnazioDS/blob/main/TELEMETRY_SCHEMA.md: + +- HTTP 200 in all branches (never 5xx) +- All required Tier-A fields present +- schema_version == 1 +- Response is parseable JSON +- Status flips to "degraded" when the aggregator raises, but the contract + envelope stays valid and metrics zero out (no internal error leaks) +- CORS + cache headers are set +""" +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi.testclient import TestClient + +from nexusrag.apps.api.main import create_app +from nexusrag.persistence.repos.query_log import QueryAggregates + + +_REQUIRED_TOP_FIELDS = { + "system", + "mode", + "status", + "uptime_pct_30d", + "last_deployed_at", + "last_active_at", + "metrics", + "schema_version", + "generated_at", +} +_REQUIRED_METRIC_FIELDS = { + "queries_total", + "queries_24h", + "queries_7d", + "p50_latency_ms", + "p95_latency_ms", + "avg_retrieval_size", + "indexed_chunks", +} + + +def _aggregates_with_data() -> QueryAggregates: + return QueryAggregates( + queries_total=1234, + queries_24h=42, + queries_7d=300, + p50_latency_ms=480, + p95_latency_ms=910, + avg_retrieval_size=6, + indexed_chunks=12_034, + last_active_at=datetime(2026, 4, 27, 18, 0, tzinfo=timezone.utc), + ) + + +def _client_with_mocked_db(): + app = create_app() + # Override get_db so the route can run without a real Postgres connection. + from nexusrag.apps.api import deps as deps_module + + async def _fake_db(): + yield AsyncMock() + + app.dependency_overrides[deps_module.get_db] = _fake_db + return TestClient(app) + + +def test_happy_path_matches_tier_a_contract() -> None: + client = _client_with_mocked_db() + with patch( + "nexusrag.apps.api.routes.stats.aggregate", + new=AsyncMock(return_value=_aggregates_with_data()), + ): + response = client.get("/api/stats") + + assert response.status_code == 200 + payload = response.json() + assert _REQUIRED_TOP_FIELDS.issubset(payload.keys()) + assert _REQUIRED_METRIC_FIELDS.issubset(payload["metrics"].keys()) + assert payload["schema_version"] == 1 + assert payload["mode"] == "live" + assert payload["status"] == "operational" + assert payload["system"] == "nexusrag" + assert payload["metrics"]["queries_total"] == 1234 + assert payload["metrics"]["queries_24h"] == 42 + assert payload["last_active_at"] == "2026-04-27T18:00:00Z" + + +def test_degraded_when_aggregator_fails() -> None: + client = _client_with_mocked_db() + with patch( + "nexusrag.apps.api.routes.stats.aggregate", + new=AsyncMock(side_effect=RuntimeError("postgres unreachable")), + ): + response = client.get("/api/stats") + + # Contract: HTTP 200 even when the underlying DB is unreachable. + assert response.status_code == 200 + payload = response.json() + # Envelope still valid. + assert _REQUIRED_TOP_FIELDS.issubset(payload.keys()) + assert payload["schema_version"] == 1 + assert payload["status"] == "degraded" + # Metrics zeroed; no internal error string leaked. 
+ body_text = response.text + assert "postgres unreachable" not in body_text + assert payload["metrics"]["queries_total"] == 0 + assert payload["metrics"]["queries_24h"] == 0 + assert payload["last_active_at"] is None + + +def test_response_headers_match_contract() -> None: + client = _client_with_mocked_db() + with patch( + "nexusrag.apps.api.routes.stats.aggregate", + new=AsyncMock(return_value=_aggregates_with_data()), + ): + response = client.get("/api/stats") + + assert response.headers["cache-control"].startswith("public, max-age=30") + assert response.headers["access-control-allow-origin"] == "*" + assert "GET" in response.headers["access-control-allow-methods"] + assert response.headers["content-type"].startswith("application/json") + + +def test_options_preflight_returns_204_with_cors_headers() -> None: + client = _client_with_mocked_db() + response = client.options("/api/stats") + assert response.status_code == 204 + assert response.headers["access-control-allow-origin"] == "*" + assert "GET" in response.headers["access-control-allow-methods"] + + +@pytest.mark.parametrize( + "field,expected_type", + [ + ("system", str), + ("mode", str), + ("status", str), + ("uptime_pct_30d", float), + ("metrics", dict), + ("schema_version", int), + ("generated_at", str), + ], +) +def test_field_types(field: str, expected_type: type) -> None: + client = _client_with_mocked_db() + with patch( + "nexusrag.apps.api.routes.stats.aggregate", + new=AsyncMock(return_value=_aggregates_with_data()), + ): + response = client.get("/api/stats") + payload = response.json() + assert isinstance(payload[field], expected_type), ( + f"{field} expected {expected_type.__name__}, got {type(payload[field]).__name__}" + ) diff --git a/pyproject.toml b/pyproject.toml index f163f75..db209c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,10 +16,6 @@ dependencies = [ "asyncpg>=0.29", "alembic>=1.13", "pgvector>=0.2.5", - "langgraph>=0.0.59", - "google-cloud-aiplatform>=1.50.0", - "google-cloud-discoveryengine>=0.11.0", - "boto3>=1.34.0", "python-dotenv>=1.0", "orjson>=3.9", "arq>=0.26.0", @@ -31,6 +27,19 @@ dependencies = [ [project.optional-dependencies] +agent = [ + "langgraph>=0.0.59", +] + +vertex = [ + "google-cloud-aiplatform>=1.50.0", + "google-cloud-discoveryengine>=0.11.0", +] + +aws = [ + "boto3>=1.34.0", +] + test = [ "pytest>=8.0", "pytest-asyncio>=0.23", diff --git a/requirements.txt b/requirements.txt index 180a478..e83b9a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,13 +7,10 @@ sqlalchemy>=2.0 asyncpg>=0.29 alembic>=1.13 pgvector>=0.2.5 -langgraph>=0.0.59 -google-cloud-aiplatform>=1.50.0 -google-cloud-discoveryengine>=0.11.0 -boto3>=1.34.0 python-dotenv>=1.0 orjson>=3.9 arq>=0.26.0 cryptography>=42.0 httpx>=0.27 PyJWT>=2.8 +prometheus-client>=0.20