From a58d35030bff2a374d5a2cd4598ba5b288886866 Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Tue, 25 Nov 2025 13:40:55 +1100 Subject: [PATCH 1/2] Skip poll based on buffer fullness --- lib/que/command_line_interface.rb | 26 +++++++++---- lib/que/locker.rb | 64 +++++++++++++++++++------------ spec/que/locker_spec.rb | 33 ++++++++++++++++ 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/lib/que/command_line_interface.rb b/lib/que/command_line_interface.rb index 7e9f2e67..fbc9486a 100644 --- a/lib/que/command_line_interface.rb +++ b/lib/que/command_line_interface.rb @@ -18,14 +18,15 @@ def parse( default_require_file: RAILS_ENVIRONMENT_FILE ) - options = {} - queues = [] - log_level = 'info' - log_internals = false - poll_interval = 5 - connection_url = nil - worker_count = nil - worker_priorities = nil + options = {} + queues = [] + log_level = 'info' + log_internals = false + poll_interval = 5 + poll_buffer_fullness_skip_threshold = 1.0 + connection_url = nil + worker_count = nil + worker_priorities = nil parser = OptionParser.new do |opts| @@ -50,6 +51,14 @@ def parse( poll_interval = i end + opts.on( + '--poll-buffer-fullness-skip-threshold [THRESHOLD]', + Float, + "Set threshold for skipping polls based on buffer fullness (default: 1.0)", + ) do |threshold| + poll_buffer_fullness_skip_threshold = threshold + end + opts.on( '--listen [LISTEN]', String, @@ -233,6 +242,7 @@ def parse( end options[:poll_interval] = poll_interval + options[:poll_buffer_fullness_skip_threshold] = poll_buffer_fullness_skip_threshold locker = begin diff --git a/lib/que/locker.rb b/lib/que/locker.rb index cb7ec40a..7a89a5c9 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -33,7 +33,7 @@ class << self } class Locker - attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval + attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :poll_buffer_fullness_skip_threshold MESSAGE_RESOLVERS = {} RESULT_RESOLVERS = {} @@ -47,22 +47,24 @@ class Locker RESULT_RESOLVERS[:job_finished] = -> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) } - DEFAULT_POLL_INTERVAL = 5.0 - DEFAULT_WAIT_PERIOD = 50 - DEFAULT_MAXIMUM_BUFFER_SIZE = 8 - DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze + DEFAULT_POLL_INTERVAL = 5.0 + DEFAULT_POLL_BUFFER_FULLNESS_SKIP_THRESHOLD = 1.0 + DEFAULT_WAIT_PERIOD = 50 + DEFAULT_MAXIMUM_BUFFER_SIZE = 8 + DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze def initialize( - queues: [Que.default_queue], - connection_url: nil, - listen: true, - poll: true, - poll_interval: DEFAULT_POLL_INTERVAL, - wait_period: DEFAULT_WAIT_PERIOD, - maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, - worker_priorities: DEFAULT_WORKER_PRIORITIES, - on_worker_start: nil, - pidfile: nil + queues: [Que.default_queue], + connection_url: nil, + listen: true, + poll: true, + poll_interval: DEFAULT_POLL_INTERVAL, + poll_buffer_fullness_skip_threshold: DEFAULT_POLL_BUFFER_FULLNESS_SKIP_THRESHOLD, + wait_period: DEFAULT_WAIT_PERIOD, + maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, + worker_priorities: DEFAULT_WORKER_PRIORITIES, + on_worker_start: nil, + pidfile: nil ) # Sanity-check all our arguments, since some users may instantiate Locker @@ -94,13 +96,14 @@ def initialize( Que.internal_log :locker_instantiate, self do { - queues: queues, - listen: listen, - poll: poll, - poll_interval: poll_interval, - wait_period: wait_period, - maximum_buffer_size: maximum_buffer_size, - worker_priorities: worker_priorities, + queues: queues, + listen: listen, + poll: poll, + poll_interval: poll_interval, + poll_buffer_fullness_skip_threshold: poll_buffer_fullness_skip_threshold, + wait_period: wait_period, + maximum_buffer_size: maximum_buffer_size, + worker_priorities: worker_priorities, } end @@ -108,6 +111,7 @@ def initialize( @locks = Set.new @poll_interval = poll_interval + @poll_buffer_fullness_skip_threshold = poll_buffer_fullness_skip_threshold if queues.is_a?(Hash) @queue_names = queues.keys @@ -330,12 +334,22 @@ def poll # enabled). return unless pollers + # Skip polling if the job buffer is sufficiently full. + if job_buffer.size >= job_buffer.maximum_size * poll_buffer_fullness_skip_threshold + Que.internal_log(:locker_polling_skipped, self) { + { + current_buffer_size: job_buffer.size, + max_buffer_size: job_buffer.maximum_size, + poll_buffer_fullness_skip_threshold: poll_buffer_fullness_skip_threshold, + } + } + + return + end + # Figure out what job priorities we have to fill. priorities = job_buffer.available_priorities - # Only poll when there are workers ready for jobs. - return if priorities.empty? - all_metajobs = [] pollers.each do |poller| diff --git a/spec/que/locker_spec.rb b/spec/que/locker_spec.rb index 80b8276f..2035e721 100644 --- a/spec/que/locker_spec.rb +++ b/spec/que/locker_spec.rb @@ -429,6 +429,39 @@ def run locker.stop! end + + it "should skip polling if the buffer is sufficiently full" do + ids = 12.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] } + + locker_settings[:worker_priorities] = [100, 100, 100] + locker_settings[:maximum_buffer_size] = 8 + locker_settings[:poll_buffer_fullness_skip_threshold] = 0.75 + + locker + 3.times { $q1.pop } + + # Should have polled once + locker_polled_events = internal_messages(event: 'poller_polled') + assert_equal 1, locker_polled_events.size + + # Should have locked first 11 only because there are 8 buffer slots and 3 open workers + assert_equal ids[0...11], locked_ids + + # Pretend to have worked 1 job to drop buffer to 7/8 full, should skip polling + $q2.push nil + sleep_until do + locker_polling_skipped_events = internal_messages(event: 'locker_polling_skipped') + locker_polling_skipped_events.any? + end + + # No new polling events should have occurred + locker_polled_events = internal_messages(event: 'poller_polled') + assert_equal 1, locker_polled_events.size + + 3.times { $q2.push nil } + + locker.stop! + end end describe "when receiving a NOTIFY of a new job" do From 1d66a700c9dd47e3c05862aceb5e47a3609e6211 Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Tue, 25 Nov 2025 16:59:16 +1100 Subject: [PATCH 2/2] rename param to skip_poll_when_buffer_above_threshold --- lib/que/command_line_interface.rb | 26 +++++++------- lib/que/locker.rb | 56 +++++++++++++++---------------- spec/que/locker_spec.rb | 2 +- 3 files changed, 42 insertions(+), 42 deletions(-) diff --git a/lib/que/command_line_interface.rb b/lib/que/command_line_interface.rb index fbc9486a..2d278272 100644 --- a/lib/que/command_line_interface.rb +++ b/lib/que/command_line_interface.rb @@ -18,15 +18,15 @@ def parse( default_require_file: RAILS_ENVIRONMENT_FILE ) - options = {} - queues = [] - log_level = 'info' - log_internals = false - poll_interval = 5 - poll_buffer_fullness_skip_threshold = 1.0 - connection_url = nil - worker_count = nil - worker_priorities = nil + options = {} + queues = [] + log_level = 'info' + log_internals = false + poll_interval = 5 + skip_poll_when_buffer_above_threshold = 1.0 + connection_url = nil + worker_count = nil + worker_priorities = nil parser = OptionParser.new do |opts| @@ -52,11 +52,11 @@ def parse( end opts.on( - '--poll-buffer-fullness-skip-threshold [THRESHOLD]', + '--skip-poll-when-buffer-above-threshold [THRESHOLD]', Float, "Set threshold for skipping polls based on buffer fullness (default: 1.0)", ) do |threshold| - poll_buffer_fullness_skip_threshold = threshold + skip_poll_when_buffer_above_threshold = threshold end opts.on( @@ -241,8 +241,8 @@ def parse( options[:queues] = queues_hash end - options[:poll_interval] = poll_interval - options[:poll_buffer_fullness_skip_threshold] = poll_buffer_fullness_skip_threshold + options[:poll_interval] = poll_interval + options[:skip_poll_when_buffer_above_threshold] = skip_poll_when_buffer_above_threshold locker = begin diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 7a89a5c9..a0aafb62 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -33,7 +33,7 @@ class << self } class Locker - attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :poll_buffer_fullness_skip_threshold + attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :skip_poll_when_buffer_above_threshold MESSAGE_RESOLVERS = {} RESULT_RESOLVERS = {} @@ -47,24 +47,24 @@ class Locker RESULT_RESOLVERS[:job_finished] = -> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) } - DEFAULT_POLL_INTERVAL = 5.0 - DEFAULT_POLL_BUFFER_FULLNESS_SKIP_THRESHOLD = 1.0 - DEFAULT_WAIT_PERIOD = 50 - DEFAULT_MAXIMUM_BUFFER_SIZE = 8 - DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze + DEFAULT_POLL_INTERVAL = 5.0 + DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD = 1.0 + DEFAULT_WAIT_PERIOD = 50 + DEFAULT_MAXIMUM_BUFFER_SIZE = 8 + DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze def initialize( - queues: [Que.default_queue], - connection_url: nil, - listen: true, - poll: true, - poll_interval: DEFAULT_POLL_INTERVAL, - poll_buffer_fullness_skip_threshold: DEFAULT_POLL_BUFFER_FULLNESS_SKIP_THRESHOLD, - wait_period: DEFAULT_WAIT_PERIOD, - maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, - worker_priorities: DEFAULT_WORKER_PRIORITIES, - on_worker_start: nil, - pidfile: nil + queues: [Que.default_queue], + connection_url: nil, + listen: true, + poll: true, + poll_interval: DEFAULT_POLL_INTERVAL, + skip_poll_when_buffer_above_threshold: DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD, + wait_period: DEFAULT_WAIT_PERIOD, + maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, + worker_priorities: DEFAULT_WORKER_PRIORITIES, + on_worker_start: nil, + pidfile: nil ) # Sanity-check all our arguments, since some users may instantiate Locker @@ -96,14 +96,14 @@ def initialize( Que.internal_log :locker_instantiate, self do { - queues: queues, - listen: listen, - poll: poll, - poll_interval: poll_interval, - poll_buffer_fullness_skip_threshold: poll_buffer_fullness_skip_threshold, - wait_period: wait_period, - maximum_buffer_size: maximum_buffer_size, - worker_priorities: worker_priorities, + queues: queues, + listen: listen, + poll: poll, + poll_interval: poll_interval, + skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold, + wait_period: wait_period, + maximum_buffer_size: maximum_buffer_size, + worker_priorities: worker_priorities, } end @@ -111,7 +111,7 @@ def initialize( @locks = Set.new @poll_interval = poll_interval - @poll_buffer_fullness_skip_threshold = poll_buffer_fullness_skip_threshold + @skip_poll_when_buffer_above_threshold = skip_poll_when_buffer_above_threshold if queues.is_a?(Hash) @queue_names = queues.keys @@ -335,12 +335,12 @@ def poll return unless pollers # Skip polling if the job buffer is sufficiently full. - if job_buffer.size >= job_buffer.maximum_size * poll_buffer_fullness_skip_threshold + if job_buffer.size >= job_buffer.maximum_size * skip_poll_when_buffer_above_threshold Que.internal_log(:locker_polling_skipped, self) { { current_buffer_size: job_buffer.size, max_buffer_size: job_buffer.maximum_size, - poll_buffer_fullness_skip_threshold: poll_buffer_fullness_skip_threshold, + skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold, } } diff --git a/spec/que/locker_spec.rb b/spec/que/locker_spec.rb index 2035e721..fefc45ce 100644 --- a/spec/que/locker_spec.rb +++ b/spec/que/locker_spec.rb @@ -435,7 +435,7 @@ def run locker_settings[:worker_priorities] = [100, 100, 100] locker_settings[:maximum_buffer_size] = 8 - locker_settings[:poll_buffer_fullness_skip_threshold] = 0.75 + locker_settings[:skip_poll_when_buffer_above_threshold] = 0.75 locker 3.times { $q1.pop }