Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions lib/que/command_line_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ def parse(
default_require_file: RAILS_ENVIRONMENT_FILE
)

options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
connection_url = nil
worker_count = nil
worker_priorities = nil
options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
skip_poll_when_buffer_above_threshold = 1.0
connection_url = nil
worker_count = nil
worker_priorities = nil

parser =
OptionParser.new do |opts|
Expand All @@ -50,6 +51,14 @@ def parse(
poll_interval = i
end

opts.on(
'--skip-poll-when-buffer-above-threshold [THRESHOLD]',
Float,
"Set threshold for skipping polls based on buffer fullness (default: 1.0)",
) do |threshold|
skip_poll_when_buffer_above_threshold = threshold
end

opts.on(
'--listen [LISTEN]',
String,
Expand Down Expand Up @@ -232,7 +241,8 @@ def parse(
options[:queues] = queues_hash
end

options[:poll_interval] = poll_interval
options[:poll_interval] = poll_interval
options[:skip_poll_when_buffer_above_threshold] = skip_poll_when_buffer_above_threshold

locker =
begin
Expand Down
64 changes: 39 additions & 25 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class << self
}

class Locker
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :skip_poll_when_buffer_above_threshold

MESSAGE_RESOLVERS = {}
RESULT_RESOLVERS = {}
Expand All @@ -47,22 +47,24 @@ class Locker
RESULT_RESOLVERS[:job_finished] =
-> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) }

DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze
DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD = 1.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze

def initialize(
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
skip_poll_when_buffer_above_threshold: DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
)

# Sanity-check all our arguments, since some users may instantiate Locker
Expand Down Expand Up @@ -94,20 +96,22 @@ def initialize(

Que.internal_log :locker_instantiate, self do
{
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
}
end

# Local cache of which advisory locks are held by this connection.
@locks = Set.new

@poll_interval = poll_interval
@skip_poll_when_buffer_above_threshold = skip_poll_when_buffer_above_threshold

if queues.is_a?(Hash)
@queue_names = queues.keys
Expand Down Expand Up @@ -330,12 +334,22 @@ def poll
# enabled).
return unless pollers

# Skip polling if the job buffer is sufficiently full.
if job_buffer.size >= job_buffer.maximum_size * skip_poll_when_buffer_above_threshold
Que.internal_log(:locker_polling_skipped, self) {
{
current_buffer_size: job_buffer.size,
max_buffer_size: job_buffer.maximum_size,
skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold,
}
}

return
end

# Figure out what job priorities we have to fill.
priorities = job_buffer.available_priorities

# Only poll when there are workers ready for jobs.
return if priorities.empty?

all_metajobs = []

pollers.each do |poller|
Expand Down
33 changes: 33 additions & 0 deletions spec/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,39 @@ def run

locker.stop!
end

it "should skip polling if the buffer is sufficiently full" do
ids = 12.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] }

locker_settings[:worker_priorities] = [100, 100, 100]
locker_settings[:maximum_buffer_size] = 8
locker_settings[:skip_poll_when_buffer_above_threshold] = 0.75

locker
3.times { $q1.pop }

# Should have polled once
locker_polled_events = internal_messages(event: 'poller_polled')
assert_equal 1, locker_polled_events.size

# Should have locked first 11 only because there are 8 buffer slots and 3 open workers
assert_equal ids[0...11], locked_ids

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 0..11 12 or 11? I think it is 12 right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. vs ...

> (0..100).to_a[0..11]
=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

> (0..100).to_a[0...11]
=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

This spec is hugely copy-pasted inspired by the one above

que/spec/que/locker_spec.rb

Lines 422 to 423 in a58d350

# Should have locked first 11 only.
assert_equal ids[0...11], locked_ids


# Pretend to have worked 1 job to drop buffer to 7/8 full, should skip polling
$q2.push nil
sleep_until do
locker_polling_skipped_events = internal_messages(event: 'locker_polling_skipped')
locker_polling_skipped_events.any?
end

# No new polling events should have occurred
locker_polled_events = internal_messages(event: 'poller_polled')
assert_equal 1, locker_polled_events.size

3.times { $q2.push nil }
Copy link
Collaborator Author

@tomgi tomgi Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea what it does, but 🙈 since it hangs without it and 5 other tests also have it 🤷‍♂️

I'm going to follow the existing pattern and resist shaving this yak 🙂

Probably something to do with this mutex

@cv.wait(mutex)
that push signals

que/lib/que/job_buffer.rb

Lines 248 to 252 in 1d66a70

def _push(item)
Que.assert(waiting_count > 0)
@items << item
@cv.signal
end


locker.stop!
end
end

describe "when receiving a NOTIFY of a new job" do
Expand Down
Loading