From 03cc453d4734938be443aeb984a5cb1c33fe641b Mon Sep 17 00:00:00 2001 From: Gautham Goli Date: Tue, 19 Apr 2022 12:33:32 +0530 Subject: [PATCH] Distribute consumers evenly across worker nodes --- eventbusk/cli.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/eventbusk/cli.py b/eventbusk/cli.py index 35b4c26..020f2d3 100644 --- a/eventbusk/cli.py +++ b/eventbusk/cli.py @@ -100,21 +100,40 @@ def reload(self) -> None: @cli.command() @click.option("--app", "-A", help="Path to EventBus instance. eg. 'mymodule:app'") -def worker(app: str) -> None: +@click.option("--worker-number", "-w", help="Worker instance number") +@click.option("--total-workers", "-n", help="Total number of worker nodes") +def worker(app: str, worker_number: int = 1, total_workers: int = 1) -> None: """ Start consumer workers """ bus = find_app(app) - receivers = bus.receivers + # Sort by name to distribute receivers evenly across worker instances + receivers: list[Callable] = sorted( + list(bus.receivers), + key=lambda receiver_fn: receiver_fn.__name__ + ) if not receivers: logger.error("No registered receivers to run.") return + if worker_number < 0: + logger.error("Worker instance number cannot be less than 1") + return + if worker_number > total_workers: + logger.error("Worker instance number cannot be > total workers") + num_workers = len(receivers) logger.info(f"Found {num_workers} receivers.") + # Every n-th receiver (n = total_workers) starting from `worker_number` + # will be run on current worker instance + worker_receivers = [ + receivers[i-1] + for i in range(worker_number, total_workers + 1, total_workers) + ] + manager = cotyledon.ServiceManager() with cwd_in_path(): - for receiver in receivers: + for receiver in worker_receivers: manager.add(Worker, args=(receiver,)) manager.run()