From 32e2e47e9d27c823c819706ae8d50dacc6c49c5c Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Mon, 12 Jan 2026 05:34:16 +0000 Subject: [PATCH 1/3] Refactor audio storage to MongoDB chunks and enhance cleanup settings management - Replaced the legacy AudioFile model with AudioChunkDocument for storing audio data in MongoDB, optimizing storage and retrieval. - Introduced CleanupSettings dataclass for managing soft-deletion configurations, including auto-cleanup and retention days. - Added admin API routes for retrieving and saving cleanup settings, ensuring better control over data retention policies. - Updated audio processing workflows to utilize MongoDB chunks, removing dependencies on disk-based audio files. - Enhanced tests to validate the new audio chunk storage and cleanup functionalities, ensuring robust integration with existing systems. --- .../src/advanced_omi_backend/app_factory.py | 4 +- .../src/advanced_omi_backend/config.py | 97 +++ .../controllers/audio_controller.py | 100 +-- .../controllers/conversation_controller.py | 227 ++++-- .../controllers/system_controller.py | 64 ++ .../controllers/websocket_controller.py | 46 +- .../models/audio_chunk.py | 158 ++++ .../advanced_omi_backend/models/audio_file.py | 64 -- .../models/conversation.py | 18 +- .../src/advanced_omi_backend/models/job.py | 8 +- .../routers/api_router.py | 2 + .../routers/modules/__init__.py | 3 + .../routers/modules/admin_routes.py | 122 +++ .../routers/modules/audio_routes.py | 305 +++++++- .../routers/modules/conversation_routes.py | 17 +- .../routers/modules/system_routes.py | 22 + .../utils/audio_chunk_utils.py | 739 ++++++++++++++++++ .../advanced_omi_backend/utils/audio_utils.py | 17 - .../utils/conversation_utils.py | 44 -- .../utils/gdrive_audio_utils.py | 13 +- .../workers/audio_jobs.py | 363 +++++---- .../workers/cleanup_jobs.py | 138 ++++ .../workers/conversation_jobs.py | 31 +- .../workers/speaker_jobs.py | 114 ++- .../workers/transcription_jobs.py | 69 +- .../tests/test_audio_persistence_mongodb.py | 431 ++++++++++ tests/.env.test | 4 + tests/endpoints/audio_upload_tests.robot | 24 +- tests/infrastructure/infra_tests.robot | 42 +- .../mongodb_audio_storage_tests.robot | 106 +++ tests/libs/mongodb_helper.py | 106 +++ tests/resources/mongodb_keywords.robot | 118 +++ tests/test-requirements.txt | 1 + 33 files changed, 3078 insertions(+), 539 deletions(-) create mode 100644 backends/advanced/src/advanced_omi_backend/models/audio_chunk.py delete mode 100644 backends/advanced/src/advanced_omi_backend/models/audio_file.py create mode 100644 backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py create mode 100644 backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py create mode 100644 backends/advanced/src/advanced_omi_backend/workers/cleanup_jobs.py create mode 100644 backends/advanced/tests/test_audio_persistence_mongodb.py create mode 100644 tests/integration/mongodb_audio_storage_tests.robot create mode 100644 tests/libs/mongodb_helper.py create mode 100644 tests/resources/mongodb_keywords.robot diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index 8a162cec..b8bc3e80 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -54,12 +54,12 @@ async def lifespan(app: FastAPI): try: from beanie import init_beanie from advanced_omi_backend.models.conversation import Conversation - from advanced_omi_backend.models.audio_file import AudioFile + from advanced_omi_backend.models.audio_chunk import AudioChunkDocument from advanced_omi_backend.models.user import User await init_beanie( database=config.db, - document_models=[User, Conversation, AudioFile], + document_models=[User, Conversation, AudioChunkDocument], ) application_logger.info("Beanie initialized for all document models") except Exception as e: diff --git a/backends/advanced/src/advanced_omi_backend/config.py b/backends/advanced/src/advanced_omi_backend/config.py index 2b07a8d4..f335b8be 100644 --- a/backends/advanced/src/advanced_omi_backend/config.py +++ b/backends/advanced/src/advanced_omi_backend/config.py @@ -9,7 +9,9 @@ import logging import os import shutil +from dataclasses import dataclass, asdict from pathlib import Path +from typing import Optional logger = logging.getLogger(__name__) @@ -131,6 +133,101 @@ def save_diarization_settings_to_file(settings): return False +# ============================================================================ +# Cleanup Settings (JSON file-based with in-memory caching) +# ============================================================================ + +@dataclass +class CleanupSettings: + """Cleanup configuration for soft-deleted conversations.""" + auto_cleanup_enabled: bool = False + retention_days: int = 30 + +# Global cache for cleanup settings +_cleanup_settings: Optional[CleanupSettings] = None + + +def get_cleanup_config_path() -> Path: + """Get path to cleanup settings JSON file.""" + data_dir = Path(os.getenv("DATA_DIR", "/app/data")) + data_dir.mkdir(parents=True, exist_ok=True) + return data_dir / "cleanup_config.json" + + +def load_cleanup_settings_from_file() -> CleanupSettings: + """ + Load cleanup settings from JSON file or return defaults. + + Returns cached settings if available, otherwise loads from file. + If file doesn't exist, returns default settings. + """ + global _cleanup_settings + + # Return cached settings if available + if _cleanup_settings is not None: + return _cleanup_settings + + config_path = get_cleanup_config_path() + + # Try to load from file + if config_path.exists(): + try: + with open(config_path, "r") as f: + data = json.load(f) + _cleanup_settings = CleanupSettings(**data) + logger.info(f"✅ Loaded cleanup settings: auto_cleanup={_cleanup_settings.auto_cleanup_enabled}, retention={_cleanup_settings.retention_days}d") + return _cleanup_settings + except Exception as e: + logger.error(f"❌ Failed to load cleanup settings from {config_path}: {e}") + + # Return defaults if file doesn't exist or failed to load + _cleanup_settings = CleanupSettings() + logger.info("Using default cleanup settings (auto_cleanup_enabled=False, retention_days=30)") + return _cleanup_settings + + +def save_cleanup_settings_to_file(settings: CleanupSettings) -> None: + """ + Save cleanup settings to JSON file and update in-memory cache. + + Args: + settings: CleanupSettings to persist + + Raises: + Exception: If file write fails + """ + global _cleanup_settings + + config_path = get_cleanup_config_path() + + try: + # Save to JSON file + with open(config_path, "w") as f: + json.dump(asdict(settings), f, indent=2) + + # Update in-memory cache + _cleanup_settings = settings + + logger.info(f"✅ Saved cleanup settings: auto_cleanup={settings.auto_cleanup_enabled}, retention={settings.retention_days}d") + except Exception as e: + logger.error(f"❌ Failed to save cleanup settings to {config_path}: {e}") + raise + + +def get_cleanup_settings() -> dict: + """ + Get current cleanup settings as dict (for API responses). + + Returns: + Dict with auto_cleanup_enabled and retention_days + """ + settings = load_cleanup_settings_from_file() + return { + "auto_cleanup_enabled": settings.auto_cleanup_enabled, + "retention_days": settings.retention_days, + } + + def get_speech_detection_settings(): """Get speech detection settings from environment or defaults.""" diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index e63dd883..143cb253 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -17,8 +17,9 @@ from advanced_omi_backend.utils.audio_utils import ( AudioValidationError, - write_audio_file, + validate_and_prepare_audio, ) +from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks from advanced_omi_backend.models.job import JobPriority from advanced_omi_backend.models.user import User from advanced_omi_backend.models.conversation import create_conversation @@ -86,33 +87,19 @@ async def upload_and_process_audio_files( # Generate audio UUID and timestamp if source == "gdrive": audio_uuid = getattr(file, "audio_uuid", None) - if not audio_uuid: + if not audio_uuid: audio_logger.error(f"Missing audio_uuid for gdrive file: {file.filename}") - audio_uuid = str(uuid.uuid4()) - else: + audio_uuid = str(uuid.uuid4()) + else: audio_uuid = str(uuid.uuid4()) timestamp = int(time.time() * 1000) - # Determine output directory (with optional subfolder) - from advanced_omi_backend.config import CHUNK_DIR - if folder: - chunk_dir = CHUNK_DIR / folder - chunk_dir.mkdir(parents=True, exist_ok=True) - else: - chunk_dir = CHUNK_DIR - - # Validate, write audio file and create AudioSession (all in one) + # Validate and prepare audio (read format from WAV file) try: - relative_audio_path, file_path, duration = await write_audio_file( - raw_audio_data=content, - audio_uuid=audio_uuid, - source=source, - client_id=client_id, - user_id=user.user_id, - user_email=user.email, - timestamp=timestamp, - chunk_dir=chunk_dir, - validate=True, # Validate WAV format, convert stereo→mono + audio_data, sample_rate, sample_width, channels, duration = await validate_and_prepare_audio( + audio_data=content, + expected_sample_rate=16000, # Expecting 16kHz + convert_to_mono=True # Convert stereo to mono ) except AudioValidationError as e: processed_files.append({ @@ -123,7 +110,7 @@ async def upload_and_process_audio_files( continue audio_logger.info( - f"📊 {file.filename}: {duration:.1f}s → {relative_audio_path}" + f"📊 {file.filename}: {duration:.1f}s ({sample_rate}Hz, {channels}ch, {sample_width} bytes/sample)" ) # Create conversation immediately for uploaded files (conversation_id auto-generated) @@ -139,20 +126,37 @@ async def upload_and_process_audio_files( title=title, summary="Processing uploaded audio file..." ) - # Use the relative path returned by write_audio_file (already includes folder prefix if applicable) - conversation.audio_path = relative_audio_path await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID audio_logger.info(f"📝 Created conversation {conversation_id} for uploaded file") + # Convert audio directly to MongoDB chunks + try: + num_chunks = await convert_audio_to_chunks( + conversation_id=conversation_id, + audio_data=audio_data, + sample_rate=sample_rate, + channels=channels, + sample_width=sample_width, + ) + audio_logger.info( + f"📦 Converted uploaded file to {num_chunks} MongoDB chunks " + f"(conversation {conversation_id[:12]})" + ) + except Exception as chunk_error: + audio_logger.error( + f"Failed to convert uploaded file to chunks: {chunk_error}", + exc_info=True + ) + # Enqueue post-conversation processing job chain from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs job_ids = start_post_conversation_jobs( conversation_id=conversation_id, audio_uuid=audio_uuid, - audio_file_path=file_path, + audio_file_path=None, # No file path - using MongoDB chunks user_id=user.user_id, post_transcription=True, # Run batch transcription for uploads client_id=client_id # Pass client_id for UI tracking @@ -217,45 +221,3 @@ async def upload_and_process_audio_files( return JSONResponse( status_code=500, content={"error": f"File upload failed: {str(e)}"} ) - - -async def get_conversation_audio_path(conversation_id: str, user: User) -> Path: - """ - Get the file path for a conversation's audio file. - - Args: - conversation_id: The conversation ID - user: The authenticated user - - Returns: - Path object for the audio file - - Raises: - ValueError: If conversation not found, access denied, or audio file not available - """ - # Get conversation by conversation_id (UUID field, not _id) - conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) - - if not conversation: - raise ValueError("Conversation not found") - - # Check ownership (admins can access all files) - if not user.is_superuser and conversation.user_id != str(user.user_id): - raise ValueError("Access denied") - - # Get the audio path - audio_path = conversation.audio_path - - if not audio_path: - raise ValueError(f"No audio file available for this conversation") - - # Build full file path - from advanced_omi_backend.app_config import get_audio_chunk_dir - audio_dir = get_audio_chunk_dir() - file_path = audio_dir / audio_path - - # Check if file exists - if not file_path.exists() or not file_path.is_file(): - raise ValueError("Audio file not found on disk") - - return file_path diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index 943d86bd..b26123f3 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -5,25 +5,20 @@ import logging import time from pathlib import Path -from typing import Optional + +from fastapi.responses import JSONResponse from advanced_omi_backend.client_manager import ( ClientManager, client_belongs_to_user, ) -from advanced_omi_backend.models.audio_file import AudioFile from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.models.audio_chunk import AudioChunkDocument from advanced_omi_backend.users import User -from fastapi.responses import JSONResponse logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") -# Legacy audio_chunks collection is still used by some endpoints (speaker assignment, segment updates) -# But conversation queries now use the Conversation model directly -# Audio cropping operations are handled in audio_controller.py - - async def close_current_conversation(client_id: str, user: User, client_manager: ClientManager): """Close the current conversation for a specific client. Users can only close their own conversations.""" # Validate client ownership @@ -103,6 +98,9 @@ async def get_conversation(conversation_id: str, user: User): "user_id": conversation.user_id, "client_id": conversation.client_id, "audio_path": conversation.audio_path, + "audio_chunks_count": conversation.audio_chunks_count, + "audio_total_duration": conversation.audio_total_duration, + "audio_compression_ratio": conversation.audio_compression_ratio, "created_at": conversation.created_at.isoformat() if conversation.created_at else None, "deleted": conversation.deleted, "deletion_reason": conversation.deletion_reason, @@ -153,6 +151,9 @@ async def get_conversations(user: User): "user_id": conv.user_id, "client_id": conv.client_id, "audio_path": conv.audio_path, + "audio_chunks_count": conv.audio_chunks_count, + "audio_total_duration": conv.audio_total_duration, + "audio_compression_ratio": conv.audio_compression_ratio, "created_at": conv.created_at.isoformat() if conv.created_at else None, "deleted": conv.deleted, "deletion_reason": conv.deletion_reason, @@ -177,12 +178,87 @@ async def get_conversations(user: User): return JSONResponse(status_code=500, content={"error": "Error fetching conversations"}) -async def delete_conversation(conversation_id: str, user: User): - """Delete a conversation and its associated audio files. Users can only delete their own conversations.""" +async def _soft_delete_conversation(conversation: Conversation, user: User) -> JSONResponse: + """Mark conversation and chunks as deleted (soft delete).""" + conversation_id = conversation.conversation_id + + # Mark conversation as deleted + conversation.deleted = True + conversation.deletion_reason = "user_deleted" + conversation.deleted_at = datetime.utcnow() + await conversation.save() + + logger.info(f"Soft deleted conversation {conversation_id} for user {user.user_id}") + + # Soft delete all associated audio chunks + result = await AudioChunkDocument.find( + AudioChunkDocument.conversation_id == conversation_id, + AudioChunkDocument.deleted == False # Only update non-deleted chunks + ).update_many({ + "$set": { + "deleted": True, + "deleted_at": datetime.utcnow() + } + }) + + deleted_chunks = result.modified_count + logger.info(f"Soft deleted {deleted_chunks} audio chunks for conversation {conversation_id}") + + return JSONResponse( + status_code=200, + content={ + "message": f"Successfully soft deleted conversation '{conversation_id}'", + "deleted_chunks": deleted_chunks, + "conversation_id": conversation_id, + "client_id": conversation.client_id, + "deleted_at": conversation.deleted_at.isoformat() if conversation.deleted_at else None + } + ) + + +async def _hard_delete_conversation(conversation: Conversation) -> JSONResponse: + """Permanently delete conversation and chunks (admin only).""" + conversation_id = conversation.conversation_id + client_id = conversation.client_id + audio_uuid = conversation.audio_uuid + + # Delete conversation document + await conversation.delete() + logger.info(f"Hard deleted conversation {conversation_id}") + + # Delete all audio chunks + result = await AudioChunkDocument.find( + AudioChunkDocument.conversation_id == conversation_id + ).delete() + + deleted_chunks = result.deleted_count + logger.info(f"Hard deleted {deleted_chunks} audio chunks for conversation {conversation_id}") + + return JSONResponse( + status_code=200, + content={ + "message": f"Successfully permanently deleted conversation '{conversation_id}'", + "deleted_chunks": deleted_chunks, + "conversation_id": conversation_id, + "client_id": client_id, + "audio_uuid": audio_uuid + } + ) + + +async def delete_conversation(conversation_id: str, user: User, permanent: bool = False): + """ + Soft delete a conversation (mark as deleted but keep data). + + Args: + conversation_id: Conversation to delete + user: Requesting user + permanent: If True, permanently delete (admin only) + """ try: # Create masked identifier for logging masked_id = f"{conversation_id[:8]}...{conversation_id[-4:]}" if len(conversation_id) > 12 else "***" - logger.info(f"Attempting to delete conversation: {masked_id}") + logger.info(f"Attempting to {'permanently ' if permanent else ''}delete conversation: {masked_id}") # Find the conversation using Beanie conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) @@ -206,57 +282,91 @@ async def delete_conversation(conversation_id: str, user: User): } ) - # Get file paths before deletion - audio_path = conversation.audio_path - audio_uuid = conversation.audio_uuid - client_id = conversation.client_id - - # Delete the conversation from database - await conversation.delete() - logger.info(f"Deleted conversation {conversation_id}") - - # Also delete from legacy AudioFile collection if it exists (backward compatibility) - audio_file = await AudioFile.find_one(AudioFile.audio_uuid == audio_uuid) - if audio_file: - await audio_file.delete() - logger.info(f"Deleted legacy audio file record for {audio_uuid}") - - # Delete associated audio files from disk - deleted_files = [] - if audio_path: - try: - # Construct full path to audio file - full_audio_path = Path("/app/audio_chunks") / audio_path - if full_audio_path.exists(): - full_audio_path.unlink() - deleted_files.append(str(full_audio_path)) - logger.info(f"Deleted audio file: {full_audio_path}") - except Exception as e: - logger.warning(f"Failed to delete audio file {audio_path}: {e}") - - logger.info(f"Successfully deleted conversation {conversation_id} for user {user.user_id}") - - # Prepare response message - delete_summary = ["conversation"] - if deleted_files: - delete_summary.append(f"{len(deleted_files)} audio file(s)") + # Hard delete (admin only, permanent flag) + if permanent and user.is_superuser: + return await _hard_delete_conversation(conversation) + + # Soft delete (default) + return await _soft_delete_conversation(conversation, user) + + except Exception as e: + logger.error(f"Error deleting conversation {conversation_id}: {e}") + return JSONResponse( + status_code=500, + content={"error": f"Failed to delete conversation: {str(e)}"} + ) + + +async def restore_conversation(conversation_id: str, user: User) -> JSONResponse: + """ + Restore a soft-deleted conversation. + + Args: + conversation_id: Conversation to restore + user: Requesting user + """ + try: + conversation = await Conversation.find_one( + Conversation.conversation_id == conversation_id + ) + + if not conversation: + return JSONResponse( + status_code=404, + content={"error": "Conversation not found"} + ) + + # Permission check + if not user.is_superuser and conversation.user_id != str(user.user_id): + return JSONResponse( + status_code=403, + content={"error": "Access denied"} + ) + + if not conversation.deleted: + return JSONResponse( + status_code=400, + content={"error": "Conversation is not deleted"} + ) + + # Restore conversation + conversation.deleted = False + conversation.deletion_reason = None + conversation.deleted_at = None + await conversation.save() + + # Restore audio chunks + result = await AudioChunkDocument.find( + AudioChunkDocument.conversation_id == conversation_id, + AudioChunkDocument.deleted == True + ).update_many({ + "$set": { + "deleted": False, + "deleted_at": None + } + }) + + restored_chunks = result.modified_count + + logger.info( + f"Restored conversation {conversation_id} " + f"({restored_chunks} chunks) for user {user.user_id}" + ) return JSONResponse( status_code=200, content={ - "message": f"Successfully deleted {', '.join(delete_summary)} '{conversation_id}'", - "deleted_files": deleted_files, - "client_id": client_id, + "message": f"Successfully restored conversation '{conversation_id}'", + "restored_chunks": restored_chunks, "conversation_id": conversation_id, - "audio_uuid": audio_uuid } ) except Exception as e: - logger.error(f"Error deleting conversation {conversation_id}: {e}") + logger.error(f"Error restoring conversation {conversation_id}: {e}") return JSONResponse( status_code=500, - content={"error": f"Failed to delete conversation: {str(e)}"} + content={"error": f"Failed to restore conversation: {str(e)}"} ) @@ -308,10 +418,17 @@ async def reprocess_transcript(conversation_id: str, user: User): version_id = str(uuid.uuid4()) # Enqueue job chain with RQ (transcription -> speaker recognition -> memory) - from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job - from advanced_omi_backend.workers.speaker_jobs import recognise_speakers_job + from advanced_omi_backend.controllers.queue_controller import ( + JOB_RESULT_TTL, + default_queue, + memory_queue, + transcription_queue, + ) from advanced_omi_backend.workers.memory_jobs import process_memory_job - from advanced_omi_backend.controllers.queue_controller import transcription_queue, memory_queue, default_queue, JOB_RESULT_TTL + from advanced_omi_backend.workers.speaker_jobs import recognise_speakers_job + from advanced_omi_backend.workers.transcription_jobs import ( + transcribe_full_audio_job, + ) # Job 1: Transcribe audio to text transcript_job = transcription_queue.enqueue( @@ -414,8 +531,8 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use version_id = str(uuid.uuid4()) # Enqueue memory processing job with RQ (RQ handles job tracking) - from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing from advanced_omi_backend.models.job import JobPriority + from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing job = enqueue_memory_processing( client_id=conversation_model.client_id, diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index f5ff3275..46812a8a 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -127,6 +127,70 @@ async def save_diarization_settings(settings: dict): raise e +async def get_cleanup_settings_controller(user: User) -> dict: + """ + Get current cleanup settings (admin only). + + Args: + user: Authenticated admin user + + Returns: + Dict with cleanup settings + """ + from advanced_omi_backend.config import get_cleanup_settings + + return get_cleanup_settings() + + +async def save_cleanup_settings_controller( + auto_cleanup_enabled: bool, + retention_days: int, + user: User +) -> dict: + """ + Save cleanup settings (admin only). + + Args: + auto_cleanup_enabled: Enable/disable automatic cleanup + retention_days: Number of days to retain soft-deleted conversations + user: Authenticated admin user + + Returns: + Updated cleanup settings + + Raises: + ValueError: If validation fails + """ + from advanced_omi_backend.config import CleanupSettings, save_cleanup_settings_to_file + + # Validation + if not isinstance(auto_cleanup_enabled, bool): + raise ValueError("auto_cleanup_enabled must be a boolean") + + if not isinstance(retention_days, int): + raise ValueError("retention_days must be an integer") + + if retention_days < 1 or retention_days > 365: + raise ValueError("retention_days must be between 1 and 365") + + # Create settings object + settings = CleanupSettings( + auto_cleanup_enabled=auto_cleanup_enabled, + retention_days=retention_days + ) + + # Save to file (also updates in-memory cache) + save_cleanup_settings_to_file(settings) + + logger.info(f"Admin {user.email} updated cleanup settings: auto_cleanup={auto_cleanup_enabled}, retention={retention_days}d") + + return { + "auto_cleanup_enabled": settings.auto_cleanup_enabled, + "retention_days": settings.retention_days, + "message": "Cleanup settings saved successfully" + } + + async def get_speaker_configuration(user: User): """Get current user's primary speakers configuration.""" try: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index 28e9924f..ad856b2b 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -829,8 +829,8 @@ async def _process_batch_audio_complete( return try: - from advanced_omi_backend.utils.audio_utils import write_audio_file from advanced_omi_backend.models.conversation import create_conversation + from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks # Combine all chunks complete_audio = b''.join(client_state.batch_audio_chunks) @@ -842,20 +842,17 @@ async def _process_batch_audio_complete( audio_uuid = str(uuid.uuid4()) timestamp = int(time.time() * 1000) - # Write audio file and create AudioFile entry - relative_audio_path, file_path, duration = await write_audio_file( - raw_audio_data=complete_audio, - audio_uuid=audio_uuid, - source="websocket", - client_id=client_id, - user_id=user_id, - user_email=user_email, - timestamp=timestamp, - validate=False # PCM data, not WAV - ) + # Get audio format from batch metadata (set during audio-start) + audio_format = getattr(client_state, 'batch_audio_format', {}) + sample_rate = audio_format.get('rate', OMI_SAMPLE_RATE) + sample_width = audio_format.get('width', OMI_SAMPLE_WIDTH) + channels = audio_format.get('channels', OMI_CHANNELS) + + # Calculate audio duration + duration = len(complete_audio) / (sample_rate * sample_width * channels) application_logger.info( - f"✅ Batch mode: Wrote audio file {relative_audio_path} ({duration:.1f}s)" + f"✅ Batch mode: Processing audio ({duration:.1f}s)" ) # Create conversation immediately for batch audio (conversation_id auto-generated) @@ -868,19 +865,38 @@ async def _process_batch_audio_complete( title="Batch Recording", summary="Processing batch audio..." ) - conversation.audio_path = relative_audio_path await conversation.insert() conversation_id = conversation.conversation_id # Get the auto-generated ID application_logger.info(f"📝 Batch mode: Created conversation {conversation_id}") + # Convert audio directly to MongoDB chunks (no disk intermediary) + try: + num_chunks = await convert_audio_to_chunks( + conversation_id=conversation_id, + audio_data=complete_audio, + sample_rate=sample_rate, + channels=channels, + sample_width=sample_width, + ) + application_logger.info( + f"📦 Batch mode: Converted to {num_chunks} MongoDB chunks " + f"(conversation {conversation_id[:12]})" + ) + except Exception as chunk_error: + application_logger.error( + f"Failed to convert batch audio to chunks: {chunk_error}", + exc_info=True + ) + # Continue anyway - transcription job will handle it + # Enqueue post-conversation processing job chain from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs job_ids = start_post_conversation_jobs( conversation_id=conversation_id, audio_uuid=audio_uuid, - audio_file_path=file_path, + audio_file_path=None, # No file path - using MongoDB chunks user_id=None, # Will be read from conversation in DB by jobs post_transcription=True, # Run batch transcription for uploads client_id=client_id # Pass client_id for UI tracking diff --git a/backends/advanced/src/advanced_omi_backend/models/audio_chunk.py b/backends/advanced/src/advanced_omi_backend/models/audio_chunk.py new file mode 100644 index 00000000..cea20ef7 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/models/audio_chunk.py @@ -0,0 +1,158 @@ +""" +Audio chunk models for MongoDB-based audio storage. + +This module contains the AudioChunkDocument model for storing Opus-compressed +audio chunks in MongoDB. Each chunk represents a 10-second segment of audio +from a conversation. +""" + +from datetime import datetime +from typing import Optional +from pydantic import ConfigDict, Field, field_serializer +from beanie import Document, Indexed +from bson import Binary + + +class AudioChunkDocument(Document): + """ + MongoDB document representing a 10-second audio chunk. + + Audio chunks are stored in Opus-compressed format for ~94% storage reduction + compared to raw PCM. Chunks are sequentially numbered and can be reconstructed + into complete WAV files for playback or batch processing. + + Storage Format: + - Encoding: Opus (24kbps VBR, optimized for speech) + - Chunk Duration: 10 seconds (configurable) + - Original Format: 16kHz, 16-bit, mono PCM + - Compression Ratio: ~0.047 (94% reduction) + + Indexes: + - (conversation_id, chunk_index): Primary query pattern for reconstruction + - conversation_id: Conversation lookup and counting + - created_at: Maintenance and cleanup operations + """ + + # Pydantic v2 configuration + model_config = ConfigDict(arbitrary_types_allowed=True) + + # Primary identifiers + conversation_id: Indexed(str) = Field( + description="Parent conversation ID (UUID format)" + ) + chunk_index: int = Field( + description="Sequential chunk number (0-based)", + ge=0 + ) + + # Audio data + audio_data: bytes = Field( + description="Opus-encoded audio bytes (stored as BSON Binary in MongoDB)" + ) + + # Size tracking + original_size: int = Field( + description="Original PCM size in bytes (before compression)", + gt=0 + ) + compressed_size: int = Field( + description="Opus-encoded size in bytes (after compression)", + gt=0 + ) + + # Time boundaries + start_time: float = Field( + description="Start time in seconds from conversation start", + ge=0.0 + ) + end_time: float = Field( + description="End time in seconds from conversation start", + gt=0.0 + ) + duration: float = Field( + description="Chunk duration in seconds (typically 10.0)", + gt=0.0 + ) + + # Audio format + sample_rate: int = Field( + default=16000, + description="Original PCM sample rate (Hz)" + ) + channels: int = Field( + default=1, + description="Number of audio channels (1=mono, 2=stereo)" + ) + + # Optional analysis + has_speech: Optional[bool] = Field( + default=None, + description="Voice Activity Detection result (if available)" + ) + + # Metadata + created_at: datetime = Field( + default_factory=datetime.utcnow, + description="Chunk creation timestamp" + ) + + # Soft delete fields + deleted: bool = Field( + default=False, + description="Whether this chunk was soft-deleted" + ) + deleted_at: Optional[datetime] = Field( + default=None, + description="When the chunk was marked as deleted" + ) + + @field_serializer('audio_data') + def serialize_audio_data(self, v: bytes) -> Binary: + """ + Convert bytes to BSON Binary for MongoDB storage. + + MongoDB returns BSON Binary as plain bytes during deserialization, + but expects Binary type for serialization to ensure proper binary data handling. + """ + if isinstance(v, bytes): + return Binary(v) + return v + + class Settings: + """Beanie document settings.""" + name = "audio_chunks" + + indexes = [ + # Primary query: Retrieve chunks in order for a conversation + [("conversation_id", 1), ("chunk_index", 1)], + + # Conversation lookup and counting + "conversation_id", + + # Maintenance queries (cleanup, monitoring) + "created_at", + + # Soft delete filtering + "deleted" + ] + + @property + def compression_ratio(self) -> float: + """Calculate compression ratio (compressed/original).""" + if self.original_size == 0: + return 0.0 + return self.compressed_size / self.original_size + + @property + def storage_savings_percent(self) -> float: + """Calculate storage savings as percentage.""" + return (1 - self.compression_ratio) * 100 + + def __repr__(self) -> str: + """Human-readable representation.""" + return ( + f"AudioChunk(conversation={self.conversation_id[:8]}..., " + f"index={self.chunk_index}, " + f"duration={self.duration:.1f}s, " + f"compression={self.compression_ratio:.3f})" + ) diff --git a/backends/advanced/src/advanced_omi_backend/models/audio_file.py b/backends/advanced/src/advanced_omi_backend/models/audio_file.py deleted file mode 100644 index ca154500..00000000 --- a/backends/advanced/src/advanced_omi_backend/models/audio_file.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -AudioFile models for Chronicle backend. - -This module contains the Beanie Document model for audio_chunks collection, -which stores ALL audio files (both with and without speech). This is the -storage layer - all audio gets stored here with its metadata. - -Note: Named AudioFile (not AudioChunk) to avoid confusion with wyoming.audio.AudioChunk -which is the in-memory streaming audio data structure. -""" - -from datetime import datetime -from typing import Dict, List, Optional, Any -from pydantic import BaseModel, Field - -from beanie import Document, Indexed - - -class AudioFile(Document): - """ - Audio file model representing persisted audio files in MongoDB. - - The audio_chunks collection stores ALL raw audio files (both with and without speech). - This is just for audio file storage and metadata. If speech is detected, a - Conversation document is created which contains transcripts and memories. - - This is different from wyoming.audio.AudioChunk which is for streaming audio data. - """ - - # Core identifiers - audio_uuid: Indexed(str, unique=True) = Field(description="Unique audio identifier") - source: Indexed(str) = Field( - default="upload", - description="Source of the audio (upload, gdrive, etc.)" - ) - audio_path: str = Field(description="Path to raw audio file") - client_id: Indexed(str) = Field(description="Client device identifier") - timestamp: Indexed(int) = Field(description="Unix timestamp in milliseconds") - - # User information - user_id: Indexed(str) = Field(description="User who owns this audio") - user_email: Optional[str] = Field(None, description="User email") - - # Speech-driven conversation linking - conversation_id: Optional[str] = Field( - None, - description="Link to Conversation if speech was detected" - ) - has_speech: bool = Field(default=False, description="Whether speech was detected") - speech_analysis: Dict[str, Any] = Field( - default_factory=dict, - description="Speech detection results" - ) - - - - class Settings: - name = "audio_chunks" - indexes = [ - "audio_uuid", - "client_id", - "user_id", - "timestamp", - ] \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 00178f10..24091ef8 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -88,8 +88,22 @@ class MemoryVersion(BaseModel): user_id: Indexed(str) = Field(description="User who owns this conversation") client_id: Indexed(str) = Field(description="Client device identifier") - # Audio file reference - audio_path: Optional[str] = Field(None, description="Path to audio file (relative to CHUNK_DIR)") + # Legacy audio path field - no longer used, audio stored as MongoDB chunks + audio_path: Optional[str] = Field(None, description="Legacy field, not populated for new conversations") + + # MongoDB chunk-based audio storage (new system) + audio_chunks_count: Optional[int] = Field( + None, + description="Total number of 10-second audio chunks stored in MongoDB" + ) + audio_total_duration: Optional[float] = Field( + None, + description="Total audio duration in seconds (sum of all chunks)" + ) + audio_compression_ratio: Optional[float] = Field( + None, + description="Compression ratio (compressed_size / original_size), typically ~0.047 for Opus" + ) # Creation metadata created_at: Indexed(datetime) = Field(default_factory=datetime.utcnow, description="When the conversation was created") diff --git a/backends/advanced/src/advanced_omi_backend/models/job.py b/backends/advanced/src/advanced_omi_backend/models/job.py index b295782c..763373d2 100644 --- a/backends/advanced/src/advanced_omi_backend/models/job.py +++ b/backends/advanced/src/advanced_omi_backend/models/job.py @@ -35,10 +35,10 @@ async def _ensure_beanie_initialized(): from motor.motor_asyncio import AsyncIOMotorClient from beanie import init_beanie from advanced_omi_backend.models.conversation import Conversation - from advanced_omi_backend.models.audio_file import AudioFile - from advanced_omi_backend.models.user import User + from advanced_omi_backend.models.audio_chunk import AudioChunkDocument + from advanced_omi_backend.models.user import User from pymongo.errors import ConfigurationError - + # Get MongoDB URI from environment mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") @@ -54,7 +54,7 @@ async def _ensure_beanie_initialized(): # Initialize Beanie await init_beanie( database=database, - document_models=[User, Conversation, AudioFile], + document_models=[User, Conversation, AudioChunkDocument], ) _beanie_initialized = True diff --git a/backends/advanced/src/advanced_omi_backend/routers/api_router.py b/backends/advanced/src/advanced_omi_backend/routers/api_router.py index 80c03eae..791bd5ca 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/api_router.py +++ b/backends/advanced/src/advanced_omi_backend/routers/api_router.py @@ -11,6 +11,7 @@ from fastapi import APIRouter from .modules import ( + admin_router, audio_router, chat_router, client_router, @@ -30,6 +31,7 @@ router = APIRouter(prefix="/api", tags=["api"]) # Include all sub-routers +router.include_router(admin_router) router.include_router(audio_router) router.include_router(user_router) router.include_router(chat_router) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py index 21f89991..3c8e4ceb 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py @@ -12,8 +12,10 @@ - audio_routes: Audio file uploads and processing - health_routes: Health check endpoints - websocket_routes: WebSocket connection handling +- admin_routes: Admin-only system management endpoints """ +from .admin_routes import router as admin_router from .audio_routes import router as audio_router from .chat_routes import router as chat_router from .client_routes import router as client_router @@ -27,6 +29,7 @@ from .websocket_routes import router as websocket_router __all__ = [ + "admin_router", "audio_router", "chat_router", "client_router", diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py new file mode 100644 index 00000000..6fbbfc56 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py @@ -0,0 +1,122 @@ +""" +Admin routes for Chronicle API. + +Provides admin-only endpoints for system management and cleanup operations. +""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, Query, HTTPException +from fastapi.responses import JSONResponse + +from advanced_omi_backend.auth import current_active_user +from advanced_omi_backend.users import User + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/admin", tags=["admin"]) + + +def require_admin(current_user: User = Depends(current_active_user)) -> User: + """Dependency to require admin/superuser permissions.""" + if not current_user.is_superuser: + raise HTTPException( + status_code=403, + detail="Admin permissions required" + ) + return current_user + + +@router.get("/cleanup/settings") +async def get_cleanup_settings_admin( + admin: User = Depends(require_admin) +): + """Get current cleanup settings (admin only).""" + from advanced_omi_backend.config import get_cleanup_settings + + settings = get_cleanup_settings() + return { + **settings, + "note": "Cleanup settings are stored in /app/data/cleanup_config.json" + } + + +@router.post("/cleanup") +async def trigger_cleanup( + dry_run: bool = Query(False, description="Preview what would be deleted"), + retention_days: Optional[int] = Query(None, description="Override retention period"), + admin: User = Depends(require_admin) +): + """Manually trigger cleanup of soft-deleted conversations (admin only).""" + try: + from advanced_omi_backend.workers.cleanup_jobs import purge_old_deleted_conversations + from advanced_omi_backend.controllers.queue_controller import get_queue + + # Enqueue cleanup job + queue = get_queue("default") + job = queue.enqueue( + purge_old_deleted_conversations, + retention_days=retention_days, # Will use config default if None + dry_run=dry_run, + job_timeout="30m", + ) + + logger.info(f"Admin {admin.email} triggered cleanup job {job.id} (dry_run={dry_run}, retention={retention_days or 'default'})") + + return JSONResponse( + status_code=200, + content={ + "message": f"Cleanup job {'(dry run) ' if dry_run else ''}queued successfully", + "job_id": job.id, + "retention_days": retention_days or "default (from config)", + "dry_run": dry_run, + "note": "Check job status at /api/queue/jobs/{job_id}" + } + ) + + except Exception as e: + logger.error(f"Failed to trigger cleanup: {e}") + return JSONResponse( + status_code=500, + content={"error": f"Failed to trigger cleanup: {str(e)}"} + ) + + +@router.get("/cleanup/preview") +async def preview_cleanup( + retention_days: Optional[int] = Query(None, description="Preview with specific retention period"), + admin: User = Depends(require_admin) +): + """Preview what would be deleted by cleanup (admin only).""" + try: + from advanced_omi_backend.config import load_cleanup_settings_from_file + from advanced_omi_backend.models.conversation import Conversation + from datetime import datetime, timedelta + + # Use provided retention or default from config + if retention_days is None: + settings = load_cleanup_settings_from_file() + retention_days = settings.retention_days + + cutoff_date = datetime.utcnow() - timedelta(days=retention_days) + + # Count conversations that would be deleted + count = await Conversation.find( + Conversation.deleted == True, + Conversation.deleted_at < cutoff_date + ).count() + + return { + "retention_days": retention_days, + "cutoff_date": cutoff_date.isoformat(), + "conversations_to_delete": count, + "note": f"Conversations deleted before {cutoff_date.date()} would be purged" + } + + except Exception as e: + logger.error(f"Failed to preview cleanup: {e}") + return JSONResponse( + status_code=500, + content={"error": f"Failed to preview cleanup: {str(e)}"} + ) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py index 58a33ff5..afa2906f 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/audio_routes.py @@ -2,17 +2,26 @@ Audio file upload and serving routes. Handles audio file uploads, processing job management, and audio file serving. +Audio is served from MongoDB chunks with Opus compression. """ +import io from typing import Optional from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, StreamingResponse from advanced_omi_backend.auth import current_superuser, current_active_user_optional, get_user_from_token_param from advanced_omi_backend.controllers import audio_controller from advanced_omi_backend.models.user import User +from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.app_config import get_audio_chunk_dir from advanced_omi_backend.utils.gdrive_audio_utils import download_audio_files_from_drive, AudioValidationError +from advanced_omi_backend.utils.audio_chunk_utils import ( + reconstruct_wav_from_conversation, + retrieve_audio_chunks, + concatenate_chunks_to_pcm, + build_wav_from_pcm, +) router = APIRouter(prefix="/audio", tags=["audio"]) @@ -41,10 +50,13 @@ async def get_conversation_audio( current_user: Optional[User] = Depends(current_active_user_optional), ): """ - Serve audio file for a conversation. + Serve complete audio file for a conversation from MongoDB chunks. - This endpoint uses conversation_id for direct lookup and ownership verification, - which is more efficient than querying by filename. + Reconstructs audio by: + 1. Retrieving all Opus-compressed chunks from MongoDB + 2. Decoding each chunk to PCM + 3. Concatenating PCM data + 4. Building complete WAV file with headers Supports both header-based auth (Authorization: Bearer) and query param token for