Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions backends/advanced/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ services:
condition: service_started
restart: unless-stopped

cron:
build:
context: .
dockerfile: Dockerfile
command: ["uv", "run", "python", "-m", "advanced_omi_backend.cron"]
env_file:
- .env
volumes:
- ./src:/app/src
- ./data:/app/data
- ../../config/config.yml:/app/config.yml
environment:
- REDIS_URL=redis://redis:6379/0
- MONGO_URL=${MONGO_URL:-mongodb://mongo:27017}
depends_on:
mongo:
condition: service_healthy
redis:
condition: service_healthy
restart: unless-stopped

webui:
build:
context: ./webui
Expand Down
3 changes: 2 additions & 1 deletion backends/advanced/src/advanced_omi_backend/app_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ async def lifespan(app: FastAPI):
from advanced_omi_backend.models.conversation import Conversation
from advanced_omi_backend.models.audio_file import AudioFile
from advanced_omi_backend.models.user import User
from advanced_omi_backend.models.annotation import TranscriptAnnotation

await init_beanie(
database=config.db,
document_models=[User, Conversation, AudioFile],
document_models=[User, Conversation, AudioFile, TranscriptAnnotation],
)
application_logger.info("Beanie initialized for all document models")
except Exception as e:
Expand Down
72 changes: 72 additions & 0 deletions backends/advanced/src/advanced_omi_backend/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import asyncio
import logging
import os
import signal
import sys
from datetime import datetime, timezone

from advanced_omi_backend.database import init_db
from advanced_omi_backend.workers.annotation_jobs import (
    finetune_hallucination_model,
    surface_error_suggestions,
)

# Log to stdout so container log collectors pick everything up.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger("cron_scheduler")

# Job frequencies, in seconds.
SUGGESTION_INTERVAL = 24 * 60 * 60      # daily
TRAINING_INTERVAL = 7 * 24 * 60 * 60    # weekly

# DEV_MODE=true shortens both intervals for local testing.
_dev_mode = os.getenv("DEV_MODE", "false").lower() == "true"
if _dev_mode:
    SUGGESTION_INTERVAL = 60    # 1 minute
    TRAINING_INTERVAL = 300     # 5 minutes

async def run_scheduler():
    """Run the periodic background jobs on a simple polling loop.

    Wakes every 60 seconds and fires each job whose interval has elapsed:

    - ``surface_error_suggestions`` every ``SUGGESTION_INTERVAL`` seconds
    - ``finetune_hallucination_model`` every ``TRAINING_INTERVAL`` seconds

    A failing job is logged and retried on the next wake-up; it never stops
    the loop or the other job. Runs forever until cancelled/killed.
    """
    logger.info("Starting Cron Scheduler...")

    # Initialize the DB connection before any job touches the database.
    await init_db()

    # Aware sentinels so arithmetic with an aware `now` below is valid:
    # naive datetime.min minus an aware datetime raises TypeError.
    last_suggestion_run = datetime.min.replace(tzinfo=timezone.utc)
    last_training_run = datetime.min.replace(tzinfo=timezone.utc)

    while True:
        # Timezone-aware now; datetime.utcnow() is deprecated in 3.12+.
        now = datetime.now(timezone.utc)

        # Suggestions job.
        if (now - last_suggestion_run).total_seconds() >= SUGGESTION_INTERVAL:
            logger.info("Running scheduled job: surface_error_suggestions")
            try:
                await surface_error_suggestions()
                last_suggestion_run = now
            except Exception as e:
                # Timestamp intentionally not advanced on failure, so the
                # job is retried on the next wake-up rather than waiting
                # a whole interval.
                logger.error(f"Error in surface_error_suggestions: {e}", exc_info=True)

        # Training job.
        if (now - last_training_run).total_seconds() >= TRAINING_INTERVAL:
            logger.info("Running scheduled job: finetune_hallucination_model")
            try:
                await finetune_hallucination_model()
                last_training_run = now
            except Exception as e:
                logger.error(f"Error in finetune_hallucination_model: {e}", exc_info=True)

        # Poll granularity: one minute.
        await asyncio.sleep(60)

def handle_shutdown(signum, frame):
    """Signal handler for SIGTERM/SIGINT: log and exit with status 0."""
    logger.info("Shutting down Cron Scheduler...")
    raise SystemExit(0)

if __name__ == "__main__":
    # Exit cleanly on container stop (SIGTERM) and Ctrl-C (SIGINT).
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)

    try:
        asyncio.run(run_scheduler())
    except KeyboardInterrupt:
        # Guards the window before the SIGINT handler is installed/effective.
        pass
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@

