Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backends/advanced/src/advanced_omi_backend/app_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ async def lifespan(app: FastAPI):
try:
from beanie import init_beanie
from advanced_omi_backend.models.conversation import Conversation
from advanced_omi_backend.models.audio_file import AudioFile
from advanced_omi_backend.models.audio_chunk import AudioChunkDocument
from advanced_omi_backend.models.user import User

await init_beanie(
database=config.db,
document_models=[User, Conversation, AudioFile],
document_models=[User, Conversation, AudioChunkDocument],
)
application_logger.info("Beanie initialized for all document models")
except Exception as e:
Expand Down
97 changes: 97 additions & 0 deletions backends/advanced/src/advanced_omi_backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import logging
import os
import shutil
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -131,6 +133,101 @@ def save_diarization_settings_to_file(settings):
return False


# ============================================================================
# Cleanup Settings (JSON file-based with in-memory caching)
# ============================================================================

@dataclass
class CleanupSettings:
    """Cleanup configuration for soft-deleted conversations.

    Persisted to a JSON file (see ``save_cleanup_settings_to_file``) and
    cached in-process via the module-level ``_cleanup_settings`` singleton.
    """
    # Presumably enables automatic purging of soft-deleted conversations;
    # consumed outside this module — confirm against the cleanup job.
    auto_cleanup_enabled: bool = False
    # Retention window in days before a soft-deleted conversation may be removed.
    retention_days: int = 30


# Module-level cache: populated lazily on first load, refreshed on every save.
_cleanup_settings: Optional[CleanupSettings] = None


def get_cleanup_config_path() -> Path:
    """Return the path of the cleanup settings JSON file.

    The parent data directory (``$DATA_DIR``, defaulting to ``/app/data``)
    is created on demand so callers can open the returned path directly.
    """
    base = Path(os.environ.get("DATA_DIR", "/app/data"))
    base.mkdir(parents=True, exist_ok=True)
    return base / "cleanup_config.json"


def load_cleanup_settings_from_file() -> CleanupSettings:
    """Load cleanup settings, preferring the in-memory cache.

    Resolution order: cached value, then the JSON config file, then
    built-in defaults (also used when the file is missing or unreadable).
    The resolved settings are cached for all subsequent calls.
    """
    global _cleanup_settings

    # Fast path: already resolved once in this process.
    if _cleanup_settings is not None:
        return _cleanup_settings

    config_path = get_cleanup_config_path()

    if config_path.exists():
        try:
            with open(config_path, "r") as fh:
                loaded = CleanupSettings(**json.load(fh))
        except Exception as exc:
            # Unreadable/invalid file: log and fall through to defaults.
            logger.error(f"❌ Failed to load cleanup settings from {config_path}: {exc}")
        else:
            _cleanup_settings = loaded
            logger.info(f"✅ Loaded cleanup settings: auto_cleanup={loaded.auto_cleanup_enabled}, retention={loaded.retention_days}d")
            return _cleanup_settings

    # No file (or load failed): cache and return the defaults.
    _cleanup_settings = CleanupSettings()
    logger.info("Using default cleanup settings (auto_cleanup_enabled=False, retention_days=30)")
    return _cleanup_settings


def save_cleanup_settings_to_file(settings: CleanupSettings) -> None:
    """Persist *settings* to the JSON config file and refresh the cache.

    Args:
        settings: CleanupSettings to persist.

    Raises:
        Exception: Re-raised when the file write fails; the in-memory
            cache is left untouched in that case.
    """
    global _cleanup_settings

    config_path = get_cleanup_config_path()
    try:
        with open(config_path, "w") as fh:
            json.dump(asdict(settings), fh, indent=2)
        # Only cache once the write has succeeded.
        _cleanup_settings = settings
        logger.info(f"✅ Saved cleanup settings: auto_cleanup={settings.auto_cleanup_enabled}, retention={settings.retention_days}d")
    except Exception as exc:
        logger.error(f"❌ Failed to save cleanup settings to {config_path}: {exc}")
        raise


def get_cleanup_settings() -> dict:
    """Return the current cleanup settings as a plain dict (for API responses).

    Returns:
        Dict with ``auto_cleanup_enabled`` and ``retention_days`` keys.
    """
    # asdict() yields exactly the two dataclass fields, in declaration order.
    return asdict(load_cleanup_settings_from_file())


