diff --git a/backends/advanced/.dockerignore b/backends/advanced/.dockerignore index 2dd9b44f..f0f7f05c 100644 --- a/backends/advanced/.dockerignore +++ b/backends/advanced/.dockerignore @@ -17,5 +17,5 @@ !nginx.conf.template !start.sh !start-k8s.sh -!start-workers.sh +!worker_orchestrator.py !Caddyfile \ No newline at end of file diff --git a/backends/advanced/Dockerfile b/backends/advanced/Dockerfile index 352bcfe9..a24ed841 100644 --- a/backends/advanced/Dockerfile +++ b/backends/advanced/Dockerfile @@ -39,10 +39,9 @@ COPY . . COPY diarization_config.json* ./ -# Copy and make startup scripts executable +# Copy and make startup script executable COPY start.sh ./ -COPY start-workers.sh ./ -RUN chmod +x start.sh start-workers.sh +RUN chmod +x start.sh # Run the application with workers CMD ["./start.sh"] diff --git a/backends/advanced/Dockerfile.k8s b/backends/advanced/Dockerfile.k8s index b746752a..6500ccf5 100644 --- a/backends/advanced/Dockerfile.k8s +++ b/backends/advanced/Dockerfile.k8s @@ -36,9 +36,9 @@ COPY . . # Copy memory config (created by init.sh from template) -# Copy and make K8s startup scripts executable -COPY start-k8s.sh start-workers.sh ./ -RUN chmod +x start-k8s.sh start-workers.sh +# Copy and make K8s startup script executable +COPY start-k8s.sh ./ +RUN chmod +x start-k8s.sh # Activate virtual environment in PATH ENV PATH="/app/.venv/bin:$PATH" diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 812d29b9..134e6687 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -154,9 +154,10 @@ services: build: context: . dockerfile: Dockerfile - command: ./start-workers.sh + command: ["uv", "run", "python", "worker_orchestrator.py"] volumes: - ./src:/app/src + - ./worker_orchestrator.py:/app/worker_orchestrator.py - ./data/test_audio_chunks:/app/audio_chunks - ./data/test_debug_dir:/app/debug_dir - ./data/test_data:/app/data diff --git a/backends/advanced/docker-compose.yml b/backends/advanced/docker-compose.yml index 4e6ba153..e0895271 100644 --- a/backends/advanced/docker-compose.yml +++ b/backends/advanced/docker-compose.yml @@ -76,22 +76,24 @@ services: # Unified Worker Container # No CUDA needed for chronicle-backend and workers, workers only orchestrate jobs and call external services # Runs all workers in a single container for efficiency: - # - 3 RQ workers (transcription, memory, default queues) - # - 1 Audio stream worker (Redis Streams consumer - must be single to maintain sequential chunks) + # - 6 RQ workers (transcription, memory, default queues) + # - 1 Audio persistence worker (audio queue) + # - 1+ Stream workers (conditional based on config.yml - Deepgram/Parakeet) + # Uses Python orchestrator for process management, health monitoring, and self-healing workers: build: context: . 
dockerfile: Dockerfile - command: ["./start-workers.sh"] + command: ["uv", "run", "python", "worker_orchestrator.py"] env_file: - .env volumes: - ./src:/app/src - - ./start-workers.sh:/app/start-workers.sh + - ./worker_orchestrator.py:/app/worker_orchestrator.py - ./data/audio_chunks:/app/audio_chunks - ./data:/app/data - - ../../config/config.yml:/app/config.yml # Removed :ro for consistency - - ../../config/plugins.yml:/app/plugins.yml # Plugin configuration + - ../../config/config.yml:/app/config.yml + - ../../config/plugins.yml:/app/plugins.yml environment: - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY} - PARAKEET_ASR_URL=${PARAKEET_ASR_URL} @@ -99,6 +101,11 @@ services: - GROQ_API_KEY=${GROQ_API_KEY} - HA_TOKEN=${HA_TOKEN} - REDIS_URL=redis://redis:6379/0 + # Worker orchestrator configuration (optional - defaults shown) + - WORKER_CHECK_INTERVAL=${WORKER_CHECK_INTERVAL:-10} + - MIN_RQ_WORKERS=${MIN_RQ_WORKERS:-6} + - WORKER_STARTUP_GRACE_PERIOD=${WORKER_STARTUP_GRACE_PERIOD:-30} + - WORKER_SHUTDOWN_TIMEOUT=${WORKER_SHUTDOWN_TIMEOUT:-30} depends_on: redis: condition: service_healthy diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py new file mode 100644 index 00000000..1c7b0d7a --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py @@ -0,0 +1,28 @@ +""" +Worker Orchestrator Package + +This package provides a Python-based orchestration system for managing +Chronicle's worker processes, replacing the bash-based start-workers.sh script. + +Components: +- config: Worker definitions and orchestrator configuration +- worker_registry: Build worker list with conditional logic +- process_manager: Process lifecycle management +- health_monitor: Health checks and self-healing +""" + +from .config import WorkerDefinition, OrchestratorConfig, WorkerType +from .worker_registry import build_worker_definitions +from .process_manager import ManagedWorker, ProcessManager, WorkerState +from .health_monitor import HealthMonitor + +__all__ = [ + "WorkerDefinition", + "OrchestratorConfig", + "WorkerType", + "build_worker_definitions", + "ManagedWorker", + "ProcessManager", + "WorkerState", + "HealthMonitor", +] diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py new file mode 100644 index 00000000..633d366a --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py @@ -0,0 +1,91 @@ +""" +Worker Orchestrator Configuration + +Defines data structures for worker definitions and orchestrator configuration. +""" + +import os +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional, Callable, List + + +class WorkerType(Enum): + """Type of worker process""" + + RQ_WORKER = "rq_worker" # RQ queue worker + STREAM_CONSUMER = "stream_consumer" # Redis Streams consumer + + +@dataclass +class WorkerDefinition: + """ + Definition of a single worker process. 
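+
+    Definitions are normally produced by worker_registry.build_worker_definitions()
+    rather than hand-built. For illustration only, a single-queue RQ worker
+    could be declared like this (sketch)::
+
+        WorkerDefinition(
+            name="rq-worker-1",
+            command=["uv", "run", "python", "-m",
+                     "advanced_omi_backend.workers.rq_worker_entry", "default"],
+            worker_type=WorkerType.RQ_WORKER,
+            queues=["default"],
+        )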
+ + Attributes: + name: Unique identifier for the worker + command: Full command to execute (as list for subprocess) + worker_type: Type of worker (RQ vs stream consumer) + queues: Queue names for RQ workers (empty for stream consumers) + enabled_check: Optional predicate function to determine if worker should start + restart_on_failure: Whether to automatically restart on failure + health_check: Optional custom health check function + """ + + name: str + command: List[str] + worker_type: WorkerType = WorkerType.RQ_WORKER + queues: List[str] = field(default_factory=list) + enabled_check: Optional[Callable[[], bool]] = None + restart_on_failure: bool = True + health_check: Optional[Callable[[], bool]] = None + + def is_enabled(self) -> bool: + """Check if this worker should be started""" + if self.enabled_check is None: + return True + return self.enabled_check() + + +@dataclass +class OrchestratorConfig: + """ + Global configuration for the worker orchestrator. + + All settings can be overridden via environment variables. + """ + + # Redis connection + redis_url: str = field( + default_factory=lambda: os.getenv("REDIS_URL", "redis://localhost:6379/0") + ) + + # Health monitoring settings + check_interval: int = field( + default_factory=lambda: int(os.getenv("WORKER_CHECK_INTERVAL", "10")) + ) + min_rq_workers: int = field( + default_factory=lambda: int(os.getenv("MIN_RQ_WORKERS", "6")) + ) + startup_grace_period: int = field( + default_factory=lambda: int(os.getenv("WORKER_STARTUP_GRACE_PERIOD", "30")) + ) + + # Shutdown settings + shutdown_timeout: int = field( + default_factory=lambda: int(os.getenv("WORKER_SHUTDOWN_TIMEOUT", "30")) + ) + + # Logging + log_level: str = field(default_factory=lambda: os.getenv("LOG_LEVEL", "INFO")) + + def __post_init__(self): + """Validate configuration after initialization""" + if self.check_interval <= 0: + raise ValueError("check_interval must be positive") + if self.min_rq_workers < 0: + raise ValueError("min_rq_workers must be non-negative") + if self.startup_grace_period < 0: + raise ValueError("startup_grace_period must be non-negative") + if self.shutdown_timeout <= 0: + raise ValueError("shutdown_timeout must be positive") diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py new file mode 100644 index 00000000..9b1149e2 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py @@ -0,0 +1,317 @@ +""" +Health Monitor + +Self-healing monitor that detects and recovers from worker failures. +Periodically checks worker health and restarts failed workers. +""" + +import asyncio +import logging +import time +from typing import Optional + +from redis import Redis +from rq import Worker + +from .config import OrchestratorConfig, WorkerType +from .process_manager import ProcessManager, WorkerState + +logger = logging.getLogger(__name__) + + +class HealthMonitor: + """ + Self-healing monitor for worker processes. + + Periodically checks: + 1. Individual worker health (process liveness) + 2. RQ worker registration count in Redis + + Automatically restarts failed workers if configured. 
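+
+    Example (sketch, mirroring the wiring in worker_orchestrator.py)::
+
+        config = OrchestratorConfig()
+        redis_client = Redis.from_url(config.redis_url)
+        manager = ProcessManager(build_worker_definitions())
+        manager.start_all()
+        monitor = HealthMonitor(manager, config, redis_client)
+        await monitor.start()   # launches the periodic check loop
+        ...
+        await monitor.stop()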
+ """ + + def __init__( + self, + process_manager: ProcessManager, + config: OrchestratorConfig, + redis_client: Redis, + ): + self.process_manager = process_manager + self.config = config + self.redis = redis_client + self.running = False + self.monitor_task: Optional[asyncio.Task] = None + self.start_time = time.time() + self.last_registration_recovery: Optional[float] = None + self.registration_recovery_cooldown = 60 # seconds + + async def start(self): + """Start the health monitoring loop""" + if self.running: + logger.warning("Health monitor already running") + return + + self.running = True + self.start_time = time.time() + logger.info( + f"Starting health monitor (check interval: {self.config.check_interval}s, " + f"grace period: {self.config.startup_grace_period}s)" + ) + + self.monitor_task = asyncio.create_task(self._monitor_loop()) + + async def stop(self): + """Stop the health monitoring loop""" + if not self.running: + return + + logger.info("Stopping health monitor...") + self.running = False + + if self.monitor_task: + self.monitor_task.cancel() + try: + await self.monitor_task + except asyncio.CancelledError: + pass + + logger.info("Health monitor stopped") + + async def _monitor_loop(self): + """Main monitoring loop""" + try: + while self.running: + # Wait for startup grace period before starting checks + elapsed = time.time() - self.start_time + if elapsed < self.config.startup_grace_period: + remaining = self.config.startup_grace_period - elapsed + logger.debug( + f"In startup grace period - waiting {remaining:.0f}s before health checks" + ) + await asyncio.sleep(self.config.check_interval) + continue + + # Perform health checks + await self._check_health() + + # Wait for next check + await asyncio.sleep(self.config.check_interval) + + except asyncio.CancelledError: + logger.info("Health monitor loop cancelled") + raise + except Exception as e: + logger.error(f"Health monitor loop error: {e}", exc_info=True) + self.running = False # Mark monitor as stopped so callers know it's not active + raise # Re-raise to ensure the monitor task fails properly + + async def _check_health(self): + """Perform all health checks and restart failed workers""" + try: + # Check individual worker health + worker_health = self._check_worker_health() + + # Check RQ worker registration count + rq_health = self._check_rq_worker_registration() + + # If RQ workers lost registration, trigger bulk restart (matches old bash script behavior) + if not rq_health: + self._handle_registration_loss() + + # Restart failed workers + self._restart_failed_workers() + + # Log summary + if not worker_health or not rq_health: + logger.warning( + f"Health check: worker_health={worker_health}, rq_health={rq_health}" + ) + + except Exception as e: + logger.error(f"Error during health check: {e}", exc_info=True) + + def _check_worker_health(self) -> bool: + """ + Check individual worker health. + + Returns: + True if all workers are healthy + """ + all_healthy = True + + for worker in self.process_manager.get_all_workers(): + try: + is_healthy = worker.check_health() + if not is_healthy: + all_healthy = False + logger.warning( + f"{worker.name}: Health check failed (state={worker.state.value})" + ) + except Exception as e: + all_healthy = False + logger.error(f"{worker.name}: Health check raised exception: {e}") + + return all_healthy + + def _check_rq_worker_registration(self) -> bool: + """ + Check RQ worker registration count in Redis. 
+ + This replicates the bash script's logic: + - Query Redis for all registered RQ workers + - Check if count >= min_rq_workers + + Returns: + True if RQ worker count is sufficient + """ + try: + workers = Worker.all(connection=self.redis) + worker_count = len(workers) + + if worker_count < self.config.min_rq_workers: + logger.warning( + f"RQ worker registration: {worker_count} workers " + f"(expected >= {self.config.min_rq_workers})" + ) + return False + + logger.debug(f"RQ worker registration: {worker_count} workers registered") + return True + + except Exception as e: + logger.error(f"Failed to check RQ worker registration: {e}") + return False + + def _restart_failed_workers(self): + """Restart workers that have failed and should be restarted""" + for worker in self.process_manager.get_all_workers(): + # Only restart if: + # 1. Worker state is FAILED + # 2. Worker definition has restart_on_failure=True + if ( + worker.state == WorkerState.FAILED + and worker.definition.restart_on_failure + ): + logger.warning( + f"{worker.name}: Worker failed, initiating restart " + f"(restart count: {worker.restart_count})" + ) + + success = self.process_manager.restart_worker(worker.name) + + if success: + logger.info( + f"{worker.name}: Restart successful " + f"(total restarts: {worker.restart_count})" + ) + else: + logger.error(f"{worker.name}: Restart failed") + + def _handle_registration_loss(self): + """ + Handle RQ worker registration loss. + + This replicates the old bash script's self-healing behavior: + - Check if cooldown period has passed + - Restart all RQ workers (bulk restart) + - Update recovery timestamp + + Cooldown prevents too-frequent restarts during Redis/network issues. + """ + current_time = time.time() + + # Check if cooldown period has passed + if self.last_registration_recovery is not None: + elapsed = current_time - self.last_registration_recovery + if elapsed < self.registration_recovery_cooldown: + remaining = self.registration_recovery_cooldown - elapsed + logger.debug( + f"Registration recovery cooldown active - " + f"waiting {remaining:.0f}s before next recovery attempt" + ) + return + + logger.warning( + "⚠️ RQ worker registration loss detected - initiating bulk restart " + "(replicating old start-workers.sh behavior)" + ) + + # Restart all RQ workers + success = self._restart_all_rq_workers() + + if success: + logger.info("✅ Bulk restart completed - workers should re-register soon") + else: + logger.error("❌ Bulk restart encountered errors - check individual worker logs") + + # Update recovery timestamp to start cooldown + self.last_registration_recovery = current_time + + def _restart_all_rq_workers(self) -> bool: + """ + Restart all RQ workers (bulk restart). 
+ + This matches the old bash script's recovery mechanism: + - Kill all RQ workers + - Restart them + - Workers will automatically re-register with Redis on startup + + Returns: + True if all RQ workers restarted successfully, False otherwise + """ + rq_workers = [ + worker + for worker in self.process_manager.get_all_workers() + if worker.definition.worker_type == WorkerType.RQ_WORKER + ] + + if not rq_workers: + logger.warning("No RQ workers found to restart") + return False + + logger.info(f"Restarting {len(rq_workers)} RQ workers...") + + all_success = True + for worker in rq_workers: + logger.info(f" ↻ Restarting {worker.name}...") + success = self.process_manager.restart_worker(worker.name) + + if success: + logger.info(f" ✓ {worker.name} restarted successfully") + else: + logger.error(f" ✗ {worker.name} restart failed") + all_success = False + + return all_success + + def get_health_status(self) -> dict: + """ + Get current health status summary. + + Returns: + Dictionary with health status information + """ + worker_status = self.process_manager.get_status() + + # Count workers by state + state_counts = {} + for status in worker_status.values(): + state = status["state"] + state_counts[state] = state_counts.get(state, 0) + 1 + + # Check RQ worker registration + try: + rq_workers = Worker.all(connection=self.redis) + rq_worker_count = len(rq_workers) + except Exception: + rq_worker_count = -1 # Error indicator + + return { + "running": self.running, + "uptime": time.time() - self.start_time if self.running else 0, + "total_workers": len(worker_status), + "state_counts": state_counts, + "rq_worker_count": rq_worker_count, + "min_rq_workers": self.config.min_rq_workers, + "rq_healthy": rq_worker_count >= self.config.min_rq_workers, + } diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py new file mode 100644 index 00000000..21b7f23e --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py @@ -0,0 +1,305 @@ +""" +Process Manager + +Manages lifecycle of all worker processes with state tracking. +Handles process creation, monitoring, and graceful shutdown. +""" + +import logging +import subprocess +import time +from enum import Enum +from typing import Dict, List, Optional + +from .config import WorkerDefinition + +logger = logging.getLogger(__name__) + + +class WorkerState(Enum): + """Worker process lifecycle states""" + + PENDING = "pending" # Not yet started + STARTING = "starting" # Process started, waiting for health check + RUNNING = "running" # Healthy and running + UNHEALTHY = "unhealthy" # Running but health check failed + STOPPING = "stopping" # Shutdown initiated + STOPPED = "stopped" # Cleanly stopped + FAILED = "failed" # Crashed or failed to start + + +class ManagedWorker: + """ + Wraps a single worker process with state tracking. 
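+
+    Typical lifecycle: PENDING -> STARTING (after start()) -> RUNNING (first
+    successful check_health()) -> STOPPING/STOPPED on shutdown; FAILED is set
+    when the process dies or fails to start, UNHEALTHY when a custom health
+    check fails. Child stdout/stderr are never captured - output flows
+    straight to the container logs.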
+ + Attributes: + definition: Worker definition + process: Subprocess.Popen object (None if not started) + state: Current worker state + start_time: Timestamp when worker was started + restart_count: Number of times worker has been restarted + last_health_check: Timestamp of last health check + """ + + def __init__(self, definition: WorkerDefinition): + self.definition = definition + self.process: Optional[subprocess.Popen] = None + self.state = WorkerState.PENDING + self.start_time: Optional[float] = None + self.restart_count = 0 + self.last_health_check: Optional[float] = None + + @property + def name(self) -> str: + """Worker name""" + return self.definition.name + + @property + def pid(self) -> Optional[int]: + """Process ID (None if not started)""" + return self.process.pid if self.process else None + + @property + def is_alive(self) -> bool: + """Check if process is alive""" + if not self.process: + return False + return self.process.poll() is None + + def start(self) -> bool: + """ + Start the worker process. + + Returns: + True if started successfully, False otherwise + """ + if self.process and self.is_alive: + logger.warning(f"{self.name}: Already running (PID {self.pid})") + return False + + try: + logger.info(f"{self.name}: Starting worker...") + logger.debug(f"{self.name}: Command: {' '.join(self.definition.command)}") + + # Don't capture stdout/stderr - let it flow to container logs (Docker captures it) + # This prevents buffer overflow and blocking when worker output exceeds 64KB + # Worker logs will be visible via 'docker logs' command + self.process = subprocess.Popen( + self.definition.command, + stdout=None, # Inherit from parent (goes to container stdout) + stderr=None, # Inherit from parent (goes to container stderr) + ) + + self.state = WorkerState.STARTING + self.start_time = time.time() + + logger.info(f"{self.name}: Started with PID {self.pid}") + return True + + except Exception as e: + logger.error(f"{self.name}: Failed to start: {e}") + self.state = WorkerState.FAILED + return False + + def stop(self, timeout: int = 30) -> bool: + """ + Gracefully stop the worker process. + + Args: + timeout: Maximum wait time in seconds + + Returns: + True if stopped successfully, False otherwise + """ + if not self.process or not self.is_alive: + logger.debug(f"{self.name}: Already stopped") + self.state = WorkerState.STOPPED + return True + + try: + logger.info(f"{self.name}: Stopping worker (PID {self.pid})...") + self.state = WorkerState.STOPPING + + # Send SIGTERM for graceful shutdown + self.process.terminate() + + # Wait for process to exit + try: + self.process.wait(timeout=timeout) + logger.info(f"{self.name}: Stopped gracefully") + self.state = WorkerState.STOPPED + return True + + except subprocess.TimeoutExpired: + # Force kill if timeout exceeded + logger.warning( + f"{self.name}: Timeout expired, force killing (SIGKILL)..." + ) + self.process.kill() + self.process.wait(timeout=5) + logger.warning(f"{self.name}: Force killed") + self.state = WorkerState.STOPPED + return True + + except Exception as e: + logger.error(f"{self.name}: Error during shutdown: {e}") + self.state = WorkerState.FAILED + return False + + def check_health(self) -> bool: + """ + Check worker health. 
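+
+        Liveness is checked first via the process handle; the optional custom
+        health_check from the definition only runs when the process is still
+        alive, and its failure marks the worker UNHEALTHY rather than FAILED.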
+ + Returns: + True if healthy, False otherwise + """ + self.last_health_check = time.time() + + # Basic liveness check + if not self.is_alive: + logger.warning(f"{self.name}: Process is not alive") + self.state = WorkerState.FAILED + return False + + # Custom health check if defined + if self.definition.health_check: + try: + if not self.definition.health_check(): + logger.warning(f"{self.name}: Custom health check failed") + self.state = WorkerState.UNHEALTHY + return False + except Exception as e: + logger.error(f"{self.name}: Health check raised exception: {e}") + self.state = WorkerState.UNHEALTHY + return False + + # Update state if currently starting + if self.state == WorkerState.STARTING: + self.state = WorkerState.RUNNING + + return True + + +class ProcessManager: + """ + Manages all worker processes. + + Provides high-level API for starting, stopping, and monitoring workers. + """ + + def __init__(self, worker_definitions: List[WorkerDefinition]): + self.workers: Dict[str, ManagedWorker] = { + defn.name: ManagedWorker(defn) for defn in worker_definitions + } + logger.info(f"ProcessManager initialized with {len(self.workers)} workers") + + def start_all(self) -> bool: + """ + Start all workers. + + Returns: + True if all workers started successfully + """ + logger.info("Starting all workers...") + success = True + + for worker in self.workers.values(): + if not worker.start(): + success = False + + if success: + logger.info("All workers started successfully") + else: + logger.warning("Some workers failed to start") + + return success + + def stop_all(self, timeout: int = 30) -> bool: + """ + Stop all workers gracefully. + + Args: + timeout: Maximum wait time per worker in seconds + + Returns: + True if all workers stopped successfully + """ + logger.info("Stopping all workers...") + success = True + + for worker in self.workers.values(): + if not worker.stop(timeout=timeout): + success = False + + if success: + logger.info("All workers stopped successfully") + else: + logger.warning("Some workers failed to stop cleanly") + + return success + + def restart_worker(self, name: str, timeout: int = 30) -> bool: + """ + Restart a specific worker. + + Args: + name: Worker name + timeout: Maximum wait time for shutdown in seconds + + Returns: + True if restarted successfully + """ + worker = self.workers.get(name) + if not worker: + logger.error(f"Worker '{name}' not found") + return False + + logger.info(f"Restarting worker: {name}") + + # Ensure worker is fully stopped before attempting restart + stop_success = worker.stop(timeout=timeout) + if not stop_success: + logger.error(f"{name}: Failed to stop cleanly, restart aborted") + worker.state = WorkerState.FAILED + return False + + # Attempt to start the worker + success = worker.start() + + if success: + worker.restart_count += 1 + logger.info(f"{name}: Restart #{worker.restart_count} successful") + else: + logger.error(f"{name}: Restart failed") + + return success + + def get_status(self) -> Dict[str, Dict]: + """ + Get detailed status of all workers. 
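+
+        Example entry (sketch; values are illustrative)::
+
+            "rq-worker-1": {
+                "pid": 123,
+                "state": "running",
+                "is_alive": True,
+                "restart_count": 0,
+                "start_time": 1700000000.0,
+                "last_health_check": 1700000010.0,
+                "queues": ["transcription", "memory", "default"],
+            }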
+ + Returns: + Dictionary mapping worker name to status info + """ + status = {} + + for name, worker in self.workers.items(): + status[name] = { + "pid": worker.pid, + "state": worker.state.value, + "is_alive": worker.is_alive, + "restart_count": worker.restart_count, + "start_time": worker.start_time, + "last_health_check": worker.last_health_check, + "queues": worker.definition.queues, + } + + return status + + def get_worker(self, name: str) -> Optional[ManagedWorker]: + """Get worker by name""" + return self.workers.get(name) + + def get_all_workers(self) -> List[ManagedWorker]: + """Get all workers""" + return list(self.workers.values()) diff --git a/backends/advanced/src/advanced_omi_backend/workers/orchestrator/worker_registry.py b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/worker_registry.py new file mode 100644 index 00000000..512f4a9a --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/orchestrator/worker_registry.py @@ -0,0 +1,170 @@ +""" +Worker Registry + +Builds the complete list of worker definitions with conditional logic. +Reuses model_registry.py for config.yml parsing. +""" + +import os +import logging +from typing import List + +from .config import WorkerDefinition, WorkerType + +logger = logging.getLogger(__name__) + + +def get_default_stt_provider() -> str: + """ + Query config.yml for the default STT provider. + + Returns: + Provider name (e.g., "deepgram", "parakeet") or empty string if not configured + """ + try: + from advanced_omi_backend.model_registry import get_models_registry + + registry = get_models_registry() + if registry and registry.defaults: + stt_model = registry.get_default("stt") + if stt_model: + return stt_model.model_provider or "" + except Exception as e: + logger.warning(f"Failed to read STT provider from config.yml: {e}") + + return "" + + +def should_start_deepgram_batch() -> bool: + """ + Check if Deepgram batch worker should start. + + Conditions: + - DEFAULT_STT provider is "deepgram" (from config.yml) + - DEEPGRAM_API_KEY is set in environment + """ + stt_provider = get_default_stt_provider() + has_api_key = bool(os.getenv("DEEPGRAM_API_KEY")) + + enabled = stt_provider == "deepgram" and has_api_key + + if stt_provider == "deepgram" and not has_api_key: + logger.warning( + "Deepgram configured as default STT but DEEPGRAM_API_KEY not set - worker disabled" + ) + + return enabled + + +def should_start_parakeet() -> bool: + """ + Check if Parakeet stream worker should start. + + Conditions: + - DEFAULT_STT provider is "parakeet" (from config.yml) + """ + stt_provider = get_default_stt_provider() + return stt_provider == "parakeet" + + +def build_worker_definitions() -> List[WorkerDefinition]: + """ + Build the complete list of worker definitions. 
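+
+    Only definitions whose enabled_check passes are returned: the conditional
+    Deepgram batch and Parakeet stream workers are dropped here when the
+    config.yml STT provider or required environment variables do not match,
+    and the decision is logged.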
+ + Returns: + List of WorkerDefinition objects, including conditional workers + """ + workers = [] + + # 6x RQ Workers - Multi-queue workers (transcription, memory, default) + for i in range(1, 7): + workers.append( + WorkerDefinition( + name=f"rq-worker-{i}", + command=[ + "uv", + "run", + "python", + "-m", + "advanced_omi_backend.workers.rq_worker_entry", + "transcription", + "memory", + "default", + ], + worker_type=WorkerType.RQ_WORKER, + queues=["transcription", "memory", "default"], + restart_on_failure=True, + ) + ) + + # Audio Persistence Worker - Single-queue worker (audio queue) + workers.append( + WorkerDefinition( + name="audio-persistence", + command=[ + "uv", + "run", + "python", + "-m", + "advanced_omi_backend.workers.rq_worker_entry", + "audio", + ], + worker_type=WorkerType.RQ_WORKER, + queues=["audio"], + restart_on_failure=True, + ) + ) + + # Deepgram Batch Worker - Conditional (if DEFAULT_STT=deepgram + API key) + workers.append( + WorkerDefinition( + name="deepgram-batch", + command=[ + "uv", + "run", + "python", + "-m", + "advanced_omi_backend.workers.audio_stream_deepgram_worker", + ], + worker_type=WorkerType.STREAM_CONSUMER, + enabled_check=should_start_deepgram_batch, + restart_on_failure=True, + ) + ) + + # Parakeet Stream Worker - Conditional (if DEFAULT_STT=parakeet) + workers.append( + WorkerDefinition( + name="parakeet-stream", + command=[ + "uv", + "run", + "python", + "-m", + "advanced_omi_backend.workers.audio_stream_parakeet_worker", + ], + worker_type=WorkerType.STREAM_CONSUMER, + enabled_check=should_start_parakeet, + restart_on_failure=True, + ) + ) + + # Log worker configuration + stt_provider = get_default_stt_provider() + logger.info(f"STT Provider from config.yml: {stt_provider or 'none'}") + + enabled_workers = [w for w in workers if w.is_enabled()] + disabled_workers = [w for w in workers if not w.is_enabled()] + + logger.info(f"Total workers configured: {len(workers)}") + logger.info(f"Enabled workers: {len(enabled_workers)}") + logger.info( + f"Enabled worker names: {', '.join([w.name for w in enabled_workers])}" + ) + + if disabled_workers: + logger.info( + f"Disabled workers: {', '.join([w.name for w in disabled_workers])}" + ) + + return enabled_workers diff --git a/backends/advanced/start-k8s.sh b/backends/advanced/start-k8s.sh index a2f3d817..4235b16c 100755 --- a/backends/advanced/start-k8s.sh +++ b/backends/advanced/start-k8s.sh @@ -80,7 +80,7 @@ sleep 1 # Function to start all workers start_workers() { # NEW WORKERS - Redis Streams multi-provider architecture - # Single worker ensures sequential processing of audio chunks (matching start-workers.sh) + # Single worker ensures sequential processing of audio chunks (matching worker_orchestrator.py) echo "🎵 Starting audio stream Deepgram worker (1 worker for sequential processing)..." if python3 -m advanced_omi_backend.workers.audio_stream_deepgram_worker & then @@ -91,7 +91,7 @@ start_workers() { exit 1 fi - # Start 3 RQ workers listening to ALL queues (matching start-workers.sh) + # Start 3 RQ workers listening to ALL queues (matching worker_orchestrator.py) echo "🔧 Starting RQ workers (3 workers, all queues: transcription, memory, default)..." 
if python3 -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & then @@ -123,7 +123,7 @@ start_workers() { exit 1 fi - # Start 1 dedicated audio persistence worker (matching start-workers.sh) + # Start 1 dedicated audio persistence worker (matching worker_orchestrator.py) echo "💾 Starting audio persistence worker (1 worker for audio queue)..." if python3 -m advanced_omi_backend.workers.rq_worker_entry audio & then diff --git a/backends/advanced/start-workers.sh b/backends/advanced/start-workers.sh deleted file mode 100755 index 8715da4b..00000000 --- a/backends/advanced/start-workers.sh +++ /dev/null @@ -1,208 +0,0 @@ -#!/bin/bash -# Unified worker startup script -# Starts all workers in a single container for efficiency - -set -e - -echo "🚀 Starting Chronicle Workers..." - -# Clean up any stale worker registrations from previous runs -echo "🧹 Cleaning up stale worker registrations from Redis..." -# Use RQ's cleanup command to remove dead workers -uv run python -c " -from rq import Worker -from redis import Redis -import os -import socket - -redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') -redis_conn = Redis.from_url(redis_url) -hostname = socket.gethostname() - -# Only clean up workers from THIS hostname (pod) -workers = Worker.all(connection=redis_conn) -cleaned = 0 -for worker in workers: - if worker.hostname == hostname: - worker.register_death() - cleaned += 1 -print(f'Cleaned up {cleaned} stale workers from {hostname}') -" 2>/dev/null || echo "No stale workers to clean" - -sleep 1 - -# Function to start all workers -start_workers() { - echo "🔧 Starting RQ workers (6 workers, all queues: transcription, memory, default)..." - uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & - RQ_WORKER_1_PID=$! - uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & - RQ_WORKER_2_PID=$! - uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & - RQ_WORKER_3_PID=$! - uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & - RQ_WORKER_4_PID=$! - uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & - RQ_WORKER_5_PID=$! - uv run python -m advanced_omi_backend.workers.rq_worker_entry transcription memory default & - RQ_WORKER_6_PID=$! - - echo "💾 Starting audio persistence worker (1 worker for audio queue)..." - uv run python -m advanced_omi_backend.workers.rq_worker_entry audio & - AUDIO_PERSISTENCE_WORKER_PID=$! - - # Determine which STT provider to use from config.yml - echo "📋 Checking config.yml for default STT provider..." 
- DEFAULT_STT=$(uv run python -c " -from advanced_omi_backend.model_registry import get_models_registry -registry = get_models_registry() -if registry and registry.defaults: - stt_model = registry.get_default('stt') - if stt_model: - print(stt_model.model_provider or '') -" 2>/dev/null || echo "") - - echo "📋 Configured STT provider: ${DEFAULT_STT:-none}" - - # Batch Deepgram worker - uses consumer group "deepgram_workers" - # Runs alongside deepgram-streaming-worker container (consumer group "streaming-transcription") - # Both workers process same streams via Redis consumer groups (fan-out architecture) - # - Batch worker: High-quality transcription with diarization (~6s latency) - # - Streaming worker: Fast wake-word detection with plugins (~1-2s latency) - if [[ "$DEFAULT_STT" == "deepgram" ]] && [ -n "$DEEPGRAM_API_KEY" ]; then - echo "🎵 Starting audio stream Deepgram batch worker (consumer group: deepgram_workers)..." - uv run python -m advanced_omi_backend.workers.audio_stream_deepgram_worker & - AUDIO_STREAM_DEEPGRAM_WORKER_PID=$! - else - echo "⏭️ Skipping Deepgram batch worker (not configured as default STT or API key missing)" - AUDIO_STREAM_DEEPGRAM_WORKER_PID="" - fi - - # Only start Parakeet worker if configured as default STT - if [[ "$DEFAULT_STT" == "parakeet" ]]; then - echo "🎵 Starting audio stream Parakeet worker (1 worker for sequential processing)..." - uv run python -m advanced_omi_backend.workers.audio_stream_parakeet_worker & - AUDIO_STREAM_PARAKEET_WORKER_PID=$! - else - echo "⏭️ Skipping Parakeet stream worker (not configured as default STT)" - AUDIO_STREAM_PARAKEET_WORKER_PID="" - fi - - echo "✅ All workers started:" - echo " - RQ worker 1: PID $RQ_WORKER_1_PID (transcription, memory, default)" - echo " - RQ worker 2: PID $RQ_WORKER_2_PID (transcription, memory, default)" - echo " - RQ worker 3: PID $RQ_WORKER_3_PID (transcription, memory, default)" - echo " - RQ worker 4: PID $RQ_WORKER_4_PID (transcription, memory, default)" - echo " - RQ worker 5: PID $RQ_WORKER_5_PID (transcription, memory, default)" - echo " - RQ worker 6: PID $RQ_WORKER_6_PID (transcription, memory, default)" - echo " - Audio persistence worker: PID $AUDIO_PERSISTENCE_WORKER_PID (audio queue - file rotation)" - [ -n "$AUDIO_STREAM_DEEPGRAM_WORKER_PID" ] && echo " - Audio stream Deepgram worker: PID $AUDIO_STREAM_DEEPGRAM_WORKER_PID (Redis Streams consumer)" || true - [ -n "$AUDIO_STREAM_PARAKEET_WORKER_PID" ] && echo " - Audio stream Parakeet worker: PID $AUDIO_STREAM_PARAKEET_WORKER_PID (Redis Streams consumer)" || true -} - -# Function to check worker registration health -check_worker_health() { - WORKER_COUNT=$(uv run python -c " -from rq import Worker -from redis import Redis -import os -import sys - -try: - redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') - r = Redis.from_url(redis_url) - workers = Worker.all(connection=r) - print(len(workers)) -except Exception as e: - print('0', file=sys.stderr) - sys.exit(1) -" 2>/dev/null || echo "0") - echo "$WORKER_COUNT" -} - -# Self-healing monitoring function -monitor_worker_health() { - local CHECK_INTERVAL=10 # Check every 10 seconds - local MIN_WORKERS=6 # Expect at least 6 RQ workers - - echo "🩺 Starting self-healing monitor (check interval: ${CHECK_INTERVAL}s, min workers: ${MIN_WORKERS})" - - while true; do - sleep $CHECK_INTERVAL - - WORKER_COUNT=$(check_worker_health) - - if [ "$WORKER_COUNT" -lt "$MIN_WORKERS" ]; then - echo "⚠️ Self-healing: Only $WORKER_COUNT workers registered (expected >= $MIN_WORKERS)" - echo "🔧 
Self-healing: Restarting all workers to restore registration..." - - # Kill all workers - kill $RQ_WORKER_1_PID $RQ_WORKER_2_PID $RQ_WORKER_3_PID $RQ_WORKER_4_PID $RQ_WORKER_5_PID $RQ_WORKER_6_PID $AUDIO_PERSISTENCE_WORKER_PID 2>/dev/null || true - [ -n "$AUDIO_STREAM_DEEPGRAM_WORKER_PID" ] && kill $AUDIO_STREAM_DEEPGRAM_WORKER_PID 2>/dev/null || true - [ -n "$AUDIO_STREAM_PARAKEET_WORKER_PID" ] && kill $AUDIO_STREAM_PARAKEET_WORKER_PID 2>/dev/null || true - wait 2>/dev/null || true - - # Restart workers - start_workers - - # Verify recovery - sleep 3 - NEW_WORKER_COUNT=$(check_worker_health) - echo "✅ Self-healing: Workers restarted - new count: $NEW_WORKER_COUNT" - fi - done -} - -# Function to handle shutdown -shutdown() { - echo "🛑 Shutting down workers..." - kill $MONITOR_PID 2>/dev/null || true - kill $RQ_WORKER_1_PID 2>/dev/null || true - kill $RQ_WORKER_2_PID 2>/dev/null || true - kill $RQ_WORKER_3_PID 2>/dev/null || true - kill $RQ_WORKER_4_PID 2>/dev/null || true - kill $RQ_WORKER_5_PID 2>/dev/null || true - kill $RQ_WORKER_6_PID 2>/dev/null || true - kill $AUDIO_PERSISTENCE_WORKER_PID 2>/dev/null || true - [ -n "$AUDIO_STREAM_DEEPGRAM_WORKER_PID" ] && kill $AUDIO_STREAM_DEEPGRAM_WORKER_PID 2>/dev/null || true - [ -n "$AUDIO_STREAM_PARAKEET_WORKER_PID" ] && kill $AUDIO_STREAM_PARAKEET_WORKER_PID 2>/dev/null || true - wait - echo "✅ All workers stopped" - exit 0 -} - -# Set up signal handlers -trap shutdown SIGTERM SIGINT - -# Configure Python logging for RQ workers -export PYTHONUNBUFFERED=1 - -# Start all workers -start_workers - -# Start self-healing monitor in background -monitor_worker_health & -MONITOR_PID=$! -echo "🩺 Self-healing monitor started: PID $MONITOR_PID" - -# Keep the script running and let the self-healing monitor handle worker failures -# Don't use wait -n (fail-fast on first worker exit) - this kills all workers when one fails -# Instead, wait for the monitor process or explicit shutdown signal -echo "⏳ Workers running - self-healing monitor will restart failed workers automatically" -wait $MONITOR_PID - -# If monitor exits (should only happen on SIGTERM/SIGINT), shut down gracefully -echo "🛑 Monitor exited, shutting down all workers..." -kill $RQ_WORKER_1_PID 2>/dev/null || true -kill $RQ_WORKER_2_PID 2>/dev/null || true -kill $RQ_WORKER_3_PID 2>/dev/null || true -kill $RQ_WORKER_4_PID 2>/dev/null || true -kill $RQ_WORKER_5_PID 2>/dev/null || true -kill $RQ_WORKER_6_PID 2>/dev/null || true -kill $AUDIO_PERSISTENCE_WORKER_PID 2>/dev/null || true -[ -n "$AUDIO_STREAM_DEEPGRAM_WORKER_PID" ] && kill $AUDIO_STREAM_DEEPGRAM_WORKER_PID 2>/dev/null || true -[ -n "$AUDIO_STREAM_PARAKEET_WORKER_PID" ] && kill $AUDIO_STREAM_PARAKEET_WORKER_PID 2>/dev/null || true -wait - -echo "✅ All workers stopped gracefully" -exit 0 diff --git a/backends/advanced/worker_orchestrator.py b/backends/advanced/worker_orchestrator.py new file mode 100755 index 00000000..0929bdd0 --- /dev/null +++ b/backends/advanced/worker_orchestrator.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +""" +Worker Orchestrator + +Main entrypoint for Chronicle worker orchestration system. +Replaces start-workers.sh bash script with Python-based orchestration. 
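+
+The orchestrator composes three pieces from
+advanced_omi_backend.workers.orchestrator: build_worker_definitions() selects
+which workers to run (including the config.yml-driven STT stream workers),
+ProcessManager owns their subprocesses, and HealthMonitor restarts workers
+that die or lose their RQ registration in Redis.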
+ +Usage: + python worker_orchestrator.py + # Or via Docker: docker compose up workers + +Environment Variables: + REDIS_URL Redis connection URL (default: redis://localhost:6379/0) + WORKER_CHECK_INTERVAL Health check interval in seconds (default: 10) + MIN_RQ_WORKERS Minimum expected RQ workers (default: 6) + WORKER_STARTUP_GRACE_PERIOD Grace period before health checks (default: 30) + WORKER_SHUTDOWN_TIMEOUT Max wait for graceful shutdown (default: 30) + LOG_LEVEL Logging level (default: INFO) +""" + +import asyncio +import logging +import os +import signal +import socket +import sys +from typing import Optional + +from redis import Redis +from rq import Worker + +# Import orchestrator components +from src.advanced_omi_backend.workers.orchestrator import ( + OrchestratorConfig, + ProcessManager, + HealthMonitor, + build_worker_definitions, +) + +# Configure logging +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +logging.basicConfig( + level=LOG_LEVEL, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + stream=sys.stdout, +) + +logger = logging.getLogger(__name__) + + +class WorkerOrchestrator: + """ + Main orchestrator that coordinates all components. + + Handles: + - Startup sequence (Redis cleanup, worker startup) + - Signal handling (SIGTERM, SIGINT) + - Health monitoring + - Graceful shutdown + """ + + def __init__(self): + self.config: Optional[OrchestratorConfig] = None + self.redis: Optional[Redis] = None + self.process_manager: Optional[ProcessManager] = None + self.health_monitor: Optional[HealthMonitor] = None + self.shutdown_event = asyncio.Event() + + async def startup(self): + """ + Startup sequence. + + 1. Load configuration + 2. Connect to Redis + 3. Clean up stale worker registrations + 4. Build worker definitions + 5. Start all workers + 6. Setup signal handlers + 7. Start health monitor + """ + logger.info("🚀 Starting Chronicle Worker Orchestrator...") + + # 1. Load configuration + logger.info("Loading configuration...") + self.config = OrchestratorConfig() + logger.info(f"Redis URL: {self.config.redis_url}") + logger.info(f"Check interval: {self.config.check_interval}s") + logger.info(f"Min RQ workers: {self.config.min_rq_workers}") + logger.info(f"Startup grace period: {self.config.startup_grace_period}s") + + # 2. Connect to Redis + logger.info("Connecting to Redis...") + self.redis = Redis.from_url(self.config.redis_url) + try: + self.redis.ping() + logger.info("✅ Redis connection successful") + except Exception as e: + logger.error(f"❌ Failed to connect to Redis: {e}") + raise + + # 3. Clean up stale worker registrations + logger.info("🧹 Cleaning up stale worker registrations from Redis...") + cleaned_count = self._cleanup_stale_workers() + if cleaned_count > 0: + logger.info(f"Cleaned up {cleaned_count} stale workers") + else: + logger.info("No stale workers to clean") + + # 4. Build worker definitions + logger.info("Building worker definitions...") + worker_definitions = build_worker_definitions() + logger.info(f"Total enabled workers: {len(worker_definitions)}") + + # 5. 
Create process manager and start all workers + logger.info("Starting all workers...") + self.process_manager = ProcessManager(worker_definitions) + success = self.process_manager.start_all() + + if not success: + logger.error("❌ Some workers failed to start") + raise RuntimeError("Worker startup failed") + + # Log worker status + logger.info("✅ All workers started:") + for worker in self.process_manager.get_all_workers(): + logger.info( + f" - {worker.name}: PID {worker.pid} " + f"(queues: {', '.join(worker.definition.queues) if worker.definition.queues else 'stream consumer'})" + ) + + # 6. Setup signal handlers + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(self._signal_handler(s))) + + logger.info("✅ Signal handlers configured (SIGTERM, SIGINT)") + + # 7. Start health monitor + logger.info("Starting health monitor...") + self.health_monitor = HealthMonitor( + self.process_manager, self.config, self.redis + ) + await self.health_monitor.start() + logger.info("✅ Health monitor started") + + logger.info("⏳ Workers running - health monitor will auto-restart failed workers") + + def _cleanup_stale_workers(self) -> int: + """ + Clean up stale worker registrations from Redis. + + This replicates the bash script's logic: + - Only clean up workers from THIS hostname (pod-aware) + - Use RQ's register_death() to properly clean up + + Returns: + Number of workers cleaned up + """ + try: + hostname = socket.gethostname() + workers = Worker.all(connection=self.redis) + cleaned = 0 + + for worker in workers: + if worker.hostname == hostname: + worker.register_death() + cleaned += 1 + + return cleaned + + except Exception as e: + logger.warning(f"Failed to clean up stale workers: {e}") + return 0 + + async def _signal_handler(self, sig: signal.Signals): + """Handle shutdown signals""" + logger.info(f"Received signal: {sig.name}") + self.shutdown_event.set() + + async def shutdown(self): + """ + Graceful shutdown sequence. + + 1. Stop health monitor + 2. Stop all workers + 3. Close Redis connection + """ + logger.info("🛑 Initiating graceful shutdown...") + + # 1. Stop health monitor + if self.health_monitor: + await self.health_monitor.stop() + + # 2. Stop all workers + if self.process_manager: + logger.info("Stopping all workers...") + self.process_manager.stop_all(timeout=self.config.shutdown_timeout) + + # 3. 
Close Redis connection + if self.redis: + logger.info("Closing Redis connection...") + self.redis.close() + + logger.info("✅ All workers stopped gracefully") + + async def run(self): + """Main run loop - wait for shutdown signal""" + try: + # Perform startup + await self.startup() + + # Wait for shutdown signal + await self.shutdown_event.wait() + + except Exception as e: + logger.error(f"❌ Orchestrator error: {e}", exc_info=True) + raise + finally: + # Always perform shutdown + await self.shutdown() + + +async def main(): + """Main entrypoint""" + orchestrator = WorkerOrchestrator() + + try: + await orchestrator.run() + sys.exit(0) + + except KeyboardInterrupt: + logger.info("Interrupted by user") + sys.exit(0) + + except Exception as e: + logger.error(f"Fatal error: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + # Ensure unbuffered output for Docker logs + os.environ["PYTHONUNBUFFERED"] = "1" + + # Run the orchestrator + asyncio.run(main()) diff --git a/backends/charts/advanced-backend/templates/deployment.yaml b/backends/charts/advanced-backend/templates/deployment.yaml index 0e40a7fb..2eb3425d 100644 --- a/backends/charts/advanced-backend/templates/deployment.yaml +++ b/backends/charts/advanced-backend/templates/deployment.yaml @@ -67,7 +67,7 @@ spec: - name: {{ .Chart.Name }}-workers image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" imagePullPolicy: {{ .Values.image.pullPolicy }} - command: ["./start-workers.sh"] + command: ["uv", "run", "python", "worker_orchestrator.py"] envFrom: - configMapRef: name: chronicle-config diff --git a/backends/charts/advanced-backend/templates/workers-deployment.yaml b/backends/charts/advanced-backend/templates/workers-deployment.yaml index 22751d31..48add12a 100644 --- a/backends/charts/advanced-backend/templates/workers-deployment.yaml +++ b/backends/charts/advanced-backend/templates/workers-deployment.yaml @@ -21,7 +21,7 @@ spec: - name: {{ .Chart.Name }}-workers image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" imagePullPolicy: {{ .Values.image.pullPolicy }} - command: ["./start-workers.sh"] + command: ["uv", "run", "python", "worker_orchestrator.py"] envFrom: - configMapRef: name: chronicle-config diff --git a/run-test.sh b/run-test.sh deleted file mode 100755 index ebc39a07..00000000 --- a/run-test.sh +++ /dev/null @@ -1,113 +0,0 @@ -#!/bin/bash - -# Chronicle Local Test Runner -# Runs the same tests as GitHub CI but configured for local development -# Usage: ./run-test.sh [advanced-backend|speaker-recognition|all] - -set -e - -# Colors for output -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -BLUE='\033[0;34m' -NC='\033[0m' # No Color - -# Print colored output -print_info() { - echo -e "${BLUE}[INFO]${NC} $1" -} - -print_success() { - echo -e "${GREEN}[SUCCESS]${NC} $1" -} - -print_warning() { - echo -e "${YELLOW}[WARNING]${NC} $1" -} - -print_error() { - echo -e "${RED}[ERROR]${NC} $1" -} - -# Function to run advanced backend tests -run_advanced_backend_tests() { - print_info "Running Advanced Backend Integration Tests..." - - if [ ! -f "backends/advanced/run-test.sh" ]; then - print_error "backends/advanced/run-test.sh not found!" - return 1 - fi - - cd backends/advanced - ./run-test.sh - cd ../.. - - print_success "Advanced Backend tests completed" -} - -# Function to run speaker recognition tests -run_speaker_recognition_tests() { - print_info "Running Speaker Recognition Tests..." - - if [ ! 
-f "extras/speaker-recognition/run-test.sh" ]; then - print_error "extras/speaker-recognition/run-test.sh not found!" - return 1 - fi - - cd extras/speaker-recognition - ./run-test.sh - cd ../.. - - print_success "Speaker Recognition tests completed" -} - -# Main execution -print_info "Chronicle Local Test Runner" -print_info "==============================" - -# Check if we're in the right directory -if [ ! -f "CLAUDE.md" ]; then - print_error "Please run this script from the chronicle root directory" - exit 1 -fi - -# Parse command line argument -TEST_SUITE="${1:-all}" - -case "$TEST_SUITE" in - "advanced-backend") - run_advanced_backend_tests - ;; - "speaker-recognition") - run_speaker_recognition_tests - ;; - "all") - print_info "Running all test suites..." - - # Run advanced backend tests - if run_advanced_backend_tests; then - print_success "Advanced Backend tests: PASSED" - else - print_error "Advanced Backend tests: FAILED" - exit 1 - fi - - # Run speaker recognition tests - if run_speaker_recognition_tests; then - print_success "Speaker Recognition tests: PASSED" - else - print_error "Speaker Recognition tests: FAILED" - exit 1 - fi - - print_success "All test suites completed successfully!" - ;; - *) - print_error "Unknown test suite: $TEST_SUITE" - echo "Usage: $0 [advanced-backend|speaker-recognition|all]" - exit 1 - ;; -esac - -print_success "Test execution completed!" \ No newline at end of file