# Models can be imported directly from their files
# e.g. from .job import TranscriptionJob
# e.g. from .conversation import Conversation, create_conversation
# e.g. from .conversation import Conversation, create_conversation
from .annotation import TranscriptAnnotation
39 changes: 39 additions & 0 deletions backends/advanced/src/advanced_omi_backend/models/annotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from datetime import datetime, timezone
from typing import Optional, List
from pydantic import Field
from beanie import Document, Indexed
from enum import Enum
import uuid

class TranscriptAnnotation(Document):
    """A proposed or applied correction to one transcript segment.

    Each record ties a (conversation_id, segment_index) pair to an
    original/corrected text pair, scoped to the owning user. Corrections
    may come directly from the user or be surfaced as model suggestions.
    """

    class AnnotationStatus(str, Enum):
        # Review lifecycle of a correction.
        PENDING = "pending"
        ACCEPTED = "accepted"
        REJECTED = "rejected"

    class AnnotationSource(str, Enum):
        # Who produced the correction.
        USER = "user"
        MODEL_SUGGESTION = "model_suggestion"

    # Primary key is a UUID4 string.
    # NOTE(review): other models in this codebase use MongoDB ObjectId ids —
    # confirm this divergence is intentional.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: Indexed(str)
    segment_index: int
    original_text: str
    corrected_text: str
    user_id: Indexed(str)

    # Direct user edits take effect immediately, hence ACCEPTED by default.
    status: AnnotationStatus = Field(default=AnnotationStatus.ACCEPTED)
    source: AnnotationSource = Field(default=AnnotationSource.USER)

    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Settings:
        name = "transcript_annotations"
        indexes = ["conversation_id", "user_id", "status"]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
queue_router,
system_router,
user_router,
annotation_router,
)
from .modules.health_routes import router as health_router

Expand All @@ -38,6 +39,7 @@
router.include_router(obsidian_router)
router.include_router(system_router)
router.include_router(queue_router)
router.include_router(annotation_router, prefix="/annotations", tags=["annotations"])
router.include_router(health_router) # Also include under /api for frontend compatibility


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .system_routes import router as system_router
from .user_routes import router as user_router
from .websocket_routes import router as websocket_router
from .annotation_routes import router as annotation_router

