Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
source "https://rubygems.org"

source "https://gems.contribsys.com/" do
source "https://enterprise.contribsys.com/" do
gem "sidekiq-pro", ">= 7.3.0", "< 9"
end

Expand Down
1 change: 1 addition & 0 deletions lib/simplekiq/orchestration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Simplekiq
class Orchestration
attr_accessor :serial_workflow, :parallel_workflow
attr_reader :child_job_options

def initialize(child_job_options: {})
@serial_workflow = []
Expand Down
11 changes: 7 additions & 4 deletions lib/simplekiq/orchestration_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,34 @@

module Simplekiq
class OrchestrationExecutor
def self.execute(args:, job:, workflow:)
def self.execute(args:, job:, workflow:, child_job_options:)
orchestration_batch = Sidekiq::Batch.new
orchestration_batch.description = "[Simplekiq] #{job.class.name}. Params: #{args}"
orchestration_batch.callback_queue = child_job_options["queue"] if child_job_options&.dig("queue")
Simplekiq.auto_define_callbacks(orchestration_batch, args: args, job: job)

orchestration_batch.jobs do
new.run_step(workflow, 0, job.class.name) unless workflow.empty?
new.run_step(workflow, 0, job.class.name, child_job_options) unless workflow.empty?
end
end

def run_step(workflow, step, orchestration_job_class_name)
def run_step(workflow, step, orchestration_job_class_name, child_job_options)
*jobs = workflow.at(step)
# This will never be empty because Orchestration#serialized_workflow skips inserting
# a new step for in_parallel if there were no inner jobs specified.

next_step = step + 1
step_batch = Sidekiq::Batch.new
step_batch.description = step_batch_description(jobs, next_step, orchestration_job_class_name)
step_batch.callback_queue = child_job_options["queue"] if child_job_options&.dig("queue")
step_batch.on(
"success",
self.class,
{
"orchestration_workflow" => workflow,
"step" => next_step,
"orchestration_job_class_name" => orchestration_job_class_name,
"child_job_options" => child_job_options,
}
)

Expand All @@ -41,7 +44,7 @@ def on_success(status, options)
return if options["step"] == options["orchestration_workflow"].length

Sidekiq::Batch.new(status.parent_bid).jobs do
run_step(options["orchestration_workflow"], options["step"], options["orchestration_job_class_name"])
run_step(options["orchestration_workflow"], options["step"], options["orchestration_job_class_name"], options["child_job_options"])
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/simplekiq/orchestration_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def perform(*args)
# If there is no parent batch, then it will simply be:
# orchestration_batch( batch_of_first_step_of_the_orchestration )
conditionally_within_parent_batch do
OrchestrationExecutor.execute(args: args, job: self, workflow: orchestration.serialized_workflow)
OrchestrationExecutor.execute(args: args, job: self, workflow: orchestration.serialized_workflow, child_job_options: orchestration.child_job_options)
end
end

Expand Down
114 changes: 109 additions & 5 deletions spec/orchestration_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ def set(options)
end) }

describe ".execute" do
def execute
described_class.execute(args: [{ "some" => "args" }], job: job, workflow: workflow)
def execute(child_job_options: {})
described_class.execute(args: [{ "some" => "args" }], job: job, workflow: workflow, child_job_options: child_job_options)
end

it "kicks off the first step with a new batch" do
batch_double = instance_double(Sidekiq::Batch, bid: 42)
allow(Sidekiq::Batch).to receive(:new).and_return(batch_double)
expect(batch_double).to receive(:description=).with("[Simplekiq] FakeOrchestration. Params: [{\"some\"=>\"args\"}]")
expect(batch_double).to receive(:description=).with(a_string_matching(/\[Simplekiq\] FakeOrchestration. Params: \[.*some.*args.*\]/))
expect(batch_double).to receive(:on).with("success", FakeOrchestration, "args" => [{ "some" => "args" }])

batch_stack_depth = 0 # to keep track of how deeply nested within batches we are
Expand All @@ -44,19 +44,63 @@ def execute

instance = instance_double(Simplekiq::OrchestrationExecutor)
allow(Simplekiq::OrchestrationExecutor).to receive(:new).and_return(instance)
expect(instance).to receive(:run_step) do |workflow_arg, step|
expect(instance).to receive(:run_step) do |workflow_arg, step, class_name, child_job_opts|
expect(batch_stack_depth).to eq 1
expect(step).to eq 0
expect(child_job_opts).to eq({})
end

execute
end

context "when child_job_options includes a queue" do
it "sets callback_queue on the orchestration batch" do
batch_double = instance_double(Sidekiq::Batch, bid: 42)
allow(Sidekiq::Batch).to receive(:new).and_return(batch_double)
allow(batch_double).to receive(:description=)
allow(batch_double).to receive(:on)
allow(batch_double).to receive(:jobs)

expect(batch_double).to receive(:callback_queue=).with("high")

execute(child_job_options: { "queue" => "high" })
end
end

