Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backends/advanced/.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
!nginx.conf.template
!start.sh
!start-k8s.sh
!start-workers.sh
!worker_orchestrator.py
!Caddyfile
5 changes: 2 additions & 3 deletions backends/advanced/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ COPY . .
COPY diarization_config.json* ./


# Copy and make startup scripts executable
# Copy and make startup script executable
COPY start.sh ./
COPY start-workers.sh ./
RUN chmod +x start.sh start-workers.sh
RUN chmod +x start.sh

# Run the application with workers
CMD ["./start.sh"]
6 changes: 3 additions & 3 deletions backends/advanced/Dockerfile.k8s
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ COPY . .
# Copy memory config (created by init.sh from template)


# Copy and make K8s startup scripts executable
COPY start-k8s.sh start-workers.sh ./
RUN chmod +x start-k8s.sh start-workers.sh
# Copy and make K8s startup script executable
COPY start-k8s.sh ./
RUN chmod +x start-k8s.sh

# Activate virtual environment in PATH
ENV PATH="/app/.venv/bin:$PATH"
Expand Down
3 changes: 2 additions & 1 deletion backends/advanced/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,10 @@ services:
build:
context: .
dockerfile: Dockerfile
command: ./start-workers.sh
command: ["uv", "run", "python", "worker_orchestrator.py"]
volumes:
- ./src:/app/src
- ./worker_orchestrator.py:/app/worker_orchestrator.py
- ./data/test_audio_chunks:/app/audio_chunks
- ./data/test_debug_dir:/app/debug_dir
- ./data/test_data:/app/data
Expand Down
19 changes: 13 additions & 6 deletions backends/advanced/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,36 @@ services:
# Unified Worker Container
# No CUDA needed for chronicle-backend and workers, workers only orchestrate jobs and call external services
# Runs all workers in a single container for efficiency:
# - 3 RQ workers (transcription, memory, default queues)
# - 1 Audio stream worker (Redis Streams consumer - must be single to maintain sequential chunks)
# - 6 RQ workers (transcription, memory, default queues)
# - 1 Audio persistence worker (audio queue)
# - 1+ Stream workers (conditional based on config.yml - Deepgram/Parakeet)
# Uses Python orchestrator for process management, health monitoring, and self-healing
workers:
build:
context: .
dockerfile: Dockerfile
command: ["./start-workers.sh"]
command: ["uv", "run", "python", "worker_orchestrator.py"]
env_file:
- .env
volumes:
- ./src:/app/src
- ./start-workers.sh:/app/start-workers.sh
- ./worker_orchestrator.py:/app/worker_orchestrator.py
- ./data/audio_chunks:/app/audio_chunks
- ./data:/app/data
- ../../config/config.yml:/app/config.yml # Removed :ro for consistency
- ../../config/plugins.yml:/app/plugins.yml # Plugin configuration
- ../../config/config.yml:/app/config.yml
- ../../config/plugins.yml:/app/plugins.yml
environment:
- DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}
- PARAKEET_ASR_URL=${PARAKEET_ASR_URL}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- GROQ_API_KEY=${GROQ_API_KEY}
- HA_TOKEN=${HA_TOKEN}
- REDIS_URL=redis://redis:6379/0
# Worker orchestrator configuration (optional - defaults shown)
- WORKER_CHECK_INTERVAL=${WORKER_CHECK_INTERVAL:-10}
- MIN_RQ_WORKERS=${MIN_RQ_WORKERS:-6}
- WORKER_STARTUP_GRACE_PERIOD=${WORKER_STARTUP_GRACE_PERIOD:-30}
- WORKER_SHUTDOWN_TIMEOUT=${WORKER_SHUTDOWN_TIMEOUT:-30}
depends_on:
redis:
condition: service_healthy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Worker Orchestrator Package

This package provides a Python-based orchestration system for managing
Chronicle's worker processes, replacing the bash-based start-workers.sh script.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small point: I would probably put the worker orchestrator as a service, and not in the workers folder.
This means that it's just the code in the workers folder that is used by seperate processes to the main server.


