Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions lib/que/command_line_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ def parse(
default_require_file: RAILS_ENVIRONMENT_FILE
)

options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
connection_url = nil
worker_count = nil
worker_priorities = nil
options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
skip_poll_when_buffer_above_threshold = 1.0
connection_url = nil
worker_count = nil
worker_priorities = nil

parser =
OptionParser.new do |opts|
Expand All @@ -50,6 +51,14 @@ def parse(
poll_interval = i
end

opts.on(
'--skip-poll-when-buffer-above-threshold [THRESHOLD]',
Float,
"Set threshold for skipping polls based on buffer fullness (default: 1.0)",
) do |threshold|
skip_poll_when_buffer_above_threshold = threshold
end

opts.on(
'--listen [LISTEN]',
String,
Expand Down Expand Up @@ -232,7 +241,8 @@ def parse(
options[:queues] = queues_hash
end

options[:poll_interval] = poll_interval
options[:poll_interval] = poll_interval
options[:skip_poll_when_buffer_above_threshold] = skip_poll_when_buffer_above_threshold

locker =
begin
Expand Down
64 changes: 39 additions & 25 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class << self
}

class Locker
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :skip_poll_when_buffer_above_threshold

MESSAGE_RESOLVERS = {}
RESULT_RESOLVERS = {}
Expand All @@ -47,22 +47,24 @@ class Locker
RESULT_RESOLVERS[:job_finished] =
-> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) }

DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze
DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD = 1.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze

def initialize(
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
skip_poll_when_buffer_above_threshold: DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
)

# Sanity-check all our arguments, since some users may instantiate Locker
Expand Down Expand Up @@ -94,20 +96,22 @@ def initialize(

Que.internal_log :locker_instantiate, self do
{
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
}
end

# Local cache of which advisory locks are held by this connection.
@locks = Set.new

@poll_interval = poll_interval
@skip_poll_when_buffer_above_threshold = skip_poll_when_buffer_above_threshold

if queues.is_a?(Hash)
@queue_names = queues.keys
Expand Down Expand Up @@ -330,12 +334,22 @@ def poll
# enabled).
return unless pollers

# Skip polling if the job buffer is sufficiently full.
if job_buffer.size >= job_buffer.maximum_size * skip_poll_when_buffer_above_threshold
Que.internal_log(:locker_polling_skipped, self) {
{
current_buffer_size: job_buffer.size,
max_buffer_size: job_buffer.maximum_size,
skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold,
}
}

return
end

# Figure out what job priorities we have to fill.
priorities = job_buffer.available_priorities

# Only poll when there are workers ready for jobs.
return if priorities.empty?

all_metajobs = []

pollers.each do |poller|
Expand Down
33 changes: 33 additions & 0 deletions spec/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,39 @@ def run

locker.stop!
end

it "should skip polling if the buffer is sufficiently full" do
ids = 12.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] }

locker_settings[:worker_priorities] = [100, 100, 100]
locker_settings[:maximum_buffer_size] = 8
locker_settings[:skip_poll_when_buffer_above_threshold] = 0.75

locker
3.times { $q1.pop }

# Should have polled once
locker_polled_events = internal_messages(event: 'poller_polled')
assert_equal 1, locker_polled_events.size

# Should have locked first 11 only because there are 8 buffer slots and 3 open workers
assert_equal ids[0...11], locked_ids

# Pretend to have worked 1 job to drop buffer to 7/8 full, should skip polling
$q2.push nil
sleep_until do
locker_polling_skipped_events = internal_messages(event: 'locker_polling_skipped')
locker_polling_skipped_events.any?
end

# No new polling events should have occurred
locker_polled_events = internal_messages(event: 'poller_polled')
assert_equal 1, locker_polled_events.size

3.times { $q2.push nil }

locker.stop!
end
end

describe "when receiving a NOTIFY of a new job" do
Expand Down
Loading