context "when child_job_options does not include a queue" do
it "does not set callback_queue on the orchestration batch" do
batch_double = instance_double(Sidekiq::Batch, bid: 42)
allow(Sidekiq::Batch).to receive(:new).and_return(batch_double)
allow(batch_double).to receive(:description=)
allow(batch_double).to receive(:on)
allow(batch_double).to receive(:jobs)

expect(batch_double).not_to receive(:callback_queue=)

execute(child_job_options: {})
end
end

context "when child_job_options is nil" do
it "does not set callback_queue on the orchestration batch" do
batch_double = instance_double(Sidekiq::Batch, bid: 42)
allow(Sidekiq::Batch).to receive(:new).and_return(batch_double)
allow(batch_double).to receive(:description=)
allow(batch_double).to receive(:on)
allow(batch_double).to receive(:jobs)

expect(batch_double).not_to receive(:callback_queue=)

execute(child_job_options: nil)
end
end
end

describe "run_step" do
let(:step_batch) { instance_double(Sidekiq::Batch, bid: 42) }
let(:step) { 0 }
let(:instance) { described_class.new }
let(:child_job_options) { {} }

it "runs the next job within a new step batch" do
batch_stack_depth = 0 # to keep track of how deeply nested within batches we are
Expand All @@ -76,10 +120,70 @@ def execute
"orchestration_workflow" => workflow,
"step" => 1,
"orchestration_job_class_name" => "FakeOrchestration",
"child_job_options" => {},
})
expect(step_batch).to receive(:description=).with("[Simplekiq] step 1 in FakeOrchestration. Running OrcTest::JobA.")

instance.run_step(workflow, 0, "FakeOrchestration")
instance.run_step(workflow, 0, "FakeOrchestration", child_job_options)
end

context "when child_job_options includes a queue" do
let(:child_job_options) { { "queue" => "high" } }

it "sets callback_queue on the step batch" do
allow(step_batch).to receive(:jobs)
allow(step_batch).to receive(:on)
allow(step_batch).to receive(:description=)
allow(Sidekiq::Batch).to receive(:new).and_return(step_batch)

expect(step_batch).to receive(:callback_queue=).with("high")

instance.run_step(workflow, 0, "FakeOrchestration", child_job_options)
end

it "passes child_job_options through the callback chain" do
allow(step_batch).to receive(:jobs)
allow(step_batch).to receive(:description=)
allow(step_batch).to receive(:callback_queue=)
allow(Sidekiq::Batch).to receive(:new).and_return(step_batch)

expect(step_batch).to receive(:on).with("success", described_class, {
"orchestration_workflow" => workflow,
"step" => 1,
"orchestration_job_class_name" => "FakeOrchestration",
"child_job_options" => { "queue" => "high" },
})

instance.run_step(workflow, 0, "FakeOrchestration", child_job_options)
end
end

context "when child_job_options does not include a queue" do
it "does not set callback_queue on the step batch" do
allow(step_batch).to receive(:jobs)
allow(step_batch).to receive(:on)
allow(step_batch).to receive(:description=)
allow(Sidekiq::Batch).to receive(:new).and_return(step_batch)

expect(step_batch).not_to receive(:callback_queue=)

instance.run_step(workflow, 0, "FakeOrchestration", child_job_options)
end
end

context "when child_job_options is nil" do
let(:child_job_options) { nil }

it "does not set callback_queue on the step batch" do
allow(step_batch).to receive(:jobs)
allow(step_batch).to receive(:on)
allow(step_batch).to receive(:description=)
allow(Sidekiq::Batch).to receive(:new).and_return(step_batch)

expect(step_batch).not_to receive(:callback_queue=)

instance.run_step(workflow, 0, "FakeOrchestration", child_job_options)
end
end
end
end
17 changes: 14 additions & 3 deletions spec/orchestration_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def perform
workflow: [
{"klass" => "OrcTest::JobA", "args" => ["some"], "opts" => {}},
{"klass" => "OrcTest::JobB", "args" => ["args"], "opts" => {}}
]
],
child_job_options: {}
)

perform
Expand Down Expand Up @@ -81,7 +82,8 @@ def perform_orchestration(first, second)
{"klass" => "OrcTest::JobB", "args" => ["some"], "opts" => {}},
{"klass" => "OrcTest::JobC", "args" => ["args"], "opts" => {}}
]
]
],
child_job_options: {}
)

perform
Expand Down Expand Up @@ -114,10 +116,19 @@ def child_job_options(*args)
job: job,
workflow: [
{"klass" => "OrcTest::JobA", "args" => ["some", "args"], "opts" => { "queue" => "some-test-queue" }},
]
],
child_job_options: { "queue" => "some-test-queue" }
)

perform
end

it "passes child_job_options to the executor" do
expect(Simplekiq::OrchestrationExecutor).to receive(:execute) do |args:, job:, workflow:, child_job_options:|
expect(child_job_options).to eq({ "queue" => "some-test-queue" })
end

perform
end
end
end