Components:
- config: Worker definitions and orchestrator configuration
- worker_registry: Build worker list with conditional logic
- process_manager: Process lifecycle management
- health_monitor: Health checks and self-healing
"""

from .config import WorkerDefinition, OrchestratorConfig, WorkerType
from .worker_registry import build_worker_definitions
from .process_manager import ManagedWorker, ProcessManager, WorkerState
from .health_monitor import HealthMonitor

__all__ = [
"WorkerDefinition",
"OrchestratorConfig",
"WorkerType",
"build_worker_definitions",
"ManagedWorker",
"ProcessManager",
"WorkerState",
"HealthMonitor",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""
Worker Orchestrator Configuration

Defines data structures for worker definitions and orchestrator configuration.
"""

import os
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable, List


class WorkerType(Enum):
    """Kinds of worker processes the orchestrator can manage."""

    # Pulls jobs from RQ queues (transcription, memory, default, ...)
    RQ_WORKER = "rq_worker"
    # Consumes audio chunks from a Redis Streams stream
    STREAM_CONSUMER = "stream_consumer"


@dataclass
class WorkerDefinition:
    """
    Describes one worker process that the orchestrator should manage.

    Attributes:
        name: Unique identifier for the worker
        command: Full command to execute (as a list, for subprocess)
        worker_type: Type of worker (RQ queue worker vs stream consumer)
        queues: Queue names for RQ workers (empty for stream consumers)
        enabled_check: Optional predicate deciding whether the worker starts
        restart_on_failure: Whether to automatically restart on failure
        health_check: Optional custom health check function
    """

    name: str
    command: List[str]
    worker_type: WorkerType = WorkerType.RQ_WORKER
    queues: List[str] = field(default_factory=list)
    enabled_check: Optional[Callable[[], bool]] = None
    restart_on_failure: bool = True
    health_check: Optional[Callable[[], bool]] = None

    def is_enabled(self) -> bool:
        """Return True when the worker should be launched.

        A worker with no predicate is unconditionally enabled; otherwise the
        predicate's result decides.
        """
        return self.enabled_check is None or self.enabled_check()


@dataclass
class OrchestratorConfig:
    """
    Global configuration for the worker orchestrator.

    Every setting falls back to an environment variable, so deployments can
    tune the orchestrator without code changes.
    """

    # Redis connection string shared by all workers
    redis_url: str = field(
        default_factory=lambda: os.getenv("REDIS_URL", "redis://localhost:6379/0")
    )

    # Seconds between health-monitor sweeps
    check_interval: int = field(
        default_factory=lambda: int(os.getenv("WORKER_CHECK_INTERVAL", "10"))
    )
    # Minimum number of RQ workers that must stay alive
    min_rq_workers: int = field(
        default_factory=lambda: int(os.getenv("MIN_RQ_WORKERS", "6"))
    )
    # Seconds after startup during which failing health checks are tolerated
    startup_grace_period: int = field(
        default_factory=lambda: int(os.getenv("WORKER_STARTUP_GRACE_PERIOD", "30"))
    )

    # Seconds to wait for graceful shutdown before forcing termination
    shutdown_timeout: int = field(
        default_factory=lambda: int(os.getenv("WORKER_SHUTDOWN_TIMEOUT", "30"))
    )

    # Orchestrator log verbosity
    log_level: str = field(default_factory=lambda: os.getenv("LOG_LEVEL", "INFO"))

    def __post_init__(self):
        """Reject nonsensical settings as soon as the instance is built."""
        violations = (
            (self.check_interval <= 0, "check_interval must be positive"),
            (self.min_rq_workers < 0, "min_rq_workers must be non-negative"),
            (self.startup_grace_period < 0, "startup_grace_period must be non-negative"),
            (self.shutdown_timeout <= 0, "shutdown_timeout must be positive"),
        )
        # Raise on the first violated rule, matching a guard-clause cascade.
        for invalid, message in violations:
            if invalid:
                raise ValueError(message)
Loading