diff --git a/openviking/server/app.py b/openviking/server/app.py index c22794e1..518f90d7 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -26,6 +26,7 @@ resources_router, search_router, sessions_router, + stats_router, system_router, tasks_router, ) @@ -173,6 +174,7 @@ async def general_error_handler(request: Request, exc: Exception): app.include_router(search_router) app.include_router(relations_router) app.include_router(sessions_router) + app.include_router(stats_router) app.include_router(pack_router) app.include_router(debug_router) app.include_router(observer_router) diff --git a/openviking/server/routers/__init__.py b/openviking/server/routers/__init__.py index 12aa9f34..437c5fb8 100644 --- a/openviking/server/routers/__init__.py +++ b/openviking/server/routers/__init__.py @@ -13,6 +13,7 @@ from openviking.server.routers.resources import router as resources_router from openviking.server.routers.search import router as search_router from openviking.server.routers.sessions import router as sessions_router +from openviking.server.routers.stats import router as stats_router from openviking.server.routers.system import router as system_router from openviking.server.routers.tasks import router as tasks_router @@ -26,6 +27,7 @@ "search_router", "relations_router", "sessions_router", + "stats_router", "pack_router", "debug_router", "observer_router", diff --git a/openviking/server/routers/stats.py b/openviking/server/routers/stats.py new file mode 100644 index 00000000..33f79197 --- /dev/null +++ b/openviking/server/routers/stats.py @@ -0,0 +1,80 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Memory health statistics endpoints for OpenViking HTTP Server.""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, Path, Query + +from openviking.server.auth import get_request_context +from openviking.server.dependencies import get_service +from openviking.server.identity import RequestContext +from openviking.server.models import ErrorInfo, Response +from openviking.storage.stats_aggregator import MEMORY_CATEGORIES, StatsAggregator + +router = APIRouter(prefix="/api/v1/stats", tags=["stats"]) +logger = logging.getLogger(__name__) + + +def _get_aggregator() -> StatsAggregator: + """Build a StatsAggregator from the current service.""" + service = get_service() + return StatsAggregator(service.vikingdb_manager) + + +@router.get("/memories") +async def get_memory_stats( + category: Optional[str] = Query( + None, + description="Filter by memory category (e.g. cases, patterns, tools)", + ), + _ctx: RequestContext = Depends(get_request_context), +): + """Get aggregate memory health statistics. + + Returns counts by category, hotness distribution, and staleness metrics. + Optionally filter by a single category. + """ + if category and category not in MEMORY_CATEGORIES: + return Response( + status="error", + error=ErrorInfo( + code="INVALID_ARGUMENT", + message=f"Unknown category: {category}. Valid categories: {', '.join(MEMORY_CATEGORIES)}", + ), + ) + + aggregator = _get_aggregator() + result = await aggregator.get_memory_stats(_ctx, category=category) + return Response(status="ok", result=result) + + +@router.get("/sessions/{session_id}") +async def get_session_stats( + session_id: str = Path(..., description="Session ID"), + _ctx: RequestContext = Depends(get_request_context), +): + """Get extraction statistics for a specific session.""" + service = get_service() + aggregator = _get_aggregator() + try: + result = await aggregator.get_session_extraction_stats(session_id, service, _ctx) + return Response(status="ok", result=result) + except KeyError: + return Response( + status="error", + error=ErrorInfo( + code="NOT_FOUND", + message=f"Session not found: {session_id}", + ), + ) + except Exception as e: + logger.error("Failed to get session stats for %s: %s", session_id, e) + return Response( + status="error", + error=ErrorInfo( + code="INTERNAL_ERROR", + message=f"Failed to retrieve session stats: {type(e).__name__}", + ), + ) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py new file mode 100644 index 00000000..76bf6240 --- /dev/null +++ b/openviking/storage/stats_aggregator.py @@ -0,0 +1,198 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Memory health statistics aggregator. + +Queries VikingDB indexes and the hotness_score function to produce +aggregate memory health metrics without introducing new storage. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from openviking.retrieve.memory_lifecycle import hotness_score +from openviking.server.identity import RequestContext +from openviking.storage.expr import Eq +from openviking_cli.utils import get_logger + +logger = get_logger(__name__) + +# Memory categories from MemoryCategory enum +MEMORY_CATEGORIES = [ + "profile", + "preferences", + "entities", + "events", + "cases", + "patterns", + "tools", + "skills", +] + +# Hotness buckets +COLD_THRESHOLD = 0.2 +HOT_THRESHOLD = 0.6 + + +class StatsAggregator: + """Aggregates memory health statistics from VikingDB. + + Reads from existing indexes and the hotness_score function. + No new storage required. + """ + + def __init__(self, vikingdb_manager) -> None: + self._vikingdb = vikingdb_manager + + async def get_memory_stats( + self, + ctx: RequestContext, + category: Optional[str] = None, + ) -> Dict[str, Any]: + """Get aggregate memory statistics. + + Args: + ctx: Request context for tenant scoping. + category: Optional category filter (e.g. "cases"). + + Returns: + Dictionary with total counts, category breakdown, + hotness distribution, and staleness metrics. + """ + now = datetime.now(timezone.utc) + + # Build category list to query + categories = [category] if category else MEMORY_CATEGORIES + + by_category: Dict[str, int] = {} + hotness_dist = {"cold": 0, "warm": 0, "hot": 0} + staleness = { + "not_accessed_7d": 0, + "not_accessed_30d": 0, + "oldest_memory_age_days": 0, + } + + # Fetch all memories once and group by category in Python + all_records = await self._query_all_memories(ctx) + grouped: Dict[str, List[Dict[str, Any]]] = {cat: [] for cat in categories} + for record in all_records: + uri = record.get("uri", "") + for cat in categories: + if f"/{cat}/" in uri: + grouped[cat].append(record) + break + + for cat in categories: + records = grouped[cat] + by_category[cat] = len(records) + + for record in records: + active_count = record.get("active_count", 0) + updated_at_raw = record.get("updated_at") + updated_at = _parse_datetime(updated_at_raw) + created_at_raw = record.get("created_at") + created_at = _parse_datetime(created_at_raw) + + # Hotness distribution + score = hotness_score(active_count, updated_at, now=now) + if score < COLD_THRESHOLD: + hotness_dist["cold"] += 1 + elif score > HOT_THRESHOLD: + hotness_dist["hot"] += 1 + else: + hotness_dist["warm"] += 1 + + # Staleness: use updated_at for access tracking + if updated_at: + age_days = (now - updated_at).total_seconds() / 86400.0 + if age_days > 7: + staleness["not_accessed_7d"] += 1 + if age_days > 30: + staleness["not_accessed_30d"] += 1 + + # Track oldest memory by created_at + if created_at: + age = (now - created_at).total_seconds() / 86400.0 + if age > staleness["oldest_memory_age_days"]: + staleness["oldest_memory_age_days"] = round(age, 1) + + total_memories = sum(by_category.values()) + + return { + "total_memories": total_memories, + "by_category": by_category, + "hotness_distribution": hotness_dist, + "staleness": staleness, + } + + async def get_session_extraction_stats( + self, + session_id: str, + service, + ctx: RequestContext, + ) -> Dict[str, Any]: + """Get extraction stats for a specific session. + + Args: + session_id: The session to query. + service: OpenVikingService instance. + ctx: Request context for tenant scoping. + + Returns: + Dictionary with session extraction statistics. + """ + session = service.sessions.session(ctx, session_id) + await session.load() + + stats = session.stats + return { + "session_id": session_id, + "total_turns": stats.total_turns, + "memories_extracted": stats.memories_extracted, + "contexts_used": stats.contexts_used, + "skills_used": stats.skills_used, + } + + async def _query_all_memories( + self, + ctx: RequestContext, + ) -> List[Dict[str, Any]]: + """Query all memory records in a single DB round-trip. + + Uses the context_type="memory" filter. Callers group by category + in Python to avoid N+1 queries. + """ + try: + return await self._vikingdb.query( + filter=Eq("context_type", "memory"), + limit=10000, + output_fields=[ + "uri", + "active_count", + "updated_at", + "created_at", + "context_type", + ], + ctx=ctx, + ) + except Exception as e: + logger.error("Error querying memories: %s", e) + return [] + + +def _parse_datetime(value) -> Optional[datetime]: + """Parse a datetime value from a VikingDB record.""" + if value is None: + return None + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value + if isinstance(value, str): + try: + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except (ValueError, TypeError): + return None + return None diff --git a/pyproject.toml b/pyproject.toml index c26aa7cf..241997d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,9 @@ dependencies = [ pyagfs = { path = "third_party/agfs/agfs-sdk/python" } [project.optional-dependencies] +audio = [ + "mutagen>=1.47.0", +] test = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", diff --git a/tests/unit/stats/__init__.py b/tests/unit/stats/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py new file mode 100644 index 00000000..449b25a6 --- /dev/null +++ b/tests/unit/stats/test_stats_aggregator.py @@ -0,0 +1,159 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for StatsAggregator.""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from openviking.storage.stats_aggregator import StatsAggregator, _parse_datetime + + +@pytest.fixture +def mock_vikingdb(): + """Create a mock VikingDB manager.""" + return AsyncMock() + + +@pytest.fixture +def mock_ctx(): + """Create a mock request context.""" + return MagicMock() + + +@pytest.fixture +def aggregator(mock_vikingdb): + return StatsAggregator(mock_vikingdb) + + +def _make_memory_record( + category: str, + active_count: int = 1, + updated_at: datetime = None, + created_at: datetime = None, +): + """Helper to build a mock memory record.""" + now = datetime.now(timezone.utc) + return { + "uri": f"viking://memories/{category}/test-item", + "context_type": "memory", + "active_count": active_count, + "updated_at": (updated_at or now).isoformat(), + "created_at": (created_at or now).isoformat(), + } + + +class TestStatsAggregator: + @pytest.mark.asyncio + async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): + """Stats for an empty memory store should return zeros.""" + mock_vikingdb.query = AsyncMock(return_value=[]) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["total_memories"] == 0 + assert result["total_vectors"] == 0 + assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0} + + @pytest.mark.asyncio + async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): + """Records should be bucketed into the correct category.""" + now = datetime.now(timezone.utc) + records = [ + _make_memory_record("cases", active_count=5, updated_at=now), + _make_memory_record("cases", active_count=3, updated_at=now), + _make_memory_record("tools", active_count=1, updated_at=now), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["by_category"]["cases"] == 2 + assert result["by_category"]["tools"] == 1 + assert result["total_memories"] == 3 + + @pytest.mark.asyncio + async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): + """Passing a category filter should only query that category.""" + now = datetime.now(timezone.utc) + records = [ + _make_memory_record("patterns", active_count=2, updated_at=now), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx, category="patterns") + + assert "patterns" in result["by_category"] + assert len(result["by_category"]) == 1 + + @pytest.mark.asyncio + async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): + """Records should be classified into cold/warm/hot based on score.""" + now = datetime.now(timezone.utc) + # Recent + high access -> hot + hot_record = _make_memory_record("cases", active_count=50, updated_at=now) + # Old + no access -> cold + cold_record = _make_memory_record( + "cases", active_count=0, updated_at=now - timedelta(days=60) + ) + mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record]) + + result = await aggregator.get_memory_stats(mock_ctx, category="cases") + + dist = result["hotness_distribution"] + assert dist["hot"] >= 1 + assert dist["cold"] >= 1 + + @pytest.mark.asyncio + async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): + """Staleness should detect records not accessed in 7 and 30 days.""" + now = datetime.now(timezone.utc) + old_record = _make_memory_record( + "events", + active_count=1, + updated_at=now - timedelta(days=40), + created_at=now - timedelta(days=50), + ) + mock_vikingdb.query = AsyncMock(return_value=[old_record]) + + result = await aggregator.get_memory_stats(mock_ctx, category="events") + + assert result["staleness"]["not_accessed_7d"] >= 1 + assert result["staleness"]["not_accessed_30d"] >= 1 + assert result["staleness"]["oldest_memory_age_days"] >= 49 + + @pytest.mark.asyncio + async def test_query_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx): + """If VikingDB query fails, the category should show 0 records.""" + mock_vikingdb.query = AsyncMock(side_effect=Exception("connection error")) + + result = await aggregator.get_memory_stats(mock_ctx, category="cases") + + assert result["by_category"]["cases"] == 0 + assert result["total_memories"] == 0 + + +class TestParseDatetime: + def test_none(self): + assert _parse_datetime(None) is None + + def test_datetime_object(self): + dt = datetime(2025, 1, 1, tzinfo=timezone.utc) + assert _parse_datetime(dt) == dt + + def test_naive_datetime(self): + dt = datetime(2025, 1, 1) + result = _parse_datetime(dt) + assert result.tzinfo == timezone.utc + + def test_iso_string(self): + result = _parse_datetime("2025-01-01T00:00:00Z") + assert result is not None + assert result.year == 2025 + + def test_invalid_string(self): + assert _parse_datetime("not-a-date") is None + + def test_integer(self): + assert _parse_datetime(12345) is None diff --git a/tests/unit/stats/test_stats_api.py b/tests/unit/stats/test_stats_api.py new file mode 100644 index 00000000..9acc2972 --- /dev/null +++ b/tests/unit/stats/test_stats_api.py @@ -0,0 +1,109 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for the stats API router.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from openviking.server.routers.stats import router + + +@pytest.fixture +def mock_service(): + """Create a mock OpenVikingService.""" + service = MagicMock() + service.vikingdb_manager = AsyncMock() + service.vikingdb_manager.query = AsyncMock(return_value=[]) + + # Mock session access + mock_session = MagicMock() + mock_session.load = AsyncMock() + mock_stats = MagicMock() + mock_stats.total_turns = 5 + mock_stats.memories_extracted = 3 + mock_stats.contexts_used = 2 + mock_stats.skills_used = 1 + mock_session.stats = mock_stats + service.sessions.session.return_value = mock_session + + return service + + +@pytest.fixture +def mock_ctx(): + """Create a mock request context.""" + return MagicMock() + + +@pytest.fixture +def client(mock_service, mock_ctx): + """Create a test client with mocked dependencies.""" + app = FastAPI() + app.include_router(router) + + with ( + patch("openviking.server.routers.stats.get_service", return_value=mock_service), + patch("openviking.server.routers.stats.get_request_context", return_value=mock_ctx), + ): + yield TestClient(app) + + +class TestGetMemoryStats: + def test_empty_store(self, client): + """Empty store returns zero counts.""" + response = client.get("/api/v1/stats/memories") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert data["result"]["total_memories"] == 0 + + def test_invalid_category(self, client): + """Unknown category returns an error.""" + response = client.get("/api/v1/stats/memories?category=bogus") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "error" + assert "INVALID_ARGUMENT" in data["error"]["code"] + + def test_valid_category_filter(self, client): + """Valid category returns filtered stats.""" + response = client.get("/api/v1/stats/memories?category=cases") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert "cases" in data["result"]["by_category"] + + def test_response_shape(self, client): + """Response includes all expected top-level keys.""" + response = client.get("/api/v1/stats/memories") + data = response.json()["result"] + assert "total_memories" in data + assert "by_category" in data + assert "hotness_distribution" in data + assert "staleness" in data + assert "total_vectors" in data + + +class TestGetSessionStats: + def test_session_stats(self, client): + """Session stats endpoint returns session data.""" + response = client.get("/api/v1/stats/sessions/test-session-123") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + result = data["result"] + assert result["session_id"] == "test-session-123" + assert result["total_turns"] == 5 + assert result["memories_extracted"] == 3 + + def test_session_not_found(self, client, mock_service): + """Missing session returns NOT_FOUND error.""" + mock_service.sessions.session.side_effect = Exception("not found") + response = client.get("/api/v1/stats/sessions/nonexistent") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "error" + assert data["error"]["code"] == "NOT_FOUND"