def get_speech_detection_settings():
"""Get speech detection settings from environment or defaults."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

from advanced_omi_backend.utils.audio_utils import (
AudioValidationError,
write_audio_file,
validate_and_prepare_audio,
)
from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks
from advanced_omi_backend.models.job import JobPriority
from advanced_omi_backend.models.user import User
from advanced_omi_backend.models.conversation import create_conversation
Expand Down Expand Up @@ -86,33 +87,19 @@ async def upload_and_process_audio_files(
# Generate audio UUID and timestamp
if source == "gdrive":
audio_uuid = getattr(file, "audio_uuid", None)
if not audio_uuid:
if not audio_uuid:
audio_logger.error(f"Missing audio_uuid for gdrive file: {file.filename}")
audio_uuid = str(uuid.uuid4())
else:
audio_uuid = str(uuid.uuid4())
else:
audio_uuid = str(uuid.uuid4())
timestamp = int(time.time() * 1000)

# Determine output directory (with optional subfolder)
from advanced_omi_backend.config import CHUNK_DIR
if folder:
chunk_dir = CHUNK_DIR / folder
chunk_dir.mkdir(parents=True, exist_ok=True)
else:
chunk_dir = CHUNK_DIR

# Validate, write audio file and create AudioSession (all in one)
# Validate and prepare audio (read format from WAV file)
try:
relative_audio_path, file_path, duration = await write_audio_file(
raw_audio_data=content,
audio_uuid=audio_uuid,
source=source,
client_id=client_id,
user_id=user.user_id,
user_email=user.email,
timestamp=timestamp,
chunk_dir=chunk_dir,
validate=True, # Validate WAV format, convert stereo→mono
audio_data, sample_rate, sample_width, channels, duration = await validate_and_prepare_audio(
audio_data=content,
expected_sample_rate=16000, # Expecting 16kHz
convert_to_mono=True # Convert stereo to mono
)
except AudioValidationError as e:
processed_files.append({
Expand All @@ -123,7 +110,7 @@ async def upload_and_process_audio_files(
continue

audio_logger.info(
f"📊 {file.filename}: {duration:.1f}s → {relative_audio_path}"
f"📊 {file.filename}: {duration:.1f}s ({sample_rate}Hz, {channels}ch, {sample_width} bytes/sample)"
)

# Create conversation immediately for uploaded files (conversation_id auto-generated)
Expand All @@ -139,20 +126,36 @@ async def upload_and_process_audio_files(
title=title,
summary="Processing uploaded audio file..."
)
# Use the relative path returned by write_audio_file (already includes folder prefix if applicable)
conversation.audio_path = relative_audio_path
await conversation.insert()
conversation_id = conversation.conversation_id # Get the auto-generated ID

audio_logger.info(f"📝 Created conversation {conversation_id} for uploaded file")

# Convert audio directly to MongoDB chunks
try:
num_chunks = await convert_audio_to_chunks(
conversation_id=conversation_id,
audio_data=audio_data,
sample_rate=sample_rate,
channels=channels,
sample_width=sample_width,
)
audio_logger.info(
f"📦 Converted uploaded file to {num_chunks} MongoDB chunks "
f"(conversation {conversation_id[:12]})"
)
except Exception as chunk_error:
audio_logger.error(
f"Failed to convert uploaded file to chunks: {chunk_error}",
exc_info=True
)

# Enqueue post-conversation processing job chain
from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs

job_ids = start_post_conversation_jobs(
conversation_id=conversation_id,
audio_uuid=audio_uuid,
audio_file_path=file_path,
user_id=user.user_id,
post_transcription=True, # Run batch transcription for uploads
client_id=client_id # Pass client_id for UI tracking
Expand Down Expand Up @@ -217,45 +220,3 @@ async def upload_and_process_audio_files(
return JSONResponse(
status_code=500, content={"error": f"File upload failed: {str(e)}"}
)


async def get_conversation_audio_path(conversation_id: str, user: User) -> Path:
    """
    Get the file path for a conversation's audio file.

    Args:
        conversation_id: The conversation ID (the UUID string field, not Mongo ``_id``)
        user: The authenticated user

    Returns:
        Path object for the audio file

    Raises:
        ValueError: If conversation not found, access denied, or audio file not available
    """
    # Look up by the conversation_id UUID field rather than the document _id.
    conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id)
    if not conversation:
        raise ValueError("Conversation not found")

    # Ownership check: superusers may access any conversation's audio.
    if not user.is_superuser and conversation.user_id != str(user.user_id):
        raise ValueError("Access denied")

    audio_path = conversation.audio_path
    if not audio_path:
        # Fixed: was an f-string with no placeholders (ruff F541).
        raise ValueError("No audio file available for this conversation")

    # Resolve the stored relative path against the configured audio chunk dir.
    from advanced_omi_backend.app_config import get_audio_chunk_dir
    file_path = get_audio_chunk_dir() / audio_path

    # Guard against stale DB records pointing at files missing from disk.
    if not file_path.exists() or not file_path.is_file():
        raise ValueError("Audio file not found on disk")

    return file_path
Loading
Loading