__all__ = [
"audio_router",
Expand All @@ -38,4 +39,5 @@
"system_router",
"user_router",
"websocket_router",
"annotation_router",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from fastapi import APIRouter, HTTPException, Depends
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime

from advanced_omi_backend.models.annotation import TranscriptAnnotation
from advanced_omi_backend.models.conversation import Conversation
from advanced_omi_backend.auth import current_active_user
from advanced_omi_backend.models.user import User
from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing
from advanced_omi_backend.models.job import JobPriority

router = APIRouter()

class AnnotationCreate(BaseModel):
    """Request body for creating one transcript-segment correction."""
    conversation_id: str
    segment_index: int
    original_text: str
    corrected_text: str
    # Defaults to ACCEPTED: direct user edits are applied immediately.
    status: Optional[TranscriptAnnotation.AnnotationStatus] = TranscriptAnnotation.AnnotationStatus.ACCEPTED

class AnnotationResponse(BaseModel):
    """API representation of a stored TranscriptAnnotation."""
    # Stringified primary key of the stored annotation document.
    id: str
    conversation_id: str
    segment_index: int
    original_text: str
    corrected_text: str
    status: str
    created_at: datetime

@router.post("/", response_model=AnnotationResponse)
async def create_annotation(
    annotation: AnnotationCreate,
    current_user: User = Depends(current_active_user)
):
    """Create a transcript correction and apply it to the conversation.

    Verifies the conversation belongs to the current user, validates the
    target segment, persists the annotation, rewrites the segment text in
    the active transcript version, and enqueues memory reprocessing so
    derived memories reflect the corrected text.

    Raises:
        HTTPException 404: conversation not found for this user.
        HTTPException 400: no active transcript, or segment index invalid.
    """
    # Verify conversation exists and belongs to the requesting user.
    conversation = await Conversation.find_one({
        "conversation_id": annotation.conversation_id,
        "user_id": str(current_user.id)
    })

    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Validate the target segment BEFORE persisting anything, so a bad
    # request can never leave an orphaned annotation in the database.
    version = conversation.active_transcript
    if not version:
        raise HTTPException(status_code=400, detail="No active transcript found")
    if not (0 <= annotation.segment_index < len(version.segments)):
        raise HTTPException(status_code=400, detail="Segment index out of range")

    # Persist the annotation record.
    new_annotation = TranscriptAnnotation(
        conversation_id=annotation.conversation_id,
        segment_index=annotation.segment_index,
        original_text=annotation.original_text,
        corrected_text=annotation.corrected_text,
        user_id=str(current_user.id),
        status=annotation.status,
        source=TranscriptAnnotation.AnnotationSource.USER
    )
    await new_annotation.insert()

    # Apply the correction to the active transcript version, then write the
    # updated version back into the conversation's version list.
    version.segments[annotation.segment_index].text = annotation.corrected_text
    for i, v in enumerate(conversation.transcript_versions):
        if v.version_id == version.version_id:
            conversation.transcript_versions[i] = version
            break

    await conversation.save()

    # Trigger memory reprocessing so derived memories use the corrected text.
    enqueue_memory_processing(
        client_id=conversation.client_id,
        user_id=str(current_user.id),
        user_email=current_user.email,
        conversation_id=conversation.conversation_id,
        priority=JobPriority.NORMAL
    )

    return AnnotationResponse(
        id=str(new_annotation.id),
        conversation_id=new_annotation.conversation_id,
        segment_index=new_annotation.segment_index,
        original_text=new_annotation.original_text,
        corrected_text=new_annotation.corrected_text,
        status=new_annotation.status,
        created_at=new_annotation.created_at
    )

@router.get("/{conversation_id}", response_model=List[AnnotationResponse])
async def get_annotations(
    conversation_id: str,
    current_user: User = Depends(current_active_user)
):
    """List the current user's annotations for one conversation."""
    # Query is scoped by user_id so users only ever see their own records.
    records = await TranscriptAnnotation.find({
        "conversation_id": conversation_id,
        "user_id": str(current_user.id)
    }).to_list()

    responses = []
    for record in records:
        responses.append(AnnotationResponse(
            id=str(record.id),
            conversation_id=record.conversation_id,
            segment_index=record.segment_index,
            original_text=record.original_text,
            corrected_text=record.corrected_text,
            status=record.status,
            created_at=record.created_at,
        ))
    return responses
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,17 @@ async def search_memories(self, query_embedding: List[float], user_id: str, limi
"query_filter": search_filter,
"limit": limit
}

if score_threshold > 0.0:
search_params["score_threshold"] = score_threshold
memory_logger.debug(f"Using similarity threshold: {score_threshold}")


# Use query_points instead of search (AsyncQdrantClient v1.10+ compat)
response = await self.client.query_points(**search_params)

results = response.points

memories = []
for result in response.points:
for result in results:
memory = MemoryEntry(
id=str(result.id),
content=result.payload.get("content", ""),
Expand Down
Loading
Loading