Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions openviking/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
resources_router,
search_router,
sessions_router,
stats_router,
system_router,
tasks_router,
)
Expand Down Expand Up @@ -173,6 +174,7 @@ async def general_error_handler(request: Request, exc: Exception):
app.include_router(search_router)
app.include_router(relations_router)
app.include_router(sessions_router)
app.include_router(stats_router)
app.include_router(pack_router)
app.include_router(debug_router)
app.include_router(observer_router)
Expand Down
2 changes: 2 additions & 0 deletions openviking/server/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from openviking.server.routers.resources import router as resources_router
from openviking.server.routers.search import router as search_router
from openviking.server.routers.sessions import router as sessions_router
from openviking.server.routers.stats import router as stats_router
from openviking.server.routers.system import router as system_router
from openviking.server.routers.tasks import router as tasks_router

Expand All @@ -26,6 +27,7 @@
"search_router",
"relations_router",
"sessions_router",
"stats_router",
"pack_router",
"debug_router",
"observer_router",
Expand Down
80 changes: 80 additions & 0 deletions openviking/server/routers/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Memory health statistics endpoints for OpenViking HTTP Server."""

import logging
from typing import Optional

from fastapi import APIRouter, Depends, Path, Query

from openviking.server.auth import get_request_context
from openviking.server.dependencies import get_service
from openviking.server.identity import RequestContext
from openviking.server.models import ErrorInfo, Response
from openviking.storage.stats_aggregator import MEMORY_CATEGORIES, StatsAggregator

router = APIRouter(prefix="/api/v1/stats", tags=["stats"])
logger = logging.getLogger(__name__)


def _get_aggregator() -> StatsAggregator:
"""Build a StatsAggregator from the current service."""
service = get_service()
return StatsAggregator(service.vikingdb_manager)


@router.get("/memories")
async def get_memory_stats(
category: Optional[str] = Query(
None,
description="Filter by memory category (e.g. cases, patterns, tools)",
),
_ctx: RequestContext = Depends(get_request_context),
):
"""Get aggregate memory health statistics.

Returns counts by category, hotness distribution, and staleness metrics.
Optionally filter by a single category.
"""
if category and category not in MEMORY_CATEGORIES:
return Response(
status="error",
error=ErrorInfo(
code="INVALID_ARGUMENT",
message=f"Unknown category: {category}. Valid categories: {', '.join(MEMORY_CATEGORIES)}",
),
)

aggregator = _get_aggregator()
result = await aggregator.get_memory_stats(_ctx, category=category)
return Response(status="ok", result=result)


@router.get("/sessions/{session_id}")
async def get_session_stats(
session_id: str = Path(..., description="Session ID"),
_ctx: RequestContext = Depends(get_request_context),
):
"""Get extraction statistics for a specific session."""
service = get_service()
aggregator = _get_aggregator()
try:
result = await aggregator.get_session_extraction_stats(session_id, service, _ctx)
return Response(status="ok", result=result)
except KeyError:
return Response(
status="error",
error=ErrorInfo(
code="NOT_FOUND",
message=f"Session not found: {session_id}",
),
)
except Exception as e:
logger.error("Failed to get session stats for %s: %s", session_id, e)
return Response(
status="error",
error=ErrorInfo(
code="INTERNAL_ERROR",
message=f"Failed to retrieve session stats: {type(e).__name__}",
),
)
198 changes: 198 additions & 0 deletions openviking/storage/stats_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Memory health statistics aggregator.

Queries VikingDB indexes and the hotness_score function to produce
aggregate memory health metrics without introducing new storage.
"""

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from openviking.retrieve.memory_lifecycle import hotness_score
from openviking.server.identity import RequestContext
from openviking.storage.expr import Eq
from openviking_cli.utils import get_logger

logger = get_logger(__name__)

# Memory categories from MemoryCategory enum
MEMORY_CATEGORIES = [
"profile",
"preferences",
"entities",
"events",
"cases",
"patterns",
"tools",
"skills",
]

# Hotness buckets
COLD_THRESHOLD = 0.2
HOT_THRESHOLD = 0.6


class StatsAggregator:
"""Aggregates memory health statistics from VikingDB.

Reads from existing indexes and the hotness_score function.
No new storage required.
"""

