Refactor worker management and introduce orchestrator for improved pr… #251
base: feat/ha-plugin
Conversation
…ocess handling

- Replaced the bash-based `start-workers.sh` script with a Python-based worker orchestrator for better process management and health monitoring.
- Updated `docker-compose.yml` to configure the new orchestrator and adjust worker definitions, including the addition of audio persistence and stream workers.
- Enhanced the Dockerfile to remove the old startup script and ensure the orchestrator is executable.
- Introduced new modules for orchestrator configuration, health monitoring, process management, and worker registry to streamline worker lifecycle management.
- Improved environment variable handling for worker configuration and health checks.
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI.

📝 Walkthrough

A legacy shell-script-based worker orchestration system (start-workers.sh) is replaced with a structured Python-based orchestrator. The change introduces new modules for worker configuration, process lifecycle management, and health monitoring, while updating Docker artifacts to invoke the Python orchestrator instead of the shell script.
Sequence Diagram

sequenceDiagram
participant Docker as Docker Container<br/>(Entrypoint)
participant Orch as Worker Orchestrator<br/>(Main)
participant PM as ProcessManager
participant HM as HealthMonitor
participant Redis
participant Workers as Worker Processes
Docker->>Orch: start (uv run python worker_orchestrator.py)
rect rgb(200, 220, 255)
Note over Orch,PM: Initialization Phase
Orch->>Orch: Load OrchestratorConfig from env
Orch->>Orch: Call build_worker_definitions()
Orch->>PM: Create ProcessManager(definitions)
Orch->>Redis: Create Redis connection
Orch->>HM: Create HealthMonitor(PM, config, redis)
end
rect rgb(200, 255, 220)
Note over PM,Workers: Startup Phase
Orch->>PM: start_all()
PM->>Workers: spawn subprocess for each definition
PM->>Workers: set state STARTING
PM->>Workers: transition to RUNNING after startup grace period
end
rect rgb(255, 240, 200)
Note over HM,PM: Health Monitoring Loop
Orch->>HM: start()
loop Every check_interval seconds
HM->>PM: check health for all workers
HM->>Workers: call optional custom health_check()
HM->>Redis: query RQ workers (Worker.all)
HM->>Redis: validate min_rq_workers count
alt Worker in FAILED state & restart_on_failure
HM->>PM: restart_worker(name)
PM->>Workers: stop + start
end
HM->>HM: log health status & aggregates
end
end
rect rgb(255, 220, 220)
Note over PM,Workers: Graceful Shutdown
Docker->>Orch: SIGTERM/SIGINT
Orch->>HM: stop()
Orch->>PM: stop_all(timeout=30)
PM->>Workers: send SIGTERM
PM->>Workers: wait timeout, then SIGKILL if needed
end
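For readers unfamiliar with the terminate-then-kill pattern shown in the shutdown lane above, here is a minimal, self-contained Python sketch of it. The helper name `stop_process` and the timeout values are illustrative, not taken from the PR.

```python
import subprocess


def stop_process(process: subprocess.Popen, timeout: float = 30.0) -> int:
    """Graceful-then-forceful shutdown: SIGTERM, wait, then SIGKILL."""
    process.terminate()                       # sends SIGTERM on POSIX
    try:
        return process.wait(timeout=timeout)  # grace period for cleanup
    except subprocess.TimeoutExpired:
        process.kill()                        # SIGKILL as a last resort
        return process.wait(timeout=5)
```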
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 3
🤖 Fix all issues with AI Agents
In backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py:
- Around lines 98-102: The health monitor's general exception handler logs errors but leaves self.running set to True, so callers think the monitor is still active. Update the except Exception block in the health monitor loop (the handler around "Health monitor loop error") to set self.running = False before logging, and then either re-raise the exception or ensure the loop exits cleanly so the monitor state stays accurate; reference the monitor method where self.running is managed and the existing asyncio.CancelledError/Exception handlers when making the change.
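A minimal sketch of what that change could look like, assuming the monitor loop has roughly the shape described above (the class is trimmed to the relevant attributes; `check_interval` and `_check_health` stand in for the real implementation):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class HealthMonitor:  # trimmed to the relevant parts
    def __init__(self, check_interval: float = 10.0) -> None:
        self.running = True
        self.check_interval = check_interval

    async def _check_health(self) -> None:
        ...  # existing per-worker and RQ registration checks go here

    async def _monitor_loop(self) -> None:
        while self.running:
            try:
                await self._check_health()
                await asyncio.sleep(self.check_interval)
            except asyncio.CancelledError:
                raise  # normal shutdown path
            except Exception:
                # Flip the flag *before* logging so callers that inspect
                # self.running see an accurate state, then let the error propagate.
                self.running = False
                logger.exception("Health monitor loop error")
                raise
```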
In backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py:
- Around lines 84-90: The subprocess is created with stdout=subprocess.PIPE in ProcessManager, where self.process is set from subprocess.Popen(self.definition.command), but the pipe is never read and can fill, causing hangs. Update the subprocess spawn to either set stdout and stderr to subprocess.DEVNULL if logs are not needed, open/rotate a log file (e.g., named after self.name) and pass that file handle as stdout with stderr=subprocess.STDOUT, or implement a background reader that drains self.process.stdout continuously; ensure the chosen change replaces the current stdout=subprocess.PIPE usage and closes any file handles or reader threads on process termination.
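One possible shape for the log-file option, as a sketch only (the helper name `spawn_worker` and the log directory are assumptions, not part of the PR):

```python
import subprocess
from pathlib import Path
from typing import IO, List, Tuple


def spawn_worker(
    name: str,
    command: List[str],
    log_dir: Path = Path("/tmp/worker-logs"),
) -> Tuple[subprocess.Popen, IO[bytes]]:
    """Spawn a worker, sending output to a per-worker log file instead of an
    unread PIPE (which can fill up and block the child process)."""
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = open(log_dir / f"{name}.log", "ab")  # caller must close this on shutdown
    process = subprocess.Popen(
        command,
        stdout=log_file,
        stderr=subprocess.STDOUT,  # merge stderr into the same file
    )
    return process, log_file
```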
- Around lines 256-266: The restart logic ignores failures from worker.stop(), risking orphaned processes. In the restart block (referencing logger, worker.stop(timeout=timeout), worker.start(), worker.restart_count, name, timeout), check the boolean result of worker.stop and, if it returns False, log an error (including context such as name and timeout), do not call worker.start(), and return False; optionally retry or escalate (e.g., call a force/kill method if available) before deciding to abort the restart.
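A sketch of the guarded restart, with the surrounding ProcessManager trimmed to the attributes named in the comment (worker.stop(), worker.start(), worker.restart_count); escalation beyond the abort path is omitted:

```python
import logging

logger = logging.getLogger(__name__)


class ProcessManager:  # trimmed: only the restart path is sketched
    def __init__(self) -> None:
        self.workers = {}  # name -> ManagedWorker

    def restart_worker(self, name: str, timeout: int = 30) -> bool:
        worker = self.workers.get(name)
        if worker is None:
            logger.error(f"{name}: unknown worker, cannot restart")
            return False
        logger.info(f"{name}: restarting (timeout={timeout}s)")
        if not worker.stop(timeout=timeout):
            # Abort rather than start a second copy next to an orphaned process.
            logger.error(f"{name}: stop() failed within {timeout}s; aborting restart")
            return False
        worker.restart_count += 1
        return worker.start()
```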
🧹 Nitpick comments (7)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py (1)
19-28: Consider alphabetically sorting __all__ for consistency. While the current grouping is logical, an alphabetical sort would improve consistency with common Python style conventions.
🔎 Optional: Sort __all__ alphabetically
 __all__ = [
+    "HealthMonitor",
+    "ManagedWorker",
+    "OrchestratorConfig",
+    "ProcessManager",
     "WorkerDefinition",
-    "OrchestratorConfig",
     "WorkerType",
+    "WorkerState",
     "build_worker_definitions",
-    "ManagedWorker",
-    "ProcessManager",
-    "WorkerState",
-    "HealthMonitor",
 ]

backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py (1)
59-81: Consider error handling for environment variable parsing. The int() conversions for environment variables will raise ValueError if non-numeric values are provided, occurring during field initialization before __post_init__ validation. Consider wrapping the conversions with try-except to provide clearer error messages.

🔎 Example: Add safe int parsing helper
+def _get_int_env(key: str, default: str) -> int:
+    """Safely parse integer from environment variable."""
+    try:
+        return int(os.getenv(key, default))
+    except ValueError as e:
+        raise ValueError(f"Environment variable {key} must be an integer, got: {os.getenv(key)}") from e
+
 @dataclass
 class OrchestratorConfig:
     # Health monitoring settings
     check_interval: int = field(
-        default_factory=lambda: int(os.getenv("WORKER_CHECK_INTERVAL", "10"))
+        default_factory=lambda: _get_int_env("WORKER_CHECK_INTERVAL", "10")
     )

Apply a similar pattern to the other int fields.
backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py (4)
104-114: Synchronous Redis calls may block the event loop.
_check_rq_worker_registration calls Worker.all() synchronously, and _check_worker_health iterates through workers. When called from the async _check_health, these could block the event loop if Redis is slow or there are many workers. Consider running them in an executor:

await asyncio.get_event_loop().run_in_executor(None, self._check_worker_health)
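As a runnable illustration of the executor pattern (the `slow_redis_query` function below is a stand-in for a synchronous call such as `Worker.all(connection=redis)`):

```python
import asyncio
import time


def slow_redis_query() -> int:
    """Stand-in for a blocking call like Worker.all(connection=redis)."""
    time.sleep(0.5)  # simulate a slow Redis round-trip
    return 6


async def check_health() -> None:
    loop = asyncio.get_running_loop()
    # Off-load the blocking call to the default thread pool so the event
    # loop stays responsive while Redis is slow.
    count = await loop.run_in_executor(None, slow_redis_query)
    print(f"rq worker count: {count}")


asyncio.run(check_health())
```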
142-144: Use logger.exception to include the stack trace. Per the static analysis hint, logger.exception automatically includes the stack trace, which aids debugging.

🔎 Proposed fix
 except Exception as e:
     all_healthy = False
-    logger.error(f"{worker.name}: Health check raised exception: {e}")
+    logger.exception(f"{worker.name}: Health check raised exception: {e}")
173-175: Use logger.exception for Redis errors. This would capture the full stack trace when Redis/RQ access fails, aiding in diagnosing connectivity issues.
🔎 Proposed fix
 except Exception as e:
-    logger.error(f"Failed to check RQ worker registration: {e}")
+    logger.exception(f"Failed to check RQ worker registration: {e}")
     return False
218-231: Consider logging Redis errors and making the error state explicit. When Redis fails, the exception is swallowed silently and rq_worker_count is set to -1. This works but loses diagnostic information. Consider logging the error and using an explicit error indicator in rq_healthy.

🔎 Proposed fix
     # Check RQ worker registration
     try:
         rq_workers = Worker.all(connection=self.redis)
         rq_worker_count = len(rq_workers)
-    except Exception:
+        rq_error = False
+    except Exception as e:
+        logger.warning(f"Failed to query RQ workers: {e}")
         rq_worker_count = -1  # Error indicator
+        rq_error = True

     return {
         "running": self.running,
         "uptime": time.time() - self.start_time if self.running else 0,
         "total_workers": len(worker_status),
         "state_counts": state_counts,
         "rq_worker_count": rq_worker_count,
         "min_rq_workers": self.config.min_rq_workers,
-        "rq_healthy": rq_worker_count >= self.config.min_rq_workers,
+        "rq_healthy": not rq_error and rq_worker_count >= self.config.min_rq_workers,
     }

backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py (1)
132-146: Wrap the post-kill wait in try/except to handle zombie processes. If wait(timeout=5) after kill() times out (e.g., a zombie process), it raises TimeoutExpired, which gets caught by the outer handler and marks the worker as FAILED. This may be intentional, but explicitly handling this case would be clearer.

🔎 Proposed fix
 except subprocess.TimeoutExpired:
     # Force kill if timeout exceeded
     logger.warning(
         f"{self.name}: Timeout expired, force killing (SIGKILL)..."
     )
     self.process.kill()
-    self.process.wait(timeout=5)
+    try:
+        self.process.wait(timeout=5)
+    except subprocess.TimeoutExpired:
+        logger.error(f"{self.name}: Process did not exit after SIGKILL")
     logger.warning(f"{self.name}: Force killed")
     self.state = WorkerState.STOPPED
     return True
 except Exception as e:
-    logger.error(f"{self.name}: Error during shutdown: {e}")
+    logger.exception(f"{self.name}: Error during shutdown: {e}")
     self.state = WorkerState.FAILED
     return False
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- backends/advanced/Dockerfile
- backends/advanced/docker-compose.yml
- backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py
- backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py
- backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py
- backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py
- backends/advanced/src/advanced_omi_backend/workers/orchestrator/worker_registry.py
- backends/advanced/start-workers.sh
💤 Files with no reviewable changes (1)
- backends/advanced/start-workers.sh
🧰 Additional context used
🧬 Code graph analysis (3)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py (1)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py (1)
WorkerDefinition(21-47)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py (1)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py (1)
name(53-55)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py (2)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py (2)
OrchestratorConfig (51-91), WorkerType (13-17)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py (7)
WorkerState (19-28), start (69-101), stop (103-146), check_health (148-179), name (53-55), restart_worker (240-266), get_status (268-288)
🪛 Ruff (0.14.10)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py
84-84: subprocess call: check for execution of untrusted input
(S603)
96-96: Consider moving this statement to an else block
(TRY300)
98-98: Do not catch blind exception: Exception
(BLE001)
99-99: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
130-130: Consider moving this statement to an else block
(TRY300)
143-143: Do not catch blind exception: Exception
(BLE001)
144-144: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
170-170: Do not catch blind exception: Exception
(BLE001)
171-171: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py
19-28: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/worker_registry.py
32-32: Do not catch blind exception: Exception
(BLE001)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py
85-85: Avoid specifying long messages outside the exception class
(TRY003)
87-87: Avoid specifying long messages outside the exception class
(TRY003)
89-89: Avoid specifying long messages outside the exception class
(TRY003)
91-91: Avoid specifying long messages outside the exception class
(TRY003)
backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py
142-142: Do not catch blind exception: Exception
(BLE001)
144-144: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
171-171: Consider moving this statement to an else block
(TRY300)
173-173: Do not catch blind exception: Exception
(BLE001)
174-174: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
221-221: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: robot-tests
🔇 Additional comments (27)
backends/advanced/Dockerfile (1)
42-44: LGTM! Clean removal of the legacy worker script. The Dockerfile correctly removes references to start-workers.sh while maintaining the core startup script handling, aligning with the orchestrator refactor.

backends/advanced/docker-compose.yml (4)
79-82: Documentation accurately reflects the orchestrator architecture. The updated comments correctly describe the worker composition and orchestrator capabilities.
95-96: Config mounts enable the orchestrator to read STT provider settings. These mounts are required for the worker_registry.py logic that conditionally starts workers based on the configured STT provider.
104-108: Orchestrator environment variables with sensible defaults. The default values are reasonable:
- WORKER_CHECK_INTERVAL=10: Health checks every 10 seconds
- MIN_RQ_WORKERS=6: Matches the 6 RQ workers defined in worker_registry.py
- Grace and timeout periods provide adequate buffer for worker lifecycle events
87-87: The orchestrator entrypoint file worker_orchestrator.py exists at backends/advanced/worker_orchestrator.py and is correctly mounted to /app/worker_orchestrator.py (line 92). The command at line 87 properly references it.

backends/advanced/src/advanced_omi_backend/workers/orchestrator/__init__.py (1)
14-17: LGTM! Clean package API exports. The imports correctly expose the orchestrator's public interface from its submodules.
backends/advanced/src/advanced_omi_backend/workers/orchestrator/config.py (3)
13-18: LGTM! Clear worker type distinction. The enum appropriately distinguishes between RQ queue workers and Redis Streams consumers.
20-48: Well-designed worker definition structure. The dataclass provides appropriate defaults and flexibility:
- Command as List[str] is subprocess-safe
- Optional enabled_check enables conditional worker startup
- Default restart_on_failure=True ensures resilience
82-91: Validation logic is sound and comprehensive. The __post_init__ validation correctly enforces positive/non-negative constraints for configuration values. Inline error messages are appropriate for configuration validation (the Ruff TRY003 warnings can be safely ignored here).

backends/advanced/src/advanced_omi_backend/workers/orchestrator/worker_registry.py (8)
17-36: Resilient STT provider detection with an appropriate fallback. The broad exception handling at line 32 is justified here for orchestrator startup resilience. Returning an empty string ensures workers don't start if configuration is unavailable, providing safe fail-closed behavior.
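The actual registry code is not shown in this review, but the fail-closed pattern it describes looks roughly like the following sketch (the config path, file format, and key name are assumptions):

```python
import logging

import yaml  # PyYAML; assumed format for the mounted config

logger = logging.getLogger(__name__)


def get_stt_provider(config_path: str = "/app/config/config.yml") -> str:
    """Fail-closed provider lookup: any error yields "" so that no
    provider-specific worker is started on a misconfigured node."""
    try:
        with open(config_path) as fh:
            config = yaml.safe_load(fh) or {}
        return str(config.get("stt_provider", "")).lower()
    except Exception:
        logger.warning(
            "Could not determine STT provider; provider workers stay disabled",
            exc_info=True,
        )
        return ""
```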
38-57: Correct conditional logic with helpful diagnostics. The function properly validates both the STT provider configuration and API key presence. The warning message at lines 52-54 provides valuable feedback for misconfiguration scenarios.
59-68: LGTM! Simple and correct Parakeet enablement check.
79-98: RQ worker configuration aligns with the documented architecture. The 6 multi-queue workers match the docker-compose documentation (line 79) and the MIN_RQ_WORKERS default. All workers handling the same queues provides redundancy and load distribution.
100-116: Dedicated audio persistence worker is properly configured. The single-queue audio worker provides isolation for audio processing tasks, matching the architecture described in docker-compose.yml (line 80).
118-133: Conditional Deepgram worker properly configured. The worker correctly uses the STREAM_CONSUMER type and conditional startup via should_start_deepgram_batch, ensuring it only runs when properly configured.
135-150: Conditional Parakeet worker properly configured. Consistent with the Deepgram worker pattern, using appropriate conditional startup logic.
152-170: Excellent observability with comprehensive logging. The detailed logging of STT provider, worker counts, and individual worker names provides valuable debugging information for orchestrator startup and configuration validation.
backends/advanced/src/advanced_omi_backend/workers/orchestrator/health_monitor.py (3)
33-44: LGTM! The constructor properly initializes all required dependencies and state variables.
46-76: LGTM! Start and stop methods have proper idempotency checks and clean task cancellation handling.
177-200: LGTM! Restart logic correctly checks both state and configuration before attempting a restart, with appropriate logging.
backends/advanced/src/advanced_omi_backend/workers/orchestrator/process_manager.py (7)
19-28: LGTM! Well-defined state machine with clear state descriptions covering the full worker lifecycle.
44-67: LGTM! Properties are correctly implemented. The is_alive check using poll() is None is the correct idiom.
175-179: Consider allowing recovery from the UNHEALTHY state. The state only transitions from STARTING to RUNNING on health check success. If a worker becomes UNHEALTHY and later recovers (its custom health check passes), it remains UNHEALTHY indefinitely. Consider whether recovery should be allowed:

 # Update state if currently starting
-if self.state == WorkerState.STARTING:
+if self.state in (WorkerState.STARTING, WorkerState.UNHEALTHY):
     self.state = WorkerState.RUNNING

If the intent is to require a restart to recover, document this behavior.
216-238: LGTM! Sequential shutdown with a per-worker timeout is straightforward and avoids race conditions during graceful termination.
268-288: LGTM! Comprehensive status reporting that exposes all relevant worker metrics.
290-296: LGTM! Clean accessor methods for worker retrieval.
195-214: No issue found - disabled workers are already filtered before ProcessManager initialization. Disabled workers are filtered in build_worker_definitions() (worker_registry.py, lines 156-170), which returns only enabled workers. ProcessManager receives this pre-filtered list, so start_all() only attempts to start workers that have passed the is_enabled() check. The current implementation is correct and follows good architectural practice by filtering at the source rather than repeatedly checking conditions. Likely an incorrect or invalid review comment.
- Deleted the `run-test.sh` script, which was used for local test execution.
- Updated Docker configurations to replace the `start-workers.sh` script with `worker_orchestrator.py` for improved worker management.
- Enhanced health monitoring and process management in the orchestrator to ensure better reliability and logging.
- Adjusted deployment configurations to reflect the new orchestrator setup.
| Metric | Count |
|---|---|
| ✅ Passed | 87 |
| ❌ Failed | 8 |
| 📊 Total | 95 |
📊 View Reports
GitHub Pages (Live Reports):
Download Artifacts:
- robot-test-reports-html - HTML reports
- robot-test-results-xml - XML output
| Metric | Count |
|---|---|
| ✅ Passed | 93 |
| ❌ Failed | 2 |
| 📊 Total | 95 |
📊 View Reports
GitHub Pages (Live Reports):
Download Artifacts:
- robot-test-reports-html - HTML reports
- robot-test-results-xml - XML output
- Introduced a new method `_handle_registration_loss` to manage RQ worker registration loss, replicating the behavior of the previous bash script.
- Implemented a cooldown period to prevent frequent restarts during network issues.
- Added logging for bulk restart actions and their outcomes to enhance monitoring and debugging capabilities.
- Created a `_restart_all_rq_workers` method to facilitate the bulk restart of RQ workers, ensuring they re-register with Redis upon startup.
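The commit text above describes the mechanism but not the code; a minimal sketch of how such a handler could look (the cooldown value, the `rq_worker_names()` accessor, and the method shapes are assumptions):

```python
import logging
import time

logger = logging.getLogger(__name__)


class HealthMonitor:  # trimmed: only the registration-loss path is sketched
    RESTART_COOLDOWN = 300  # seconds between bulk restarts (assumed value)

    def __init__(self, process_manager) -> None:
        self.process_manager = process_manager
        self._last_bulk_restart = 0.0

    async def _handle_registration_loss(self, rq_worker_count: int, minimum: int) -> None:
        """Bulk-restart RQ workers when Redis sees too few of them, but respect
        a cooldown so transient network blips don't cause restart churn."""
        now = time.time()
        if now - self._last_bulk_restart < self.RESTART_COOLDOWN:
            logger.info("RQ registration low (%d < %d) but within cooldown; skipping restart",
                        rq_worker_count, minimum)
            return
        logger.warning("RQ registration lost (%d < %d); restarting all RQ workers",
                       rq_worker_count, minimum)
        self._last_bulk_restart = now
        await self._restart_all_rq_workers()

    async def _restart_all_rq_workers(self) -> None:
        for name in self.process_manager.rq_worker_names():  # hypothetical accessor
            ok = self.process_manager.restart_worker(name)
            logger.info("Bulk restart of %s: %s", name, "ok" if ok else "FAILED")
```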
🎉 Robot Framework Test Results

Status: ✅ All tests passed!
📊 View Reports: GitHub Pages (Live Reports), Download Artifacts
I would strongly recommend reading the RQ docs, where there are references to supervisor (https://python-rq.org/patterns/supervisor/) or systemd (https://python-rq.org/patterns/systemd/) for orchestrating workers in prod. There are also worker configs and class definitions that we should consider whether we need on top of the existing RQ stack: https://python-rq.org/docs/workers/

I'd also move some files around and make the distinction clear between the orchestrator service and the workers. Making a worker model that extends the rq.Worker class would be my choice.

None of this is blocking, but if the refactor's purpose is to reduce complexity, I'd move things out of code in favour of standard RQ rather than reimplementing it (unless there is a good reason to).
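To make that suggestion concrete, here is a minimal sketch of a worker model that extends rq.Worker; the class name and queue names are illustrative, not part of the codebase:

```python
from redis import Redis
from rq import Queue, Worker


class ChronicleWorker(Worker):
    """Thin subclass of rq.Worker: a place for Chronicle-specific behaviour
    (logging, health hooks) instead of a separate home-grown worker model."""

    def execute_job(self, job, queue):
        # Hook point for per-job instrumentation; defers to RQ for the rest.
        return super().execute_job(job, queue)


if __name__ == "__main__":
    redis_conn = Redis(host="localhost", port=6379)
    queues = [Queue(name, connection=redis_conn) for name in ("default", "transcription")]
    # RQ handles registration, heartbeats, and graceful shutdown natively.
    ChronicleWorker(queues, connection=redis_conn).work()
```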
Worker Orchestrator Package

This package provides a Python-based orchestration system for managing
Chronicle's worker processes, replacing the bash-based start-workers.sh script.
Small point: I would probably put the worker orchestrator as a service, and not in the workers folder. That way, the workers folder contains only code used by processes separate from the main server.
logger = logging.getLogger(__name__)

class HealthMonitor:
Consider moving this to the queue controller (maybe rename it to worker controller) or the health routes, or make a health controller. There's a bunch of code around health checks; this should probably live there, or be referenced by the main health check.
@dataclass
class WorkerDefinition:
This sounds like a worker model?
Worker definition and worker state should be in the same place as all the other worker characteristics. This config file is in the orchestrator folder, which suggests it's config for the orchestrator, not the workers.
logger = logging.getLogger(__name__)

class WorkerState(Enum):
All this sounds like a worker model.
However, RQ deals with all of this natively, which is why I didn't make a worker model: https://python-rq.org/docs/workers/
    health_check: Optional custom health check function
    """

    name: str
consider using a config file https://python-rq.org/docs/workers/#using-a-config-file
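For reference, the config-file pattern from the linked RQ docs is a plain Python settings module passed to the CLI, e.g. `rq worker -c settings`. The queue names below are illustrative only:

```python
# settings.py: consumed via `rq worker -c settings`
REDIS_URL = "redis://redis:6379/0"

# Queues to listen on, in priority order
QUEUES = ["transcription", "memory", "default"]

# Optional extras RQ understands, e.g.:
# SENTRY_DSN = "https://******@sentry.io/123"
```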
So I read it, but I was confused why we'd need that over Docker already handling restarts.