From 28f6c4e4b2d247ed9cb43254eff2bd06ab8d532a Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Tue, 17 Mar 2026 07:15:43 -0700 Subject: [PATCH 1/5] feat(server): add memory health statistics API endpoints Add two new API endpoints for querying aggregate memory health: - GET /stats/memories - global memory stats (counts by category, hotness distribution, staleness metrics) - GET /stats/sessions/{id} - per-session extraction statistics The StatsAggregator reads from existing VikingDB indexes and the hotness_score function without introducing new storage. Includes unit tests with mocked VikingDB backend. --- openviking/server/app.py | 2 + openviking/server/routers/__init__.py | 2 + openviking/server/routers/stats.py | 74 ++++++++ openviking/storage/stats_aggregator.py | 201 ++++++++++++++++++++++ tests/unit/stats/__init__.py | 0 tests/unit/stats/test_stats_aggregator.py | 161 +++++++++++++++++ tests/unit/stats/test_stats_api.py | 112 ++++++++++++ 7 files changed, 552 insertions(+) create mode 100644 openviking/server/routers/stats.py create mode 100644 openviking/storage/stats_aggregator.py create mode 100644 tests/unit/stats/__init__.py create mode 100644 tests/unit/stats/test_stats_aggregator.py create mode 100644 tests/unit/stats/test_stats_api.py diff --git a/openviking/server/app.py b/openviking/server/app.py index c22794e1..518f90d7 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -26,6 +26,7 @@ resources_router, search_router, sessions_router, + stats_router, system_router, tasks_router, ) @@ -173,6 +174,7 @@ async def general_error_handler(request: Request, exc: Exception): app.include_router(search_router) app.include_router(relations_router) app.include_router(sessions_router) + app.include_router(stats_router) app.include_router(pack_router) app.include_router(debug_router) app.include_router(observer_router) diff --git a/openviking/server/routers/__init__.py b/openviking/server/routers/__init__.py index 12aa9f34..437c5fb8 100644 --- a/openviking/server/routers/__init__.py +++ b/openviking/server/routers/__init__.py @@ -13,6 +13,7 @@ from openviking.server.routers.resources import router as resources_router from openviking.server.routers.search import router as search_router from openviking.server.routers.sessions import router as sessions_router +from openviking.server.routers.stats import router as stats_router from openviking.server.routers.system import router as system_router from openviking.server.routers.tasks import router as tasks_router @@ -26,6 +27,7 @@ "search_router", "relations_router", "sessions_router", + "stats_router", "pack_router", "debug_router", "observer_router", diff --git a/openviking/server/routers/stats.py b/openviking/server/routers/stats.py new file mode 100644 index 00000000..1de6b224 --- /dev/null +++ b/openviking/server/routers/stats.py @@ -0,0 +1,74 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Memory health statistics endpoints for OpenViking HTTP Server.""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, Path, Query + +from openviking.server.auth import get_request_context +from openviking.server.dependencies import get_service +from openviking.server.identity import RequestContext +from openviking.server.models import ErrorInfo, Response +from openviking.storage.stats_aggregator import MEMORY_CATEGORIES, StatsAggregator + +router = APIRouter(prefix="/api/v1/stats", tags=["stats"]) +logger = logging.getLogger(__name__) + + +def _get_aggregator() -> StatsAggregator: + """Build a StatsAggregator from the current service.""" + service = get_service() + return StatsAggregator(service.vikingdb_manager) + + +@router.get("/memories") +async def get_memory_stats( + category: Optional[str] = Query( + None, + description="Filter by memory category (e.g. cases, patterns, tools)", + ), + _ctx: RequestContext = Depends(get_request_context), +): + """Get aggregate memory health statistics. + + Returns counts by category, hotness distribution, and staleness metrics. + Optionally filter by a single category. + """ + if category and category not in MEMORY_CATEGORIES: + return Response( + status="error", + error=ErrorInfo( + code="INVALID_ARGUMENT", + message=f"Unknown category: {category}. Valid categories: {', '.join(MEMORY_CATEGORIES)}", + ), + ) + + aggregator = _get_aggregator() + result = await aggregator.get_memory_stats(_ctx, category=category) + return Response(status="ok", result=result) + + +@router.get("/sessions/{session_id}") +async def get_session_stats( + session_id: str = Path(..., description="Session ID"), + _ctx: RequestContext = Depends(get_request_context), +): + """Get extraction statistics for a specific session.""" + service = get_service() + aggregator = _get_aggregator() + try: + result = await aggregator.get_session_extraction_stats( + session_id, service, _ctx + ) + return Response(status="ok", result=result) + except Exception as e: + logger.warning("Failed to get session stats for %s: %s", session_id, e) + return Response( + status="error", + error=ErrorInfo( + code="NOT_FOUND", + message=f"Session not found: {session_id}", + ), + ) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py new file mode 100644 index 00000000..46e87eb3 --- /dev/null +++ b/openviking/storage/stats_aggregator.py @@ -0,0 +1,201 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Memory health statistics aggregator. + +Queries VikingDB indexes and the hotness_score function to produce +aggregate memory health metrics without introducing new storage. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from openviking.retrieve.memory_lifecycle import hotness_score +from openviking.server.identity import RequestContext +from openviking.storage.expr import Eq +from openviking_cli.utils import get_logger + +logger = get_logger(__name__) + +# Memory categories from MemoryCategory enum +MEMORY_CATEGORIES = [ + "profile", + "preferences", + "entities", + "events", + "cases", + "patterns", + "tools", + "skills", +] + +# Hotness buckets +COLD_THRESHOLD = 0.2 +HOT_THRESHOLD = 0.6 + + +class StatsAggregator: + """Aggregates memory health statistics from VikingDB. + + Reads from existing indexes and the hotness_score function. + No new storage required. + """ + + def __init__(self, vikingdb_manager) -> None: + self._vikingdb = vikingdb_manager + + async def get_memory_stats( + self, + ctx: RequestContext, + category: Optional[str] = None, + ) -> Dict[str, Any]: + """Get aggregate memory statistics. + + Args: + ctx: Request context for tenant scoping. + category: Optional category filter (e.g. "cases"). + + Returns: + Dictionary with total counts, category breakdown, + hotness distribution, and staleness metrics. + """ + now = datetime.now(timezone.utc) + + # Build category list to query + categories = [category] if category else MEMORY_CATEGORIES + + by_category: Dict[str, int] = {} + hotness_dist = {"cold": 0, "warm": 0, "hot": 0} + staleness = { + "not_accessed_7d": 0, + "not_accessed_30d": 0, + "oldest_memory_age_days": 0, + } + total_vectors = 0 + + for cat in categories: + records = await self._query_memories_by_category(ctx, cat) + count = len(records) + by_category[cat] = count + total_vectors += count + + for record in records: + active_count = record.get("active_count", 0) + updated_at_raw = record.get("updated_at") + updated_at = _parse_datetime(updated_at_raw) + created_at_raw = record.get("created_at") + created_at = _parse_datetime(created_at_raw) + + # Hotness distribution + score = hotness_score(active_count, updated_at, now=now) + if score < COLD_THRESHOLD: + hotness_dist["cold"] += 1 + elif score > HOT_THRESHOLD: + hotness_dist["hot"] += 1 + else: + hotness_dist["warm"] += 1 + + # Staleness: use updated_at for access tracking + if updated_at: + age_days = (now - updated_at).total_seconds() / 86400.0 + if age_days > 7: + staleness["not_accessed_7d"] += 1 + if age_days > 30: + staleness["not_accessed_30d"] += 1 + + # Track oldest memory by created_at + if created_at: + age = (now - created_at).total_seconds() / 86400.0 + if age > staleness["oldest_memory_age_days"]: + staleness["oldest_memory_age_days"] = round(age, 1) + + total_memories = sum(by_category.values()) + + return { + "total_memories": total_memories, + "by_category": by_category, + "hotness_distribution": hotness_dist, + "staleness": staleness, + "total_vectors": total_vectors, + } + + async def get_session_extraction_stats( + self, + session_id: str, + service, + ctx: RequestContext, + ) -> Dict[str, Any]: + """Get extraction stats for a specific session. + + Args: + session_id: The session to query. + service: OpenVikingService instance. + ctx: Request context for tenant scoping. + + Returns: + Dictionary with session extraction statistics. + """ + session = service.sessions.session(ctx, session_id) + await session.load() + + stats = session.stats + return { + "session_id": session_id, + "total_turns": stats.total_turns, + "memories_extracted": stats.memories_extracted, + "contexts_used": stats.contexts_used, + "skills_used": stats.skills_used, + } + + async def _query_memories_by_category( + self, + ctx: RequestContext, + category: str, + ) -> List[Dict[str, Any]]: + """Query all memory records for a given category. + + Uses the context_type="memory" filter and checks the category + in the URI path. Fetches fields needed for hotness and staleness + computation. + """ + try: + records = await self._vikingdb.query( + filter=Eq("context_type", "memory"), + limit=10000, + output_fields=[ + "uri", + "active_count", + "updated_at", + "created_at", + "context_type", + ], + ctx=ctx, + ) + + # Filter by category using URI prefix + category_prefix = f"/{category}/" + return [ + r for r in records + if category_prefix in r.get("uri", "") + ] + except Exception as e: + logger.error("Error querying memories for category %s: %s", category, e) + return [] + + +def _parse_datetime(value) -> Optional[datetime]: + """Parse a datetime value from a VikingDB record.""" + if value is None: + return None + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value + if isinstance(value, str): + try: + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except (ValueError, TypeError): + return None + return None diff --git a/tests/unit/stats/__init__.py b/tests/unit/stats/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py new file mode 100644 index 00000000..6cdc6c49 --- /dev/null +++ b/tests/unit/stats/test_stats_aggregator.py @@ -0,0 +1,161 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for StatsAggregator.""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from openviking.storage.stats_aggregator import StatsAggregator, _parse_datetime + + +@pytest.fixture +def mock_vikingdb(): + """Create a mock VikingDB manager.""" + return AsyncMock() + + +@pytest.fixture +def mock_ctx(): + """Create a mock request context.""" + return MagicMock() + + +@pytest.fixture +def aggregator(mock_vikingdb): + return StatsAggregator(mock_vikingdb) + + +def _make_memory_record( + category: str, + active_count: int = 1, + updated_at: datetime = None, + created_at: datetime = None, +): + """Helper to build a mock memory record.""" + now = datetime.now(timezone.utc) + return { + "uri": f"viking://memories/{category}/test-item", + "context_type": "memory", + "active_count": active_count, + "updated_at": (updated_at or now).isoformat(), + "created_at": (created_at or now).isoformat(), + } + + +class TestStatsAggregator: + @pytest.mark.asyncio + async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): + """Stats for an empty memory store should return zeros.""" + mock_vikingdb.query = AsyncMock(return_value=[]) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["total_memories"] == 0 + assert result["total_vectors"] == 0 + assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0} + + @pytest.mark.asyncio + async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): + """Records should be bucketed into the correct category.""" + now = datetime.now(timezone.utc) + records = [ + _make_memory_record("cases", active_count=5, updated_at=now), + _make_memory_record("cases", active_count=3, updated_at=now), + _make_memory_record("tools", active_count=1, updated_at=now), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["by_category"]["cases"] == 2 + assert result["by_category"]["tools"] == 1 + assert result["total_memories"] == 3 + + @pytest.mark.asyncio + async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): + """Passing a category filter should only query that category.""" + now = datetime.now(timezone.utc) + records = [ + _make_memory_record("patterns", active_count=2, updated_at=now), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx, category="patterns") + + assert "patterns" in result["by_category"] + assert len(result["by_category"]) == 1 + + @pytest.mark.asyncio + async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): + """Records should be classified into cold/warm/hot based on score.""" + now = datetime.now(timezone.utc) + # Recent + high access -> hot + hot_record = _make_memory_record( + "cases", active_count=50, updated_at=now + ) + # Old + no access -> cold + cold_record = _make_memory_record( + "cases", active_count=0, updated_at=now - timedelta(days=60) + ) + mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record]) + + result = await aggregator.get_memory_stats(mock_ctx, category="cases") + + dist = result["hotness_distribution"] + assert dist["hot"] >= 1 + assert dist["cold"] >= 1 + + @pytest.mark.asyncio + async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): + """Staleness should detect records not accessed in 7 and 30 days.""" + now = datetime.now(timezone.utc) + old_record = _make_memory_record( + "events", + active_count=1, + updated_at=now - timedelta(days=40), + created_at=now - timedelta(days=50), + ) + mock_vikingdb.query = AsyncMock(return_value=[old_record]) + + result = await aggregator.get_memory_stats(mock_ctx, category="events") + + assert result["staleness"]["not_accessed_7d"] >= 1 + assert result["staleness"]["not_accessed_30d"] >= 1 + assert result["staleness"]["oldest_memory_age_days"] >= 49 + + @pytest.mark.asyncio + async def test_query_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx): + """If VikingDB query fails, the category should show 0 records.""" + mock_vikingdb.query = AsyncMock(side_effect=Exception("connection error")) + + result = await aggregator.get_memory_stats(mock_ctx, category="cases") + + assert result["by_category"]["cases"] == 0 + assert result["total_memories"] == 0 + + +class TestParseDatetime: + def test_none(self): + assert _parse_datetime(None) is None + + def test_datetime_object(self): + dt = datetime(2025, 1, 1, tzinfo=timezone.utc) + assert _parse_datetime(dt) == dt + + def test_naive_datetime(self): + dt = datetime(2025, 1, 1) + result = _parse_datetime(dt) + assert result.tzinfo == timezone.utc + + def test_iso_string(self): + result = _parse_datetime("2025-01-01T00:00:00Z") + assert result is not None + assert result.year == 2025 + + def test_invalid_string(self): + assert _parse_datetime("not-a-date") is None + + def test_integer(self): + assert _parse_datetime(12345) is None diff --git a/tests/unit/stats/test_stats_api.py b/tests/unit/stats/test_stats_api.py new file mode 100644 index 00000000..1e92e7c0 --- /dev/null +++ b/tests/unit/stats/test_stats_api.py @@ -0,0 +1,112 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for the stats API router.""" + +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from openviking.server.models import Response +from openviking.server.routers.stats import router + + +@pytest.fixture +def mock_service(): + """Create a mock OpenVikingService.""" + service = MagicMock() + service.vikingdb_manager = AsyncMock() + service.vikingdb_manager.query = AsyncMock(return_value=[]) + + # Mock session access + mock_session = MagicMock() + mock_session.load = AsyncMock() + mock_stats = MagicMock() + mock_stats.total_turns = 5 + mock_stats.memories_extracted = 3 + mock_stats.contexts_used = 2 + mock_stats.skills_used = 1 + mock_session.stats = mock_stats + service.sessions.session.return_value = mock_session + + return service + + +@pytest.fixture +def mock_ctx(): + """Create a mock request context.""" + return MagicMock() + + +@pytest.fixture +def client(mock_service, mock_ctx): + """Create a test client with mocked dependencies.""" + app = FastAPI() + app.include_router(router) + + with patch( + "openviking.server.routers.stats.get_service", return_value=mock_service + ), patch( + "openviking.server.routers.stats.get_request_context", return_value=mock_ctx + ): + yield TestClient(app) + + +class TestGetMemoryStats: + def test_empty_store(self, client): + """Empty store returns zero counts.""" + response = client.get("/api/v1/stats/memories") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert data["result"]["total_memories"] == 0 + + def test_invalid_category(self, client): + """Unknown category returns an error.""" + response = client.get("/api/v1/stats/memories?category=bogus") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "error" + assert "INVALID_ARGUMENT" in data["error"]["code"] + + def test_valid_category_filter(self, client): + """Valid category returns filtered stats.""" + response = client.get("/api/v1/stats/memories?category=cases") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert "cases" in data["result"]["by_category"] + + def test_response_shape(self, client): + """Response includes all expected top-level keys.""" + response = client.get("/api/v1/stats/memories") + data = response.json()["result"] + assert "total_memories" in data + assert "by_category" in data + assert "hotness_distribution" in data + assert "staleness" in data + assert "total_vectors" in data + + +class TestGetSessionStats: + def test_session_stats(self, client): + """Session stats endpoint returns session data.""" + response = client.get("/api/v1/stats/sessions/test-session-123") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + result = data["result"] + assert result["session_id"] == "test-session-123" + assert result["total_turns"] == 5 + assert result["memories_extracted"] == 3 + + def test_session_not_found(self, client, mock_service): + """Missing session returns NOT_FOUND error.""" + mock_service.sessions.session.side_effect = Exception("not found") + response = client.get("/api/v1/stats/sessions/nonexistent") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "error" + assert data["error"]["code"] == "NOT_FOUND" From a96de513da305cbe7ac8ea10cb37b2d4c621a473 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Tue, 17 Mar 2026 07:15:54 -0700 Subject: [PATCH 2/5] feat(parse): implement audio resource parser with Whisper transcription Replace the audio parser stub with a working implementation that: - Extracts metadata (duration, sample rate, channels, bitrate) via mutagen - Transcribes speech via Whisper API with timestamped segments - Builds structured ResourceNode tree with L0/L1/L2 content tiers - Falls back to metadata-only output when Whisper is unavailable - Adds mutagen as optional dependency under [audio] extra - Adds audio_summary prompt template for semantic indexing - Includes unit tests with mocked Whisper API and mutagen --- openviking/parse/parsers/media/audio.py | 480 ++++++++++++++---- .../templates/parsing/audio_summary.yaml | 44 ++ pyproject.toml | 3 + tests/unit/parse/__init__.py | 0 tests/unit/parse/test_audio_parser.py | 288 +++++++++++ 5 files changed, 708 insertions(+), 107 deletions(-) create mode 100644 openviking/prompts/templates/parsing/audio_summary.yaml create mode 100644 tests/unit/parse/__init__.py create mode 100644 tests/unit/parse/test_audio_parser.py diff --git a/openviking/parse/parsers/media/audio.py b/openviking/parse/parsers/media/audio.py index 5c57dfc2..fbe944c9 100644 --- a/openviking/parse/parsers/media/audio.py +++ b/openviking/parse/parsers/media/audio.py @@ -1,41 +1,117 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 """ -Audio parser - Future implementation. - -Planned Features: -1. Speech-to-text transcription using ASR models -2. Audio metadata extraction (duration, sample rate, channels) -3. Speaker diarization (identify different speakers) -4. Timestamp alignment for transcribed text -5. Generate structured ResourceNode with transcript - -Example workflow: - 1. Load audio file - 2. Extract metadata (duration, format, sample rate) - 3. Transcribe speech to text using Whisper or similar - 4. (Optional) Perform speaker diarization - 5. Create ResourceNode with: - - type: NodeType.ROOT - - children: sections for each speaker/timestamp - - meta: audio metadata and timestamps - 6. Return ParseResult - -Supported formats: MP3, WAV, OGG, FLAC, AAC, M4A +Audio parser with metadata extraction and Whisper transcription. + +Features: +1. Speech-to-text transcription using Whisper API +2. Audio metadata extraction (duration, sample rate, channels) via mutagen +3. Timestamp alignment for transcribed text +4. Generate structured ResourceNode with transcript segments + +Supported formats: MP3, WAV, OGG, FLAC, AAC, M4A, OPUS """ +import io +import time from pathlib import Path -from typing import List, Optional, Union +from typing import Any, Dict, List, Optional, Union from openviking.parse.base import NodeType, ParseResult, ResourceNode from openviking.parse.parsers.base_parser import BaseParser from openviking.parse.parsers.media.constants import AUDIO_EXTENSIONS from openviking_cli.utils.config.parser_config import AudioConfig +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + +# Magic bytes for audio format validation +AUDIO_MAGIC_BYTES: Dict[str, List[bytes]] = { + ".mp3": [b"ID3", b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"], + ".wav": [b"RIFF"], + ".ogg": [b"OggS"], + ".flac": [b"fLaC"], + ".aac": [b"\xff\xf1", b"\xff\xf9"], + ".m4a": [b"\x00\x00\x00", b"ftypM4A", b"ftypisom"], + ".opus": [b"OggS"], +} + + +def _try_import_mutagen(): + """Lazily import mutagen, returning None if not installed.""" + try: + import mutagen + + return mutagen + except ImportError: + return None + + +def _format_timestamp(seconds: float) -> str: + """Format seconds as MM:SS or H:MM:SS.""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + if hours > 0: + return f"{hours}:{minutes:02d}:{secs:02d}" + return f"{minutes}:{secs:02d}" + + +def _extract_metadata_mutagen(file_path: Path) -> Dict[str, Any]: + """ + Extract audio metadata using mutagen. + + Args: + file_path: Path to audio file + + Returns: + Dictionary with duration, sample_rate, channels, bitrate, format + """ + mutagen = _try_import_mutagen() + if mutagen is None: + logger.warning( + "[AudioParser] mutagen not installed, skipping metadata extraction. " + "Install with: pip install mutagen" + ) + return {} + + try: + audio = mutagen.File(str(file_path)) + if audio is None: + logger.warning(f"[AudioParser] mutagen could not identify file: {file_path}") + return {} + + meta: Dict[str, Any] = {} + + # Duration + if hasattr(audio.info, "length"): + meta["duration"] = round(audio.info.length, 2) + + # Sample rate + if hasattr(audio.info, "sample_rate"): + meta["sample_rate"] = audio.info.sample_rate + + # Channels + if hasattr(audio.info, "channels"): + meta["channels"] = audio.info.channels + + # Bitrate (bits per second) + if hasattr(audio.info, "bitrate"): + meta["bitrate"] = audio.info.bitrate + + return meta + + except Exception as e: + logger.warning(f"[AudioParser] mutagen metadata extraction failed: {e}") + return {} class AudioParser(BaseParser): """ Audio parser for audio files. + + Extracts metadata via mutagen and transcribes speech via Whisper API. + Falls back to metadata-only output when transcription is unavailable. """ def __init__(self, config: Optional[AudioConfig] = None, **kwargs): @@ -53,23 +129,28 @@ def supported_extensions(self) -> List[str]: """Return supported audio file extensions.""" return AUDIO_EXTENSIONS - async def parse(self, source: Union[str, Path], instruction: str = "", **kwargs) -> ParseResult: + async def parse( + self, source: Union[str, Path], instruction: str = "", **kwargs + ) -> ParseResult: """ - Parse audio file - only copy original file and extract basic metadata, no content understanding. + Parse audio file - extract metadata, transcribe via Whisper, build ResourceNode tree. Args: source: Audio file path + instruction: Processing instruction **kwargs: Additional parsing parameters Returns: - ParseResult with audio content + ParseResult with audio content tree Raises: FileNotFoundError: If source file does not exist - IOError: If audio processing fails + ValueError: If file signature does not match expected format """ from openviking.storage.viking_fs import get_viking_fs + start_time = time.monotonic() + # Convert to Path object file_path = Path(source) if isinstance(source, str) else source if not file_path.exists(): @@ -78,160 +159,339 @@ async def parse(self, source: Union[str, Path], instruction: str = "", **kwargs) viking_fs = get_viking_fs() temp_uri = viking_fs.create_temp_uri() - # Phase 1: Generate temporary files + # Read audio bytes audio_bytes = file_path.read_bytes() ext = file_path.suffix + # Validate magic bytes + self._validate_audio_bytes(audio_bytes, ext, file_path) + from openviking_cli.utils.uri import VikingURI # Sanitize original filename (replace spaces with underscores) original_filename = file_path.name.replace(" ", "_") - # Root directory name: filename stem + _ + extension (without dot) stem = file_path.stem.replace(" ", "_") ext_no_dot = ext[1:] if ext else "" root_dir_name = VikingURI.sanitize_segment(f"{stem}_{ext_no_dot}") root_dir_uri = f"{temp_uri}/{root_dir_name}" await viking_fs.mkdir(root_dir_uri, exist_ok=True) - # 1.1 Save original audio with original filename (sanitized) + # Save original audio await viking_fs.write_file_bytes(f"{root_dir_uri}/{original_filename}", audio_bytes) - # 1.2 Validate audio file using magic bytes - # Define magic bytes for supported audio formats - audio_magic_bytes = { - ".mp3": [b"ID3", b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"], - ".wav": [b"RIFF"], - ".ogg": [b"OggS"], - ".flac": [b"fLaC"], - ".aac": [b"\xff\xf1", b"\xff\xf9"], - ".m4a": [b"\x00\x00\x00", b"ftypM4A", b"ftypisom"], - ".opus": [b"OggS"], - } - - # Check magic bytes - valid = False - ext_lower = ext.lower() - magic_list = audio_magic_bytes.get(ext_lower, []) - for magic in magic_list: - if len(audio_bytes) >= len(magic) and audio_bytes.startswith(magic): - valid = True - break - - if not valid: - raise ValueError( - f"Invalid audio file: {file_path}. File signature does not match expected format {ext_lower}" + # Extract metadata via mutagen + mutagen_meta = _extract_metadata_mutagen(file_path) + duration = mutagen_meta.get("duration", 0) + sample_rate = mutagen_meta.get("sample_rate", 0) + channels = mutagen_meta.get("channels", 0) + bitrate = mutagen_meta.get("bitrate", 0) + format_str = ext_no_dot.lower() + + # Attempt transcription + transcript_segments: List[Dict[str, Any]] = [] + full_transcript = "" + warnings: List[str] = [] + + if self.config.enable_transcription: + try: + transcript_segments = await self._asr_transcribe_with_timestamps( + audio_bytes, self.config.transcription_model, ext + ) + if transcript_segments: + full_transcript = "\n".join( + seg["text"] for seg in transcript_segments + ) + else: + # Try plain transcription + full_transcript = await self._asr_transcribe( + audio_bytes, self.config.transcription_model, ext + ) + except Exception as e: + logger.warning(f"[AudioParser] Transcription failed: {e}") + warnings.append(f"Transcription unavailable: {e}") + + has_transcript = bool(full_transcript.strip()) + + # Save transcript file if available + if has_transcript: + transcript_md = self._build_transcript_markdown( + transcript_segments, full_transcript, file_path.stem ) + await viking_fs.write_file(f"{root_dir_uri}/transcript.md", transcript_md) + + # Build segment child nodes + children = [] + if transcript_segments: + for i, seg in enumerate(transcript_segments): + seg_start = seg.get("start", 0) + seg_end = seg.get("end", 0) + seg_text = seg.get("text", "").strip() + if not seg_text: + continue + + child = ResourceNode( + type=NodeType.SECTION, + title=f"segment_{i + 1:03d} ({_format_timestamp(seg_start)}-{_format_timestamp(seg_end)})", + level=1, + detail_file=None, + content_path=None, + children=[], + content_type="text", + meta={ + "start": seg_start, + "end": seg_end, + "text": seg_text, + }, + ) + children.append(child) + + # Build root node meta + root_meta: Dict[str, Any] = { + "duration": duration, + "sample_rate": sample_rate, + "channels": channels, + "bitrate": bitrate, + "format": format_str, + "content_type": "audio", + "source_title": file_path.stem, + "semantic_name": file_path.stem, + "original_filename": original_filename, + "has_transcript": has_transcript, + "segment_count": len(children), + } - # Extract audio metadata (placeholder) - duration = 0 - sample_rate = 0 - channels = 0 - format_str = ext[1:].upper() - - # Create ResourceNode - metadata only, no content understanding yet + # Create root ResourceNode root_node = ResourceNode( type=NodeType.ROOT, title=file_path.stem, level=0, detail_file=None, content_path=None, - children=[], - meta={ - "duration": duration, - "sample_rate": sample_rate, - "channels": channels, - "format": format_str.lower(), - "content_type": "audio", - "source_title": file_path.stem, - "semantic_name": file_path.stem, - "original_filename": original_filename, - }, + children=children, + content_type="audio", + meta=root_meta, + ) + + # Generate semantic info (L0 abstract, L1 overview) + description = full_transcript if has_transcript else f"Audio file: {file_path.name}" + await self._generate_semantic_info( + root_node, description, viking_fs, has_transcript ) - # Phase 3: Build directory structure (handled by TreeBuilder) + if not has_transcript: + warnings.append( + "No transcript available. Metadata-only output. " + "Configure Whisper API or install openai-whisper for transcription." + ) + + parse_time = time.monotonic() - start_time + return ParseResult( root=root_node, source_path=str(file_path), temp_dir_path=temp_uri, source_format="audio", parser_name="AudioParser", - meta={"content_type": "audio", "format": format_str.lower()}, + parse_time=parse_time, + meta={"content_type": "audio", "format": format_str}, + warnings=warnings, + ) + + def _validate_audio_bytes( + self, audio_bytes: bytes, ext: str, file_path: Path + ) -> None: + """Validate audio file using magic bytes.""" + ext_lower = ext.lower() + magic_list = AUDIO_MAGIC_BYTES.get(ext_lower, []) + for magic in magic_list: + if len(audio_bytes) >= len(magic) and audio_bytes.startswith(magic): + return + # If no magic bytes defined for this extension, skip validation + if not magic_list: + return + raise ValueError( + f"Invalid audio file: {file_path}. " + f"File signature does not match expected format {ext_lower}" ) - async def _asr_transcribe(self, audio_bytes: bytes, model: Optional[str]) -> str: + async def _asr_transcribe( + self, audio_bytes: bytes, model: Optional[str], ext: str = ".mp3" + ) -> str: """ - Generate audio transcription using ASR. + Transcribe audio using Whisper API via OpenAI client. Args: audio_bytes: Audio binary data - model: ASR model name + model: Whisper model name + ext: File extension for mime type hint Returns: - Audio transcription in markdown format - - TODO: Integrate with actual ASR API (Whisper, etc.) + Transcription text """ - # Fallback implementation - returns basic placeholder - return "Audio transcription (ASR integration pending)\n\nThis is an audio. ASR transcription feature has not yet integrated external API." + try: + from openviking_cli.utils.config import get_openviking_config + + config = get_openviking_config() + import openai + + client = openai.AsyncOpenAI( + api_key=config.llm.api_key if hasattr(config, "llm") else None, + ) + + audio_file = io.BytesIO(audio_bytes) + audio_file.name = f"audio{ext}" + + response = await client.audio.transcriptions.create( + model=model or "whisper-1", + file=audio_file, + language=self.config.language, + ) + + return response.text + + except Exception as e: + logger.warning(f"[AudioParser._asr_transcribe] Whisper API call failed: {e}") + return "" async def _asr_transcribe_with_timestamps( - self, audio_bytes: bytes, model: Optional[str] - ) -> Optional[str]: + self, audio_bytes: bytes, model: Optional[str], ext: str = ".mp3" + ) -> List[Dict[str, Any]]: """ - Extract transcription with timestamps from audio using ASR. + Transcribe audio with timestamps using Whisper API verbose_json format. Args: audio_bytes: Audio binary data - model: ASR model name + model: Whisper model name + ext: File extension Returns: - Transcript with timestamps in markdown format, or None if not available + List of segment dicts with keys: start, end, text + """ + try: + from openviking_cli.utils.config import get_openviking_config + + config = get_openviking_config() + import openai - TODO: Integrate with ASR API + client = openai.AsyncOpenAI( + api_key=config.llm.api_key if hasattr(config, "llm") else None, + ) + + audio_file = io.BytesIO(audio_bytes) + audio_file.name = f"audio{ext}" + + response = await client.audio.transcriptions.create( + model=model or "whisper-1", + file=audio_file, + response_format="verbose_json", + timestamp_granularities=["segment"], + language=self.config.language, + ) + + segments = [] + if hasattr(response, "segments") and response.segments: + for seg in response.segments: + segments.append({ + "start": seg.get("start", 0) if isinstance(seg, dict) else getattr(seg, "start", 0), + "end": seg.get("end", 0) if isinstance(seg, dict) else getattr(seg, "end", 0), + "text": seg.get("text", "") if isinstance(seg, dict) else getattr(seg, "text", ""), + }) + + return segments + + except Exception as e: + logger.warning( + f"[AudioParser._asr_transcribe_with_timestamps] Whisper API call failed: {e}" + ) + return [] + + def _build_transcript_markdown( + self, + segments: List[Dict[str, Any]], + full_transcript: str, + title: str, + ) -> str: """ - # Not implemented - return None - return None + Build a markdown transcript file from segments or plain text. + + Args: + segments: Timestamped transcript segments + full_transcript: Full transcript text (used if no segments) + title: Audio file title + + Returns: + Markdown-formatted transcript + """ + parts = [f"# Transcript: {title}\n"] + + if segments: + for seg in segments: + start = _format_timestamp(seg.get("start", 0)) + end = _format_timestamp(seg.get("end", 0)) + text = seg.get("text", "").strip() + if text: + parts.append(f"**[{start} - {end}]** {text}\n") + elif full_transcript.strip(): + parts.append(full_transcript.strip()) + parts.append("") + + return "\n".join(parts) async def _generate_semantic_info( - self, node: ResourceNode, description: str, viking_fs, has_transcript: bool - ): + self, + node: ResourceNode, + description: str, + viking_fs: Any, + has_transcript: bool, + ) -> None: """ - Phase 2: Generate abstract and overview. + Generate L0 abstract and L1 overview for the audio resource. Args: node: ResourceNode to update - description: Audio description + description: Audio transcript or description text viking_fs: VikingFS instance - has_transcript: Whether transcript file exists + has_transcript: Whether transcript is available """ - # Generate abstract (short summary, < 100 tokens) - abstract = description[:200] if len(description) > 200 else description - - # Generate overview (content summary + file list + usage instructions) + # L0 abstract: short summary (< 256 chars) + if has_transcript and len(description) > 50: + first_sentence_end = description.find(".", 20) + if 20 < first_sentence_end < 256: + abstract = description[: first_sentence_end + 1] + else: + abstract = description[:253] + "..." if len(description) > 256 else description + else: + abstract = description[:253] + "..." if len(description) > 256 else description + + # L1 overview overview_parts = [ "## Content Summary\n", - description, + abstract, "\n\n## Available Files\n", - f"- {node.meta['original_filename']}: Original audio file ({node.meta['duration']}s, {node.meta['sample_rate']}Hz, {node.meta['channels']}ch, {node.meta['format'].upper()} format)\n", + ( + f"- {node.meta['original_filename']}: Original audio file " + f"({node.meta['duration']}s, {node.meta['sample_rate']}Hz, " + f"{node.meta['channels']}ch, {node.meta['format'].upper()} format)\n" + ), ] if has_transcript: - overview_parts.append("- transcript.md: Transcript with timestamps from the audio\n") + overview_parts.append( + "- transcript.md: Timestamped transcript from the audio\n" + ) overview_parts.append("\n## Usage\n") overview_parts.append("### Play Audio\n") overview_parts.append("```python\n") overview_parts.append("audio_bytes = await audio_resource.play()\n") overview_parts.append("# Returns: Audio file binary data\n") - overview_parts.append("# Purpose: Play or save the audio\n") overview_parts.append("```\n\n") if has_transcript: - overview_parts.append("### Get Timestamps Transcript\n") + overview_parts.append("### Get Timestamped Transcript\n") overview_parts.append("```python\n") overview_parts.append("timestamps = await audio_resource.timestamps()\n") overview_parts.append("# Returns: FileContent object or None\n") - overview_parts.append("# Purpose: Extract timestamped transcript from the audio\n") overview_parts.append("```\n\n") overview_parts.append("### Get Audio Metadata\n") @@ -245,17 +505,22 @@ async def _generate_semantic_info( overview_parts.append( f"channels = audio_resource.get_channels() # {node.meta['channels']}\n" ) - overview_parts.append(f'format = audio_resource.get_format() # "{node.meta["format"]}"\n') + overview_parts.append( + f'format = audio_resource.get_format() # "{node.meta["format"]}"\n' + ) overview_parts.append("```\n") overview = "".join(overview_parts) - # Store in node meta node.meta["abstract"] = abstract node.meta["overview"] = overview async def parse_content( - self, content: str, source_path: Optional[str] = None, instruction: str = "", **kwargs + self, + content: str, + source_path: Optional[str] = None, + instruction: str = "", + **kwargs, ) -> ParseResult: """ Parse audio from content string - Not yet implemented. @@ -263,6 +528,7 @@ async def parse_content( Args: content: Audio content (base64 or binary string) source_path: Optional source path for metadata + instruction: Processing instruction **kwargs: Additional parsing parameters Returns: diff --git a/openviking/prompts/templates/parsing/audio_summary.yaml b/openviking/prompts/templates/parsing/audio_summary.yaml new file mode 100644 index 00000000..06009d47 --- /dev/null +++ b/openviking/prompts/templates/parsing/audio_summary.yaml @@ -0,0 +1,44 @@ +metadata: + id: "parsing.audio_summary" + name: "Audio Summary" + description: "Generate concise audio summary from transcript for semantic parsing" + version: "1.0.0" + language: "en" + category: "parsing" + +variables: + - name: "transcript" + type: "string" + description: "Full audio transcript text" + required: true + max_length: 30000 + - name: "duration" + type: "string" + description: "Audio duration in seconds" + default: "unknown" + required: false + - name: "format" + type: "string" + description: "Audio file format" + default: "unknown" + required: false + +template: | + Please analyze this audio transcript and generate a concise summary for semantic indexing. + + Audio duration: {{ duration }}s + Audio format: {{ format }} + + Transcript: + {{ transcript }} + + Generate a comprehensive summary that includes: + 1. Main topic or subject of the audio + 2. Key points discussed + 3. Any notable speakers or perspectives + 4. Important conclusions or takeaways + + Keep the summary clear and factual, suitable for semantic search and understanding. + +llm_config: + temperature: 0.0 diff --git a/pyproject.toml b/pyproject.toml index c26aa7cf..241997d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,9 @@ dependencies = [ pyagfs = { path = "third_party/agfs/agfs-sdk/python" } [project.optional-dependencies] +audio = [ + "mutagen>=1.47.0", +] test = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", diff --git a/tests/unit/parse/__init__.py b/tests/unit/parse/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/parse/test_audio_parser.py b/tests/unit/parse/test_audio_parser.py new file mode 100644 index 00000000..2b34e8e4 --- /dev/null +++ b/tests/unit/parse/test_audio_parser.py @@ -0,0 +1,288 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Unit tests for AudioParser with mocked Whisper API and mutagen.""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from openviking.parse.base import NodeType +from openviking.parse.parsers.media.audio import ( + AUDIO_MAGIC_BYTES, + AudioParser, + _extract_metadata_mutagen, + _format_timestamp, +) +from openviking_cli.utils.config.parser_config import AudioConfig + + +class TestFormatTimestamp: + def test_seconds_only(self): + assert _format_timestamp(45) == "0:45" + + def test_minutes_and_seconds(self): + assert _format_timestamp(125) == "2:05" + + def test_hours(self): + assert _format_timestamp(3661) == "1:01:01" + + def test_zero(self): + assert _format_timestamp(0) == "0:00" + + +class TestExtractMetadataMutagen: + @patch("openviking.parse.parsers.media.audio._try_import_mutagen") + def test_mutagen_not_installed(self, mock_import): + mock_import.return_value = None + result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) + assert result == {} + + @patch("openviking.parse.parsers.media.audio._try_import_mutagen") + def test_mutagen_returns_metadata(self, mock_import): + mock_mutagen = MagicMock() + mock_audio = MagicMock() + mock_audio.info.length = 120.5 + mock_audio.info.sample_rate = 44100 + mock_audio.info.channels = 2 + mock_audio.info.bitrate = 320000 + mock_mutagen.File.return_value = mock_audio + mock_import.return_value = mock_mutagen + + result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) + assert result["duration"] == 120.5 + assert result["sample_rate"] == 44100 + assert result["channels"] == 2 + assert result["bitrate"] == 320000 + + @patch("openviking.parse.parsers.media.audio._try_import_mutagen") + def test_mutagen_file_returns_none(self, mock_import): + mock_mutagen = MagicMock() + mock_mutagen.File.return_value = None + mock_import.return_value = mock_mutagen + + result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) + assert result == {} + + @patch("openviking.parse.parsers.media.audio._try_import_mutagen") + def test_mutagen_raises_exception(self, mock_import): + mock_mutagen = MagicMock() + mock_mutagen.File.side_effect = Exception("corrupt file") + mock_import.return_value = mock_mutagen + + result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) + assert result == {} + + +class TestAudioParserInit: + def test_default_config(self): + parser = AudioParser() + assert parser.config.enable_transcription is True + assert parser.config.transcription_model == "whisper-large-v3" + + def test_custom_config(self): + config = AudioConfig(enable_transcription=False, language="en") + parser = AudioParser(config=config) + assert parser.config.enable_transcription is False + assert parser.config.language == "en" + + def test_supported_extensions(self): + parser = AudioParser() + exts = parser.supported_extensions + assert ".mp3" in exts + assert ".wav" in exts + assert ".ogg" in exts + assert ".flac" in exts + assert ".aac" in exts + assert ".m4a" in exts + + def test_can_parse(self): + parser = AudioParser() + assert parser.can_parse("test.mp3") is True + assert parser.can_parse("test.wav") is True + assert parser.can_parse("test.txt") is False + assert parser.can_parse("test.pdf") is False + + +class TestAudioParserValidation: + def test_validate_mp3_id3(self): + parser = AudioParser() + audio_bytes = b"ID3" + b"\x00" * 100 + parser._validate_audio_bytes(audio_bytes, ".mp3", Path("test.mp3")) + + def test_validate_wav_riff(self): + parser = AudioParser() + audio_bytes = b"RIFF" + b"\x00" * 100 + parser._validate_audio_bytes(audio_bytes, ".wav", Path("test.wav")) + + def test_validate_flac(self): + parser = AudioParser() + audio_bytes = b"fLaC" + b"\x00" * 100 + parser._validate_audio_bytes(audio_bytes, ".flac", Path("test.flac")) + + def test_validate_ogg(self): + parser = AudioParser() + audio_bytes = b"OggS" + b"\x00" * 100 + parser._validate_audio_bytes(audio_bytes, ".ogg", Path("test.ogg")) + + def test_invalid_mp3_raises(self): + parser = AudioParser() + audio_bytes = b"NOT_MP3" + b"\x00" * 100 + with pytest.raises(ValueError, match="Invalid audio file"): + parser._validate_audio_bytes(audio_bytes, ".mp3", Path("test.mp3")) + + def test_unknown_extension_skips_validation(self): + parser = AudioParser() + audio_bytes = b"anything" + parser._validate_audio_bytes(audio_bytes, ".xyz", Path("test.xyz")) + + +class TestAudioParserParse: + @pytest.mark.asyncio + async def test_file_not_found(self): + parser = AudioParser() + with pytest.raises(FileNotFoundError, match="Audio file not found"): + await parser.parse("/nonexistent/audio.mp3") + + @pytest.mark.asyncio + @patch("openviking.parse.parsers.media.audio._extract_metadata_mutagen") + async def test_parse_metadata_only(self, mock_metadata): + """Test parsing with transcription disabled - metadata only.""" + mock_metadata.return_value = { + "duration": 60.0, + "sample_rate": 44100, + "channels": 2, + "bitrate": 128000, + } + + config = AudioConfig(enable_transcription=False) + parser = AudioParser(config=config) + + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + f.write(b"ID3" + b"\x00" * 200) + tmp_path = f.name + + try: + mock_viking_fs = MagicMock() + mock_viking_fs.create_temp_uri.return_value = "viking://temp/test123" + mock_viking_fs.mkdir = AsyncMock() + mock_viking_fs.write_file_bytes = AsyncMock() + mock_viking_fs.write_file = AsyncMock() + + with patch( + "openviking.parse.parsers.media.audio.get_viking_fs", + return_value=mock_viking_fs, + ): + result = await parser.parse(tmp_path) + + assert result.parser_name == "AudioParser" + assert result.source_format == "audio" + assert result.root.type == NodeType.ROOT + assert result.root.meta["duration"] == 60.0 + assert result.root.meta["sample_rate"] == 44100 + assert result.root.meta["channels"] == 2 + assert result.root.meta["has_transcript"] is False + assert len(result.warnings) > 0 + finally: + Path(tmp_path).unlink(missing_ok=True) + + @pytest.mark.asyncio + @patch("openviking.parse.parsers.media.audio._extract_metadata_mutagen") + async def test_parse_with_transcript_segments(self, mock_metadata): + """Test parsing with mocked Whisper returning timestamped segments.""" + mock_metadata.return_value = { + "duration": 30.0, + "sample_rate": 16000, + "channels": 1, + "bitrate": 64000, + } + + config = AudioConfig(enable_transcription=True) + parser = AudioParser(config=config) + + segments = [ + {"start": 0.0, "end": 10.0, "text": "Hello world."}, + {"start": 10.0, "end": 20.0, "text": "This is a test."}, + {"start": 20.0, "end": 30.0, "text": "Goodbye."}, + ] + + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + f.write(b"ID3" + b"\x00" * 200) + tmp_path = f.name + + try: + mock_viking_fs = MagicMock() + mock_viking_fs.create_temp_uri.return_value = "viking://temp/test456" + mock_viking_fs.mkdir = AsyncMock() + mock_viking_fs.write_file_bytes = AsyncMock() + mock_viking_fs.write_file = AsyncMock() + + with ( + patch( + "openviking.parse.parsers.media.audio.get_viking_fs", + return_value=mock_viking_fs, + ), + patch.object( + parser, + "_asr_transcribe_with_timestamps", + new_callable=AsyncMock, + return_value=segments, + ), + ): + result = await parser.parse(tmp_path) + + assert result.root.meta["has_transcript"] is True + assert result.root.meta["segment_count"] == 3 + assert len(result.root.children) == 3 + assert result.root.children[0].type == NodeType.SECTION + assert "0:00" in result.root.children[0].title + assert result.root.children[0].meta["text"] == "Hello world." + assert len(result.warnings) == 0 + + mock_viking_fs.write_file.assert_called_once() + call_args = mock_viking_fs.write_file.call_args + assert "transcript.md" in call_args[0][0] + finally: + Path(tmp_path).unlink(missing_ok=True) + + +class TestAudioParserTranscript: + def test_build_transcript_markdown_with_segments(self): + parser = AudioParser() + segments = [ + {"start": 0.0, "end": 15.0, "text": "First segment."}, + {"start": 15.0, "end": 30.0, "text": "Second segment."}, + ] + md = parser._build_transcript_markdown(segments, "", "test_audio") + assert "# Transcript: test_audio" in md + assert "**[0:00 - 0:15]** First segment." in md + assert "**[0:15 - 0:30]** Second segment." in md + + def test_build_transcript_markdown_plain(self): + parser = AudioParser() + md = parser._build_transcript_markdown( + [], "This is the full transcript text.", "test_audio" + ) + assert "# Transcript: test_audio" in md + assert "This is the full transcript text." in md + + +class TestAudioParserParseContent: + @pytest.mark.asyncio + async def test_parse_content_not_implemented(self): + parser = AudioParser() + with pytest.raises(NotImplementedError): + await parser.parse_content("base64data") + + +class TestAudioMagicBytes: + def test_magic_bytes_defined(self): + """Verify magic bytes are defined for all supported formats.""" + assert ".mp3" in AUDIO_MAGIC_BYTES + assert ".wav" in AUDIO_MAGIC_BYTES + assert ".ogg" in AUDIO_MAGIC_BYTES + assert ".flac" in AUDIO_MAGIC_BYTES + assert ".aac" in AUDIO_MAGIC_BYTES + assert ".m4a" in AUDIO_MAGIC_BYTES + assert ".opus" in AUDIO_MAGIC_BYTES From 9d63ee5547b373d2aae744c1a0064b16b760ae25 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Tue, 17 Mar 2026 08:09:11 -0700 Subject: [PATCH 3/5] style: format with ruff --- openviking/server/routers/stats.py | 4 +--- openviking/storage/stats_aggregator.py | 5 +---- tests/unit/stats/test_stats_aggregator.py | 6 ++---- tests/unit/stats/test_stats_api.py | 9 +++------ 4 files changed, 7 insertions(+), 17 deletions(-) diff --git a/openviking/server/routers/stats.py b/openviking/server/routers/stats.py index 1de6b224..06115118 100644 --- a/openviking/server/routers/stats.py +++ b/openviking/server/routers/stats.py @@ -59,9 +59,7 @@ async def get_session_stats( service = get_service() aggregator = _get_aggregator() try: - result = await aggregator.get_session_extraction_stats( - session_id, service, _ctx - ) + result = await aggregator.get_session_extraction_stats(session_id, service, _ctx) return Response(status="ok", result=result) except Exception as e: logger.warning("Failed to get session stats for %s: %s", session_id, e) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index 46e87eb3..31ece8ab 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -173,10 +173,7 @@ async def _query_memories_by_category( # Filter by category using URI prefix category_prefix = f"/{category}/" - return [ - r for r in records - if category_prefix in r.get("uri", "") - ] + return [r for r in records if category_prefix in r.get("uri", "")] except Exception as e: logger.error("Error querying memories for category %s: %s", category, e) return [] diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index 6cdc6c49..449b25a6 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -3,7 +3,7 @@ """Tests for StatsAggregator.""" from datetime import datetime, timedelta, timezone -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest @@ -92,9 +92,7 @@ async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): """Records should be classified into cold/warm/hot based on score.""" now = datetime.now(timezone.utc) # Recent + high access -> hot - hot_record = _make_memory_record( - "cases", active_count=50, updated_at=now - ) + hot_record = _make_memory_record("cases", active_count=50, updated_at=now) # Old + no access -> cold cold_record = _make_memory_record( "cases", active_count=0, updated_at=now - timedelta(days=60) diff --git a/tests/unit/stats/test_stats_api.py b/tests/unit/stats/test_stats_api.py index 1e92e7c0..9acc2972 100644 --- a/tests/unit/stats/test_stats_api.py +++ b/tests/unit/stats/test_stats_api.py @@ -2,14 +2,12 @@ # SPDX-License-Identifier: Apache-2.0 """Tests for the stats API router.""" -from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastapi import FastAPI from fastapi.testclient import TestClient -from openviking.server.models import Response from openviking.server.routers.stats import router @@ -46,10 +44,9 @@ def client(mock_service, mock_ctx): app = FastAPI() app.include_router(router) - with patch( - "openviking.server.routers.stats.get_service", return_value=mock_service - ), patch( - "openviking.server.routers.stats.get_request_context", return_value=mock_ctx + with ( + patch("openviking.server.routers.stats.get_service", return_value=mock_service), + patch("openviking.server.routers.stats.get_request_context", return_value=mock_ctx), ): yield TestClient(app) From 898cc8e426c1e39be709ccb36966c1e1c2ca9410 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Wed, 18 Mar 2026 06:51:47 -0700 Subject: [PATCH 4/5] Revert audio parser changes from this branch The audio parser feature is unrelated to memory health stats and belongs in its own PR (#707). Reverts audio.py to pre-rewrite state, removes the unused audio_summary.yaml template and audio parser tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- openviking/parse/parsers/media/audio.py | 480 ++++-------------- .../templates/parsing/audio_summary.yaml | 44 -- tests/unit/parse/__init__.py | 0 tests/unit/parse/test_audio_parser.py | 288 ----------- 4 files changed, 107 insertions(+), 705 deletions(-) delete mode 100644 openviking/prompts/templates/parsing/audio_summary.yaml delete mode 100644 tests/unit/parse/__init__.py delete mode 100644 tests/unit/parse/test_audio_parser.py diff --git a/openviking/parse/parsers/media/audio.py b/openviking/parse/parsers/media/audio.py index fbe944c9..5c57dfc2 100644 --- a/openviking/parse/parsers/media/audio.py +++ b/openviking/parse/parsers/media/audio.py @@ -1,117 +1,41 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 """ -Audio parser with metadata extraction and Whisper transcription. - -Features: -1. Speech-to-text transcription using Whisper API -2. Audio metadata extraction (duration, sample rate, channels) via mutagen -3. Timestamp alignment for transcribed text -4. Generate structured ResourceNode with transcript segments - -Supported formats: MP3, WAV, OGG, FLAC, AAC, M4A, OPUS +Audio parser - Future implementation. + +Planned Features: +1. Speech-to-text transcription using ASR models +2. Audio metadata extraction (duration, sample rate, channels) +3. Speaker diarization (identify different speakers) +4. Timestamp alignment for transcribed text +5. Generate structured ResourceNode with transcript + +Example workflow: + 1. Load audio file + 2. Extract metadata (duration, format, sample rate) + 3. Transcribe speech to text using Whisper or similar + 4. (Optional) Perform speaker diarization + 5. Create ResourceNode with: + - type: NodeType.ROOT + - children: sections for each speaker/timestamp + - meta: audio metadata and timestamps + 6. Return ParseResult + +Supported formats: MP3, WAV, OGG, FLAC, AAC, M4A """ -import io -import time from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import List, Optional, Union from openviking.parse.base import NodeType, ParseResult, ResourceNode from openviking.parse.parsers.base_parser import BaseParser from openviking.parse.parsers.media.constants import AUDIO_EXTENSIONS from openviking_cli.utils.config.parser_config import AudioConfig -from openviking_cli.utils.logger import get_logger - -logger = get_logger(__name__) - -# Magic bytes for audio format validation -AUDIO_MAGIC_BYTES: Dict[str, List[bytes]] = { - ".mp3": [b"ID3", b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"], - ".wav": [b"RIFF"], - ".ogg": [b"OggS"], - ".flac": [b"fLaC"], - ".aac": [b"\xff\xf1", b"\xff\xf9"], - ".m4a": [b"\x00\x00\x00", b"ftypM4A", b"ftypisom"], - ".opus": [b"OggS"], -} - - -def _try_import_mutagen(): - """Lazily import mutagen, returning None if not installed.""" - try: - import mutagen - - return mutagen - except ImportError: - return None - - -def _format_timestamp(seconds: float) -> str: - """Format seconds as MM:SS or H:MM:SS.""" - hours = int(seconds // 3600) - minutes = int((seconds % 3600) // 60) - secs = int(seconds % 60) - if hours > 0: - return f"{hours}:{minutes:02d}:{secs:02d}" - return f"{minutes}:{secs:02d}" - - -def _extract_metadata_mutagen(file_path: Path) -> Dict[str, Any]: - """ - Extract audio metadata using mutagen. - - Args: - file_path: Path to audio file - - Returns: - Dictionary with duration, sample_rate, channels, bitrate, format - """ - mutagen = _try_import_mutagen() - if mutagen is None: - logger.warning( - "[AudioParser] mutagen not installed, skipping metadata extraction. " - "Install with: pip install mutagen" - ) - return {} - - try: - audio = mutagen.File(str(file_path)) - if audio is None: - logger.warning(f"[AudioParser] mutagen could not identify file: {file_path}") - return {} - - meta: Dict[str, Any] = {} - - # Duration - if hasattr(audio.info, "length"): - meta["duration"] = round(audio.info.length, 2) - - # Sample rate - if hasattr(audio.info, "sample_rate"): - meta["sample_rate"] = audio.info.sample_rate - - # Channels - if hasattr(audio.info, "channels"): - meta["channels"] = audio.info.channels - - # Bitrate (bits per second) - if hasattr(audio.info, "bitrate"): - meta["bitrate"] = audio.info.bitrate - - return meta - - except Exception as e: - logger.warning(f"[AudioParser] mutagen metadata extraction failed: {e}") - return {} class AudioParser(BaseParser): """ Audio parser for audio files. - - Extracts metadata via mutagen and transcribes speech via Whisper API. - Falls back to metadata-only output when transcription is unavailable. """ def __init__(self, config: Optional[AudioConfig] = None, **kwargs): @@ -129,28 +53,23 @@ def supported_extensions(self) -> List[str]: """Return supported audio file extensions.""" return AUDIO_EXTENSIONS - async def parse( - self, source: Union[str, Path], instruction: str = "", **kwargs - ) -> ParseResult: + async def parse(self, source: Union[str, Path], instruction: str = "", **kwargs) -> ParseResult: """ - Parse audio file - extract metadata, transcribe via Whisper, build ResourceNode tree. + Parse audio file - only copy original file and extract basic metadata, no content understanding. Args: source: Audio file path - instruction: Processing instruction **kwargs: Additional parsing parameters Returns: - ParseResult with audio content tree + ParseResult with audio content Raises: FileNotFoundError: If source file does not exist - ValueError: If file signature does not match expected format + IOError: If audio processing fails """ from openviking.storage.viking_fs import get_viking_fs - start_time = time.monotonic() - # Convert to Path object file_path = Path(source) if isinstance(source, str) else source if not file_path.exists(): @@ -159,339 +78,160 @@ async def parse( viking_fs = get_viking_fs() temp_uri = viking_fs.create_temp_uri() - # Read audio bytes + # Phase 1: Generate temporary files audio_bytes = file_path.read_bytes() ext = file_path.suffix - # Validate magic bytes - self._validate_audio_bytes(audio_bytes, ext, file_path) - from openviking_cli.utils.uri import VikingURI # Sanitize original filename (replace spaces with underscores) original_filename = file_path.name.replace(" ", "_") + # Root directory name: filename stem + _ + extension (without dot) stem = file_path.stem.replace(" ", "_") ext_no_dot = ext[1:] if ext else "" root_dir_name = VikingURI.sanitize_segment(f"{stem}_{ext_no_dot}") root_dir_uri = f"{temp_uri}/{root_dir_name}" await viking_fs.mkdir(root_dir_uri, exist_ok=True) - # Save original audio + # 1.1 Save original audio with original filename (sanitized) await viking_fs.write_file_bytes(f"{root_dir_uri}/{original_filename}", audio_bytes) - # Extract metadata via mutagen - mutagen_meta = _extract_metadata_mutagen(file_path) - duration = mutagen_meta.get("duration", 0) - sample_rate = mutagen_meta.get("sample_rate", 0) - channels = mutagen_meta.get("channels", 0) - bitrate = mutagen_meta.get("bitrate", 0) - format_str = ext_no_dot.lower() - - # Attempt transcription - transcript_segments: List[Dict[str, Any]] = [] - full_transcript = "" - warnings: List[str] = [] - - if self.config.enable_transcription: - try: - transcript_segments = await self._asr_transcribe_with_timestamps( - audio_bytes, self.config.transcription_model, ext - ) - if transcript_segments: - full_transcript = "\n".join( - seg["text"] for seg in transcript_segments - ) - else: - # Try plain transcription - full_transcript = await self._asr_transcribe( - audio_bytes, self.config.transcription_model, ext - ) - except Exception as e: - logger.warning(f"[AudioParser] Transcription failed: {e}") - warnings.append(f"Transcription unavailable: {e}") - - has_transcript = bool(full_transcript.strip()) - - # Save transcript file if available - if has_transcript: - transcript_md = self._build_transcript_markdown( - transcript_segments, full_transcript, file_path.stem - ) - await viking_fs.write_file(f"{root_dir_uri}/transcript.md", transcript_md) - - # Build segment child nodes - children = [] - if transcript_segments: - for i, seg in enumerate(transcript_segments): - seg_start = seg.get("start", 0) - seg_end = seg.get("end", 0) - seg_text = seg.get("text", "").strip() - if not seg_text: - continue - - child = ResourceNode( - type=NodeType.SECTION, - title=f"segment_{i + 1:03d} ({_format_timestamp(seg_start)}-{_format_timestamp(seg_end)})", - level=1, - detail_file=None, - content_path=None, - children=[], - content_type="text", - meta={ - "start": seg_start, - "end": seg_end, - "text": seg_text, - }, - ) - children.append(child) - - # Build root node meta - root_meta: Dict[str, Any] = { - "duration": duration, - "sample_rate": sample_rate, - "channels": channels, - "bitrate": bitrate, - "format": format_str, - "content_type": "audio", - "source_title": file_path.stem, - "semantic_name": file_path.stem, - "original_filename": original_filename, - "has_transcript": has_transcript, - "segment_count": len(children), + # 1.2 Validate audio file using magic bytes + # Define magic bytes for supported audio formats + audio_magic_bytes = { + ".mp3": [b"ID3", b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"], + ".wav": [b"RIFF"], + ".ogg": [b"OggS"], + ".flac": [b"fLaC"], + ".aac": [b"\xff\xf1", b"\xff\xf9"], + ".m4a": [b"\x00\x00\x00", b"ftypM4A", b"ftypisom"], + ".opus": [b"OggS"], } - # Create root ResourceNode + # Check magic bytes + valid = False + ext_lower = ext.lower() + magic_list = audio_magic_bytes.get(ext_lower, []) + for magic in magic_list: + if len(audio_bytes) >= len(magic) and audio_bytes.startswith(magic): + valid = True + break + + if not valid: + raise ValueError( + f"Invalid audio file: {file_path}. File signature does not match expected format {ext_lower}" + ) + + # Extract audio metadata (placeholder) + duration = 0 + sample_rate = 0 + channels = 0 + format_str = ext[1:].upper() + + # Create ResourceNode - metadata only, no content understanding yet root_node = ResourceNode( type=NodeType.ROOT, title=file_path.stem, level=0, detail_file=None, content_path=None, - children=children, - content_type="audio", - meta=root_meta, - ) - - # Generate semantic info (L0 abstract, L1 overview) - description = full_transcript if has_transcript else f"Audio file: {file_path.name}" - await self._generate_semantic_info( - root_node, description, viking_fs, has_transcript + children=[], + meta={ + "duration": duration, + "sample_rate": sample_rate, + "channels": channels, + "format": format_str.lower(), + "content_type": "audio", + "source_title": file_path.stem, + "semantic_name": file_path.stem, + "original_filename": original_filename, + }, ) - if not has_transcript: - warnings.append( - "No transcript available. Metadata-only output. " - "Configure Whisper API or install openai-whisper for transcription." - ) - - parse_time = time.monotonic() - start_time - + # Phase 3: Build directory structure (handled by TreeBuilder) return ParseResult( root=root_node, source_path=str(file_path), temp_dir_path=temp_uri, source_format="audio", parser_name="AudioParser", - parse_time=parse_time, - meta={"content_type": "audio", "format": format_str}, - warnings=warnings, - ) - - def _validate_audio_bytes( - self, audio_bytes: bytes, ext: str, file_path: Path - ) -> None: - """Validate audio file using magic bytes.""" - ext_lower = ext.lower() - magic_list = AUDIO_MAGIC_BYTES.get(ext_lower, []) - for magic in magic_list: - if len(audio_bytes) >= len(magic) and audio_bytes.startswith(magic): - return - # If no magic bytes defined for this extension, skip validation - if not magic_list: - return - raise ValueError( - f"Invalid audio file: {file_path}. " - f"File signature does not match expected format {ext_lower}" + meta={"content_type": "audio", "format": format_str.lower()}, ) - async def _asr_transcribe( - self, audio_bytes: bytes, model: Optional[str], ext: str = ".mp3" - ) -> str: + async def _asr_transcribe(self, audio_bytes: bytes, model: Optional[str]) -> str: """ - Transcribe audio using Whisper API via OpenAI client. + Generate audio transcription using ASR. Args: audio_bytes: Audio binary data - model: Whisper model name - ext: File extension for mime type hint + model: ASR model name Returns: - Transcription text - """ - try: - from openviking_cli.utils.config import get_openviking_config + Audio transcription in markdown format - config = get_openviking_config() - import openai - - client = openai.AsyncOpenAI( - api_key=config.llm.api_key if hasattr(config, "llm") else None, - ) - - audio_file = io.BytesIO(audio_bytes) - audio_file.name = f"audio{ext}" - - response = await client.audio.transcriptions.create( - model=model or "whisper-1", - file=audio_file, - language=self.config.language, - ) - - return response.text - - except Exception as e: - logger.warning(f"[AudioParser._asr_transcribe] Whisper API call failed: {e}") - return "" + TODO: Integrate with actual ASR API (Whisper, etc.) + """ + # Fallback implementation - returns basic placeholder + return "Audio transcription (ASR integration pending)\n\nThis is an audio. ASR transcription feature has not yet integrated external API." async def _asr_transcribe_with_timestamps( - self, audio_bytes: bytes, model: Optional[str], ext: str = ".mp3" - ) -> List[Dict[str, Any]]: + self, audio_bytes: bytes, model: Optional[str] + ) -> Optional[str]: """ - Transcribe audio with timestamps using Whisper API verbose_json format. + Extract transcription with timestamps from audio using ASR. Args: audio_bytes: Audio binary data - model: Whisper model name - ext: File extension + model: ASR model name Returns: - List of segment dicts with keys: start, end, text - """ - try: - from openviking_cli.utils.config import get_openviking_config - - config = get_openviking_config() - import openai + Transcript with timestamps in markdown format, or None if not available - client = openai.AsyncOpenAI( - api_key=config.llm.api_key if hasattr(config, "llm") else None, - ) - - audio_file = io.BytesIO(audio_bytes) - audio_file.name = f"audio{ext}" - - response = await client.audio.transcriptions.create( - model=model or "whisper-1", - file=audio_file, - response_format="verbose_json", - timestamp_granularities=["segment"], - language=self.config.language, - ) - - segments = [] - if hasattr(response, "segments") and response.segments: - for seg in response.segments: - segments.append({ - "start": seg.get("start", 0) if isinstance(seg, dict) else getattr(seg, "start", 0), - "end": seg.get("end", 0) if isinstance(seg, dict) else getattr(seg, "end", 0), - "text": seg.get("text", "") if isinstance(seg, dict) else getattr(seg, "text", ""), - }) - - return segments - - except Exception as e: - logger.warning( - f"[AudioParser._asr_transcribe_with_timestamps] Whisper API call failed: {e}" - ) - return [] - - def _build_transcript_markdown( - self, - segments: List[Dict[str, Any]], - full_transcript: str, - title: str, - ) -> str: - """ - Build a markdown transcript file from segments or plain text. - - Args: - segments: Timestamped transcript segments - full_transcript: Full transcript text (used if no segments) - title: Audio file title - - Returns: - Markdown-formatted transcript + TODO: Integrate with ASR API """ - parts = [f"# Transcript: {title}\n"] - - if segments: - for seg in segments: - start = _format_timestamp(seg.get("start", 0)) - end = _format_timestamp(seg.get("end", 0)) - text = seg.get("text", "").strip() - if text: - parts.append(f"**[{start} - {end}]** {text}\n") - elif full_transcript.strip(): - parts.append(full_transcript.strip()) - parts.append("") - - return "\n".join(parts) + # Not implemented - return None + return None async def _generate_semantic_info( - self, - node: ResourceNode, - description: str, - viking_fs: Any, - has_transcript: bool, - ) -> None: + self, node: ResourceNode, description: str, viking_fs, has_transcript: bool + ): """ - Generate L0 abstract and L1 overview for the audio resource. + Phase 2: Generate abstract and overview. Args: node: ResourceNode to update - description: Audio transcript or description text + description: Audio description viking_fs: VikingFS instance - has_transcript: Whether transcript is available + has_transcript: Whether transcript file exists """ - # L0 abstract: short summary (< 256 chars) - if has_transcript and len(description) > 50: - first_sentence_end = description.find(".", 20) - if 20 < first_sentence_end < 256: - abstract = description[: first_sentence_end + 1] - else: - abstract = description[:253] + "..." if len(description) > 256 else description - else: - abstract = description[:253] + "..." if len(description) > 256 else description - - # L1 overview + # Generate abstract (short summary, < 100 tokens) + abstract = description[:200] if len(description) > 200 else description + + # Generate overview (content summary + file list + usage instructions) overview_parts = [ "## Content Summary\n", - abstract, + description, "\n\n## Available Files\n", - ( - f"- {node.meta['original_filename']}: Original audio file " - f"({node.meta['duration']}s, {node.meta['sample_rate']}Hz, " - f"{node.meta['channels']}ch, {node.meta['format'].upper()} format)\n" - ), + f"- {node.meta['original_filename']}: Original audio file ({node.meta['duration']}s, {node.meta['sample_rate']}Hz, {node.meta['channels']}ch, {node.meta['format'].upper()} format)\n", ] if has_transcript: - overview_parts.append( - "- transcript.md: Timestamped transcript from the audio\n" - ) + overview_parts.append("- transcript.md: Transcript with timestamps from the audio\n") overview_parts.append("\n## Usage\n") overview_parts.append("### Play Audio\n") overview_parts.append("```python\n") overview_parts.append("audio_bytes = await audio_resource.play()\n") overview_parts.append("# Returns: Audio file binary data\n") + overview_parts.append("# Purpose: Play or save the audio\n") overview_parts.append("```\n\n") if has_transcript: - overview_parts.append("### Get Timestamped Transcript\n") + overview_parts.append("### Get Timestamps Transcript\n") overview_parts.append("```python\n") overview_parts.append("timestamps = await audio_resource.timestamps()\n") overview_parts.append("# Returns: FileContent object or None\n") + overview_parts.append("# Purpose: Extract timestamped transcript from the audio\n") overview_parts.append("```\n\n") overview_parts.append("### Get Audio Metadata\n") @@ -505,22 +245,17 @@ async def _generate_semantic_info( overview_parts.append( f"channels = audio_resource.get_channels() # {node.meta['channels']}\n" ) - overview_parts.append( - f'format = audio_resource.get_format() # "{node.meta["format"]}"\n' - ) + overview_parts.append(f'format = audio_resource.get_format() # "{node.meta["format"]}"\n') overview_parts.append("```\n") overview = "".join(overview_parts) + # Store in node meta node.meta["abstract"] = abstract node.meta["overview"] = overview async def parse_content( - self, - content: str, - source_path: Optional[str] = None, - instruction: str = "", - **kwargs, + self, content: str, source_path: Optional[str] = None, instruction: str = "", **kwargs ) -> ParseResult: """ Parse audio from content string - Not yet implemented. @@ -528,7 +263,6 @@ async def parse_content( Args: content: Audio content (base64 or binary string) source_path: Optional source path for metadata - instruction: Processing instruction **kwargs: Additional parsing parameters Returns: diff --git a/openviking/prompts/templates/parsing/audio_summary.yaml b/openviking/prompts/templates/parsing/audio_summary.yaml deleted file mode 100644 index 06009d47..00000000 --- a/openviking/prompts/templates/parsing/audio_summary.yaml +++ /dev/null @@ -1,44 +0,0 @@ -metadata: - id: "parsing.audio_summary" - name: "Audio Summary" - description: "Generate concise audio summary from transcript for semantic parsing" - version: "1.0.0" - language: "en" - category: "parsing" - -variables: - - name: "transcript" - type: "string" - description: "Full audio transcript text" - required: true - max_length: 30000 - - name: "duration" - type: "string" - description: "Audio duration in seconds" - default: "unknown" - required: false - - name: "format" - type: "string" - description: "Audio file format" - default: "unknown" - required: false - -template: | - Please analyze this audio transcript and generate a concise summary for semantic indexing. - - Audio duration: {{ duration }}s - Audio format: {{ format }} - - Transcript: - {{ transcript }} - - Generate a comprehensive summary that includes: - 1. Main topic or subject of the audio - 2. Key points discussed - 3. Any notable speakers or perspectives - 4. Important conclusions or takeaways - - Keep the summary clear and factual, suitable for semantic search and understanding. - -llm_config: - temperature: 0.0 diff --git a/tests/unit/parse/__init__.py b/tests/unit/parse/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/unit/parse/test_audio_parser.py b/tests/unit/parse/test_audio_parser.py deleted file mode 100644 index 2b34e8e4..00000000 --- a/tests/unit/parse/test_audio_parser.py +++ /dev/null @@ -1,288 +0,0 @@ -# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. -# SPDX-License-Identifier: Apache-2.0 -"""Unit tests for AudioParser with mocked Whisper API and mutagen.""" - -import tempfile -from pathlib import Path -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -from openviking.parse.base import NodeType -from openviking.parse.parsers.media.audio import ( - AUDIO_MAGIC_BYTES, - AudioParser, - _extract_metadata_mutagen, - _format_timestamp, -) -from openviking_cli.utils.config.parser_config import AudioConfig - - -class TestFormatTimestamp: - def test_seconds_only(self): - assert _format_timestamp(45) == "0:45" - - def test_minutes_and_seconds(self): - assert _format_timestamp(125) == "2:05" - - def test_hours(self): - assert _format_timestamp(3661) == "1:01:01" - - def test_zero(self): - assert _format_timestamp(0) == "0:00" - - -class TestExtractMetadataMutagen: - @patch("openviking.parse.parsers.media.audio._try_import_mutagen") - def test_mutagen_not_installed(self, mock_import): - mock_import.return_value = None - result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) - assert result == {} - - @patch("openviking.parse.parsers.media.audio._try_import_mutagen") - def test_mutagen_returns_metadata(self, mock_import): - mock_mutagen = MagicMock() - mock_audio = MagicMock() - mock_audio.info.length = 120.5 - mock_audio.info.sample_rate = 44100 - mock_audio.info.channels = 2 - mock_audio.info.bitrate = 320000 - mock_mutagen.File.return_value = mock_audio - mock_import.return_value = mock_mutagen - - result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) - assert result["duration"] == 120.5 - assert result["sample_rate"] == 44100 - assert result["channels"] == 2 - assert result["bitrate"] == 320000 - - @patch("openviking.parse.parsers.media.audio._try_import_mutagen") - def test_mutagen_file_returns_none(self, mock_import): - mock_mutagen = MagicMock() - mock_mutagen.File.return_value = None - mock_import.return_value = mock_mutagen - - result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) - assert result == {} - - @patch("openviking.parse.parsers.media.audio._try_import_mutagen") - def test_mutagen_raises_exception(self, mock_import): - mock_mutagen = MagicMock() - mock_mutagen.File.side_effect = Exception("corrupt file") - mock_import.return_value = mock_mutagen - - result = _extract_metadata_mutagen(Path("/fake/audio.mp3")) - assert result == {} - - -class TestAudioParserInit: - def test_default_config(self): - parser = AudioParser() - assert parser.config.enable_transcription is True - assert parser.config.transcription_model == "whisper-large-v3" - - def test_custom_config(self): - config = AudioConfig(enable_transcription=False, language="en") - parser = AudioParser(config=config) - assert parser.config.enable_transcription is False - assert parser.config.language == "en" - - def test_supported_extensions(self): - parser = AudioParser() - exts = parser.supported_extensions - assert ".mp3" in exts - assert ".wav" in exts - assert ".ogg" in exts - assert ".flac" in exts - assert ".aac" in exts - assert ".m4a" in exts - - def test_can_parse(self): - parser = AudioParser() - assert parser.can_parse("test.mp3") is True - assert parser.can_parse("test.wav") is True - assert parser.can_parse("test.txt") is False - assert parser.can_parse("test.pdf") is False - - -class TestAudioParserValidation: - def test_validate_mp3_id3(self): - parser = AudioParser() - audio_bytes = b"ID3" + b"\x00" * 100 - parser._validate_audio_bytes(audio_bytes, ".mp3", Path("test.mp3")) - - def test_validate_wav_riff(self): - parser = AudioParser() - audio_bytes = b"RIFF" + b"\x00" * 100 - parser._validate_audio_bytes(audio_bytes, ".wav", Path("test.wav")) - - def test_validate_flac(self): - parser = AudioParser() - audio_bytes = b"fLaC" + b"\x00" * 100 - parser._validate_audio_bytes(audio_bytes, ".flac", Path("test.flac")) - - def test_validate_ogg(self): - parser = AudioParser() - audio_bytes = b"OggS" + b"\x00" * 100 - parser._validate_audio_bytes(audio_bytes, ".ogg", Path("test.ogg")) - - def test_invalid_mp3_raises(self): - parser = AudioParser() - audio_bytes = b"NOT_MP3" + b"\x00" * 100 - with pytest.raises(ValueError, match="Invalid audio file"): - parser._validate_audio_bytes(audio_bytes, ".mp3", Path("test.mp3")) - - def test_unknown_extension_skips_validation(self): - parser = AudioParser() - audio_bytes = b"anything" - parser._validate_audio_bytes(audio_bytes, ".xyz", Path("test.xyz")) - - -class TestAudioParserParse: - @pytest.mark.asyncio - async def test_file_not_found(self): - parser = AudioParser() - with pytest.raises(FileNotFoundError, match="Audio file not found"): - await parser.parse("/nonexistent/audio.mp3") - - @pytest.mark.asyncio - @patch("openviking.parse.parsers.media.audio._extract_metadata_mutagen") - async def test_parse_metadata_only(self, mock_metadata): - """Test parsing with transcription disabled - metadata only.""" - mock_metadata.return_value = { - "duration": 60.0, - "sample_rate": 44100, - "channels": 2, - "bitrate": 128000, - } - - config = AudioConfig(enable_transcription=False) - parser = AudioParser(config=config) - - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: - f.write(b"ID3" + b"\x00" * 200) - tmp_path = f.name - - try: - mock_viking_fs = MagicMock() - mock_viking_fs.create_temp_uri.return_value = "viking://temp/test123" - mock_viking_fs.mkdir = AsyncMock() - mock_viking_fs.write_file_bytes = AsyncMock() - mock_viking_fs.write_file = AsyncMock() - - with patch( - "openviking.parse.parsers.media.audio.get_viking_fs", - return_value=mock_viking_fs, - ): - result = await parser.parse(tmp_path) - - assert result.parser_name == "AudioParser" - assert result.source_format == "audio" - assert result.root.type == NodeType.ROOT - assert result.root.meta["duration"] == 60.0 - assert result.root.meta["sample_rate"] == 44100 - assert result.root.meta["channels"] == 2 - assert result.root.meta["has_transcript"] is False - assert len(result.warnings) > 0 - finally: - Path(tmp_path).unlink(missing_ok=True) - - @pytest.mark.asyncio - @patch("openviking.parse.parsers.media.audio._extract_metadata_mutagen") - async def test_parse_with_transcript_segments(self, mock_metadata): - """Test parsing with mocked Whisper returning timestamped segments.""" - mock_metadata.return_value = { - "duration": 30.0, - "sample_rate": 16000, - "channels": 1, - "bitrate": 64000, - } - - config = AudioConfig(enable_transcription=True) - parser = AudioParser(config=config) - - segments = [ - {"start": 0.0, "end": 10.0, "text": "Hello world."}, - {"start": 10.0, "end": 20.0, "text": "This is a test."}, - {"start": 20.0, "end": 30.0, "text": "Goodbye."}, - ] - - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: - f.write(b"ID3" + b"\x00" * 200) - tmp_path = f.name - - try: - mock_viking_fs = MagicMock() - mock_viking_fs.create_temp_uri.return_value = "viking://temp/test456" - mock_viking_fs.mkdir = AsyncMock() - mock_viking_fs.write_file_bytes = AsyncMock() - mock_viking_fs.write_file = AsyncMock() - - with ( - patch( - "openviking.parse.parsers.media.audio.get_viking_fs", - return_value=mock_viking_fs, - ), - patch.object( - parser, - "_asr_transcribe_with_timestamps", - new_callable=AsyncMock, - return_value=segments, - ), - ): - result = await parser.parse(tmp_path) - - assert result.root.meta["has_transcript"] is True - assert result.root.meta["segment_count"] == 3 - assert len(result.root.children) == 3 - assert result.root.children[0].type == NodeType.SECTION - assert "0:00" in result.root.children[0].title - assert result.root.children[0].meta["text"] == "Hello world." - assert len(result.warnings) == 0 - - mock_viking_fs.write_file.assert_called_once() - call_args = mock_viking_fs.write_file.call_args - assert "transcript.md" in call_args[0][0] - finally: - Path(tmp_path).unlink(missing_ok=True) - - -class TestAudioParserTranscript: - def test_build_transcript_markdown_with_segments(self): - parser = AudioParser() - segments = [ - {"start": 0.0, "end": 15.0, "text": "First segment."}, - {"start": 15.0, "end": 30.0, "text": "Second segment."}, - ] - md = parser._build_transcript_markdown(segments, "", "test_audio") - assert "# Transcript: test_audio" in md - assert "**[0:00 - 0:15]** First segment." in md - assert "**[0:15 - 0:30]** Second segment." in md - - def test_build_transcript_markdown_plain(self): - parser = AudioParser() - md = parser._build_transcript_markdown( - [], "This is the full transcript text.", "test_audio" - ) - assert "# Transcript: test_audio" in md - assert "This is the full transcript text." in md - - -class TestAudioParserParseContent: - @pytest.mark.asyncio - async def test_parse_content_not_implemented(self): - parser = AudioParser() - with pytest.raises(NotImplementedError): - await parser.parse_content("base64data") - - -class TestAudioMagicBytes: - def test_magic_bytes_defined(self): - """Verify magic bytes are defined for all supported formats.""" - assert ".mp3" in AUDIO_MAGIC_BYTES - assert ".wav" in AUDIO_MAGIC_BYTES - assert ".ogg" in AUDIO_MAGIC_BYTES - assert ".flac" in AUDIO_MAGIC_BYTES - assert ".aac" in AUDIO_MAGIC_BYTES - assert ".m4a" in AUDIO_MAGIC_BYTES - assert ".opus" in AUDIO_MAGIC_BYTES From c0d13adb1306027c46185b3ebd170842ed253c17 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Wed, 18 Mar 2026 06:52:27 -0700 Subject: [PATCH 5/5] Address review feedback: fix N+1 query, remove total_vectors, fix error handling - Replace per-category _query_memories_by_category with single _query_all_memories call, grouping by category in Python (1 DB round-trip instead of 8) - Remove misleading total_vectors field (was identical to total_memories). Will add real vector count from VikingDB index stats in a follow-up - Distinguish KeyError (session not found) from other failures in stats.py endpoint, returning INTERNAL_ERROR for unexpected exceptions Co-Authored-By: Claude Opus 4.6 (1M context) --- openviking/server/routers/stats.py | 12 +++++++-- openviking/storage/stats_aggregator.py | 36 +++++++++++++------------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/openviking/server/routers/stats.py b/openviking/server/routers/stats.py index 06115118..33f79197 100644 --- a/openviking/server/routers/stats.py +++ b/openviking/server/routers/stats.py @@ -61,8 +61,7 @@ async def get_session_stats( try: result = await aggregator.get_session_extraction_stats(session_id, service, _ctx) return Response(status="ok", result=result) - except Exception as e: - logger.warning("Failed to get session stats for %s: %s", session_id, e) + except KeyError: return Response( status="error", error=ErrorInfo( @@ -70,3 +69,12 @@ async def get_session_stats( message=f"Session not found: {session_id}", ), ) + except Exception as e: + logger.error("Failed to get session stats for %s: %s", session_id, e) + return Response( + status="error", + error=ErrorInfo( + code="INTERNAL_ERROR", + message=f"Failed to retrieve session stats: {type(e).__name__}", + ), + ) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index 31ece8ab..76bf6240 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -70,13 +70,20 @@ async def get_memory_stats( "not_accessed_30d": 0, "oldest_memory_age_days": 0, } - total_vectors = 0 + + # Fetch all memories once and group by category in Python + all_records = await self._query_all_memories(ctx) + grouped: Dict[str, List[Dict[str, Any]]] = {cat: [] for cat in categories} + for record in all_records: + uri = record.get("uri", "") + for cat in categories: + if f"/{cat}/" in uri: + grouped[cat].append(record) + break for cat in categories: - records = await self._query_memories_by_category(ctx, cat) - count = len(records) - by_category[cat] = count - total_vectors += count + records = grouped[cat] + by_category[cat] = len(records) for record in records: active_count = record.get("active_count", 0) @@ -115,7 +122,6 @@ async def get_memory_stats( "by_category": by_category, "hotness_distribution": hotness_dist, "staleness": staleness, - "total_vectors": total_vectors, } async def get_session_extraction_stats( @@ -146,19 +152,17 @@ async def get_session_extraction_stats( "skills_used": stats.skills_used, } - async def _query_memories_by_category( + async def _query_all_memories( self, ctx: RequestContext, - category: str, ) -> List[Dict[str, Any]]: - """Query all memory records for a given category. + """Query all memory records in a single DB round-trip. - Uses the context_type="memory" filter and checks the category - in the URI path. Fetches fields needed for hotness and staleness - computation. + Uses the context_type="memory" filter. Callers group by category + in Python to avoid N+1 queries. """ try: - records = await self._vikingdb.query( + return await self._vikingdb.query( filter=Eq("context_type", "memory"), limit=10000, output_fields=[ @@ -170,12 +174,8 @@ async def _query_memories_by_category( ], ctx=ctx, ) - - # Filter by category using URI prefix - category_prefix = f"/{category}/" - return [r for r in records if category_prefix in r.get("uri", "")] except Exception as e: - logger.error("Error querying memories for category %s: %s", category, e) + logger.error("Error querying memories: %s", e) return []