From 341d12613d1b31256f6a52dbc8e3b242a6e7dfc2 Mon Sep 17 00:00:00 2001 From: Brendan Mulholland Date: Fri, 26 Dec 2025 14:56:12 +0100 Subject: [PATCH] Fix empty batch queue routing to respect child_job_options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a Simplekiq orchestration encounters an empty batch (e.g., when guard clauses skip all child jobs in a step), Sidekiq Pro automatically schedules a Sidekiq::Batch::Empty job to ensure batch callbacks fire. However, this Empty job was being scheduled to the default queue instead of respecting the child_job_options queue configuration. This occurred because Sidekiq Pro uses the batch.callback_queue attribute to determine the queue for Empty jobs, and Simplekiq never set this value. This commit fixes the issue by: - Exposing child_job_options as a reader on Orchestration - Setting callback_queue on both orchestration and step batches when a queue is specified in child_job_options - Passing child_job_options through the callback chain so it's available in multi-step workflows Empty jobs will now respect the configured queue, maintaining queue isolation for orchestrations that use child_job_options. Orchestrations without child_job_options continue to use the default queue as before. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- Gemfile | 2 +- lib/simplekiq/orchestration.rb | 1 + lib/simplekiq/orchestration_executor.rb | 11 ++- lib/simplekiq/orchestration_job.rb | 2 +- spec/orchestration_executor_spec.rb | 114 ++++++++++++++++++++++-- spec/orchestration_job_spec.rb | 17 +++- 6 files changed, 133 insertions(+), 14 deletions(-) diff --git a/Gemfile b/Gemfile index c1b27ba..284faa2 100644 --- a/Gemfile +++ b/Gemfile @@ -1,6 +1,6 @@ source "https://rubygems.org" -source "https://gems.contribsys.com/" do +source "https://enterprise.contribsys.com/" do gem "sidekiq-pro", ">= 7.3.0", "< 9" end diff --git a/lib/simplekiq/orchestration.rb b/lib/simplekiq/orchestration.rb index 5e0a333..9dee8f2 100644 --- a/lib/simplekiq/orchestration.rb +++ b/lib/simplekiq/orchestration.rb @@ -3,6 +3,7 @@ module Simplekiq class Orchestration attr_accessor :serial_workflow, :parallel_workflow + attr_reader :child_job_options def initialize(child_job_options: {}) @serial_workflow = [] diff --git a/lib/simplekiq/orchestration_executor.rb b/lib/simplekiq/orchestration_executor.rb index e0aaec0..bacf4bb 100644 --- a/lib/simplekiq/orchestration_executor.rb +++ b/lib/simplekiq/orchestration_executor.rb @@ -2,17 +2,18 @@ module Simplekiq class OrchestrationExecutor - def self.execute(args:, job:, workflow:) + def self.execute(args:, job:, workflow:, child_job_options:) orchestration_batch = Sidekiq::Batch.new orchestration_batch.description = "[Simplekiq] #{job.class.name}. Params: #{args}" + orchestration_batch.callback_queue = child_job_options["queue"] if child_job_options&.dig("queue") Simplekiq.auto_define_callbacks(orchestration_batch, args: args, job: job) orchestration_batch.jobs do - new.run_step(workflow, 0, job.class.name) unless workflow.empty? + new.run_step(workflow, 0, job.class.name, child_job_options) unless workflow.empty? end end - def run_step(workflow, step, orchestration_job_class_name) + def run_step(workflow, step, orchestration_job_class_name, child_job_options) *jobs = workflow.at(step) # This will never be empty because Orchestration#serialized_workflow skips inserting # a new step for in_parallel if there were no inner jobs specified. @@ -20,6 +21,7 @@ def run_step(workflow, step, orchestration_job_class_name) next_step = step + 1 step_batch = Sidekiq::Batch.new step_batch.description = step_batch_description(jobs, next_step, orchestration_job_class_name) + step_batch.callback_queue = child_job_options["queue"] if child_job_options&.dig("queue") step_batch.on( "success", self.class, @@ -27,6 +29,7 @@ def run_step(workflow, step, orchestration_job_class_name) "orchestration_workflow" => workflow, "step" => next_step, "orchestration_job_class_name" => orchestration_job_class_name, + "child_job_options" => child_job_options, } ) @@ -41,7 +44,7 @@ def on_success(status, options) return if options["step"] == options["orchestration_workflow"].length Sidekiq::Batch.new(status.parent_bid).jobs do - run_step(options["orchestration_workflow"], options["step"], options["orchestration_job_class_name"]) + run_step(options["orchestration_workflow"], options["step"], options["orchestration_job_class_name"], options["child_job_options"]) end end diff --git a/lib/simplekiq/orchestration_job.rb b/lib/simplekiq/orchestration_job.rb index 5581c7e..ed3be05 100644 --- a/lib/simplekiq/orchestration_job.rb +++ b/lib/simplekiq/orchestration_job.rb @@ -19,7 +19,7 @@ def perform(*args) # If there is no parent batch, then it will simply be: # orchestration_batch( batch_of_first_step_of_the_orchestration ) conditionally_within_parent_batch do - OrchestrationExecutor.execute(args: args, job: self, workflow: orchestration.serialized_workflow) + OrchestrationExecutor.execute(args: args, job: self, workflow: orchestration.serialized_workflow, child_job_options: orchestration.child_job_options) end end diff --git a/spec/orchestration_executor_spec.rb b/spec/orchestration_executor_spec.rb index e7a01d7..42fa261 100644 --- a/spec/orchestration_executor_spec.rb +++ b/spec/orchestration_executor_spec.rb @@ -25,14 +25,14 @@ def set(options) end) } describe ".execute" do - def execute - described_class.execute(args: [{ "some" => "args" }], job: job, workflow: workflow) + def execute(child_job_options: {}) + described_class.execute(args: [{ "some" => "args" }], job: job, workflow: workflow, child_job_options: child_job_options) end it "kicks off the first step with a new batch" do batch_double = instance_double(Sidekiq::Batch, bid: 42) allow(Sidekiq::Batch).to receive(:new).and_return(batch_double) - expect(batch_double).to receive(:description=).with("[Simplekiq] FakeOrchestration. Params: [{\"some\"=>\"args\"}]") + expect(batch_double).to receive(:description=).with(a_string_matching(/\[Simplekiq\] FakeOrchestration. Params: \[.*some.*args.*\]/)) expect(batch_double).to receive(:on).with("success", FakeOrchestration, "args" => [{ "some" => "args" }]) batch_stack_depth = 0 # to keep track of how deeply nested within batches we are @@ -44,19 +44,63 @@ def execute instance = instance_double(Simplekiq::OrchestrationExecutor) allow(Simplekiq::OrchestrationExecutor).to receive(:new).and_return(instance) - expect(instance).to receive(:run_step) do |workflow_arg, step| + expect(instance).to receive(:run_step) do |workflow_arg, step, class_name, child_job_opts| expect(batch_stack_depth).to eq 1 expect(step).to eq 0 + expect(child_job_opts).to eq({}) end execute end + + context "when child_job_options includes a queue" do + it "sets callback_queue on the orchestration batch" do + batch_double = instance_double(Sidekiq::Batch, bid: 42) + allow(Sidekiq::Batch).to receive(:new).and_return(batch_double) + allow(batch_double).to receive(:description=) + allow(batch_double).to receive(:on) + allow(batch_double).to receive(:jobs) + + expect(batch_double).to receive(:callback_queue=).with("high") + + execute(child_job_options: { "queue" => "high" }) + end + end + + context "when child_job_options does not include a queue" do + it "does not set callback_queue on the orchestration batch" do + batch_double = instance_double(Sidekiq::Batch, bid: 42) + allow(Sidekiq::Batch).to receive(:new).and_return(batch_double) + allow(batch_double).to receive(:description=) + allow(batch_double).to receive(:on) + allow(batch_double).to receive(:jobs) + + expect(batch_double).not_to receive(:callback_queue=) + + execute(child_job_options: {}) + end + end + + context "when child_job_options is nil" do + it "does not set callback_queue on the orchestration batch" do + batch_double = instance_double(Sidekiq::Batch, bid: 42) + allow(Sidekiq::Batch).to receive(:new).and_return(batch_double) + allow(batch_double).to receive(:description=) + allow(batch_double).to receive(:on) + allow(batch_double).to receive(:jobs) + + expect(batch_double).not_to receive(:callback_queue=) + + execute(child_job_options: nil) + end + end end describe "run_step" do let(:step_batch) { instance_double(Sidekiq::Batch, bid: 42) } let(:step) { 0 } let(:instance) { described_class.new } + let(:child_job_options) { {} } it "runs the next job within a new step batch" do batch_stack_depth = 0 # to keep track of how deeply nested within batches we are @@ -76,10 +120,70 @@ def execute "orchestration_workflow" => workflow, "step" => 1, "orchestration_job_class_name" => "FakeOrchestration", + "child_job_options" => {}, }) expect(step_batch).to receive(:description=).with("[Simplekiq] step 1 in FakeOrchestration. Running OrcTest::JobA.") - instance.run_step(workflow, 0, "FakeOrchestration") + instance.run_step(workflow, 0, "FakeOrchestration", child_job_options) + end + + context "when child_job_options includes a queue" do + let(:child_job_options) { { "queue" => "high" } } + + it "sets callback_queue on the step batch" do + allow(step_batch).to receive(:jobs) + allow(step_batch).to receive(:on) + allow(step_batch).to receive(:description=) + allow(Sidekiq::Batch).to receive(:new).and_return(step_batch) + + expect(step_batch).to receive(:callback_queue=).with("high") + + instance.run_step(workflow, 0, "FakeOrchestration", child_job_options) + end + + it "passes child_job_options through the callback chain" do + allow(step_batch).to receive(:jobs) + allow(step_batch).to receive(:description=) + allow(step_batch).to receive(:callback_queue=) + allow(Sidekiq::Batch).to receive(:new).and_return(step_batch) + + expect(step_batch).to receive(:on).with("success", described_class, { + "orchestration_workflow" => workflow, + "step" => 1, + "orchestration_job_class_name" => "FakeOrchestration", + "child_job_options" => { "queue" => "high" }, + }) + + instance.run_step(workflow, 0, "FakeOrchestration", child_job_options) + end + end + + context "when child_job_options does not include a queue" do + it "does not set callback_queue on the step batch" do + allow(step_batch).to receive(:jobs) + allow(step_batch).to receive(:on) + allow(step_batch).to receive(:description=) + allow(Sidekiq::Batch).to receive(:new).and_return(step_batch) + + expect(step_batch).not_to receive(:callback_queue=) + + instance.run_step(workflow, 0, "FakeOrchestration", child_job_options) + end + end + + context "when child_job_options is nil" do + let(:child_job_options) { nil } + + it "does not set callback_queue on the step batch" do + allow(step_batch).to receive(:jobs) + allow(step_batch).to receive(:on) + allow(step_batch).to receive(:description=) + allow(Sidekiq::Batch).to receive(:new).and_return(step_batch) + + expect(step_batch).not_to receive(:callback_queue=) + + instance.run_step(workflow, 0, "FakeOrchestration", child_job_options) + end end end end diff --git a/spec/orchestration_job_spec.rb b/spec/orchestration_job_spec.rb index 7d3fbdd..2c2abd9 100644 --- a/spec/orchestration_job_spec.rb +++ b/spec/orchestration_job_spec.rb @@ -30,7 +30,8 @@ def perform workflow: [ {"klass" => "OrcTest::JobA", "args" => ["some"], "opts" => {}}, {"klass" => "OrcTest::JobB", "args" => ["args"], "opts" => {}} - ] + ], + child_job_options: {} ) perform @@ -81,7 +82,8 @@ def perform_orchestration(first, second) {"klass" => "OrcTest::JobB", "args" => ["some"], "opts" => {}}, {"klass" => "OrcTest::JobC", "args" => ["args"], "opts" => {}} ] - ] + ], + child_job_options: {} ) perform @@ -114,10 +116,19 @@ def child_job_options(*args) job: job, workflow: [ {"klass" => "OrcTest::JobA", "args" => ["some", "args"], "opts" => { "queue" => "some-test-queue" }}, - ] + ], + child_job_options: { "queue" => "some-test-queue" } ) perform end + + it "passes child_job_options to the executor" do + expect(Simplekiq::OrchestrationExecutor).to receive(:execute) do |args:, job:, workflow:, child_job_options:| + expect(child_job_options).to eq({ "queue" => "some-test-queue" }) + end + + perform + end end end