def __init__(self, vikingdb_manager) -> None:
self._vikingdb = vikingdb_manager

async def get_memory_stats(
self,
ctx: RequestContext,
category: Optional[str] = None,
) -> Dict[str, Any]:
"""Get aggregate memory statistics.

Args:
ctx: Request context for tenant scoping.
category: Optional category filter (e.g. "cases").

Returns:
Dictionary with total counts, category breakdown,
hotness distribution, and staleness metrics.
"""
now = datetime.now(timezone.utc)

# Build category list to query
categories = [category] if category else MEMORY_CATEGORIES

by_category: Dict[str, int] = {}
hotness_dist = {"cold": 0, "warm": 0, "hot": 0}
staleness = {
"not_accessed_7d": 0,
"not_accessed_30d": 0,
"oldest_memory_age_days": 0,
}

# Fetch all memories once and group by category in Python
all_records = await self._query_all_memories(ctx)
grouped: Dict[str, List[Dict[str, Any]]] = {cat: [] for cat in categories}
for record in all_records:
uri = record.get("uri", "")
for cat in categories:
if f"/{cat}/" in uri:
grouped[cat].append(record)
break

for cat in categories:
records = grouped[cat]
by_category[cat] = len(records)

for record in records:
active_count = record.get("active_count", 0)
updated_at_raw = record.get("updated_at")
updated_at = _parse_datetime(updated_at_raw)
created_at_raw = record.get("created_at")
created_at = _parse_datetime(created_at_raw)

# Hotness distribution
score = hotness_score(active_count, updated_at, now=now)
if score < COLD_THRESHOLD:
hotness_dist["cold"] += 1
elif score > HOT_THRESHOLD:
hotness_dist["hot"] += 1
else:
hotness_dist["warm"] += 1

# Staleness: use updated_at for access tracking
if updated_at:
age_days = (now - updated_at).total_seconds() / 86400.0
if age_days > 7:
staleness["not_accessed_7d"] += 1
if age_days > 30:
staleness["not_accessed_30d"] += 1

# Track oldest memory by created_at
if created_at:
age = (now - created_at).total_seconds() / 86400.0
if age > staleness["oldest_memory_age_days"]:
staleness["oldest_memory_age_days"] = round(age, 1)

total_memories = sum(by_category.values())

return {
"total_memories": total_memories,
"by_category": by_category,
"hotness_distribution": hotness_dist,
"staleness": staleness,
}

async def get_session_extraction_stats(
self,
session_id: str,
service,
ctx: RequestContext,
) -> Dict[str, Any]:
"""Get extraction stats for a specific session.

Args:
session_id: The session to query.
service: OpenVikingService instance.
ctx: Request context for tenant scoping.

Returns:
Dictionary with session extraction statistics.
"""
session = service.sessions.session(ctx, session_id)
await session.load()

stats = session.stats
return {
"session_id": session_id,
"total_turns": stats.total_turns,
"memories_extracted": stats.memories_extracted,
"contexts_used": stats.contexts_used,
"skills_used": stats.skills_used,
}

async def _query_all_memories(
self,
ctx: RequestContext,
) -> List[Dict[str, Any]]:
"""Query all memory records in a single DB round-trip.

Uses the context_type="memory" filter. Callers group by category
in Python to avoid N+1 queries.
"""
try:
return await self._vikingdb.query(
filter=Eq("context_type", "memory"),
limit=10000,
output_fields=[
"uri",
"active_count",
"updated_at",
"created_at",
"context_type",
],
ctx=ctx,
)
except Exception as e:
logger.error("Error querying memories: %s", e)
return []


def _parse_datetime(value) -> Optional[datetime]:
"""Parse a datetime value from a VikingDB record."""
if value is None:
return None
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value
if isinstance(value, str):
try:
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (ValueError, TypeError):
return None
return None
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ dependencies = [
pyagfs = { path = "third_party/agfs/agfs-sdk/python" }

[project.optional-dependencies]
audio = [
"mutagen>=1.47.0",
]
test = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
Expand Down
Empty file added tests/unit/stats/__init__.py
Empty file.
Loading
Loading