diff --git a/lib/que/command_line_interface.rb b/lib/que/command_line_interface.rb index 7e9f2e67..2d278272 100644 --- a/lib/que/command_line_interface.rb +++ b/lib/que/command_line_interface.rb @@ -18,14 +18,15 @@ def parse( default_require_file: RAILS_ENVIRONMENT_FILE ) - options = {} - queues = [] - log_level = 'info' - log_internals = false - poll_interval = 5 - connection_url = nil - worker_count = nil - worker_priorities = nil + options = {} + queues = [] + log_level = 'info' + log_internals = false + poll_interval = 5 + skip_poll_when_buffer_above_threshold = 1.0 + connection_url = nil + worker_count = nil + worker_priorities = nil parser = OptionParser.new do |opts| @@ -50,6 +51,14 @@ def parse( poll_interval = i end + opts.on( + '--skip-poll-when-buffer-above-threshold [THRESHOLD]', + Float, + "Set threshold for skipping polls based on buffer fullness (default: 1.0)", + ) do |threshold| + skip_poll_when_buffer_above_threshold = threshold + end + opts.on( '--listen [LISTEN]', String, @@ -232,7 +241,8 @@ def parse( options[:queues] = queues_hash end - options[:poll_interval] = poll_interval + options[:poll_interval] = poll_interval + options[:skip_poll_when_buffer_above_threshold] = skip_poll_when_buffer_above_threshold locker = begin diff --git a/lib/que/locker.rb b/lib/que/locker.rb index cb7ec40a..a0aafb62 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -33,7 +33,7 @@ class << self } class Locker - attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval + attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :skip_poll_when_buffer_above_threshold MESSAGE_RESOLVERS = {} RESULT_RESOLVERS = {} @@ -47,22 +47,24 @@ class Locker RESULT_RESOLVERS[:job_finished] = -> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) } - DEFAULT_POLL_INTERVAL = 5.0 - DEFAULT_WAIT_PERIOD = 50 - DEFAULT_MAXIMUM_BUFFER_SIZE = 8 - DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze + DEFAULT_POLL_INTERVAL = 5.0 + DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD = 1.0 + DEFAULT_WAIT_PERIOD = 50 + DEFAULT_MAXIMUM_BUFFER_SIZE = 8 + DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze def initialize( - queues: [Que.default_queue], - connection_url: nil, - listen: true, - poll: true, - poll_interval: DEFAULT_POLL_INTERVAL, - wait_period: DEFAULT_WAIT_PERIOD, - maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, - worker_priorities: DEFAULT_WORKER_PRIORITIES, - on_worker_start: nil, - pidfile: nil + queues: [Que.default_queue], + connection_url: nil, + listen: true, + poll: true, + poll_interval: DEFAULT_POLL_INTERVAL, + skip_poll_when_buffer_above_threshold: DEFAULT_SKIP_POLL_WHEN_BUFFER_ABOVE_THRESHOLD, + wait_period: DEFAULT_WAIT_PERIOD, + maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE, + worker_priorities: DEFAULT_WORKER_PRIORITIES, + on_worker_start: nil, + pidfile: nil ) # Sanity-check all our arguments, since some users may instantiate Locker @@ -94,13 +96,14 @@ def initialize( Que.internal_log :locker_instantiate, self do { - queues: queues, - listen: listen, - poll: poll, - poll_interval: poll_interval, - wait_period: wait_period, - maximum_buffer_size: maximum_buffer_size, - worker_priorities: worker_priorities, + queues: queues, + listen: listen, + poll: poll, + poll_interval: poll_interval, + skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold, + wait_period: wait_period, + maximum_buffer_size: maximum_buffer_size, + worker_priorities: worker_priorities, } end @@ -108,6 +111,7 @@ def initialize( @locks = Set.new @poll_interval = poll_interval + @skip_poll_when_buffer_above_threshold = skip_poll_when_buffer_above_threshold if queues.is_a?(Hash) @queue_names = queues.keys @@ -330,12 +334,22 @@ def poll # enabled). return unless pollers + # Skip polling if the job buffer is sufficiently full. + if job_buffer.size >= job_buffer.maximum_size * skip_poll_when_buffer_above_threshold + Que.internal_log(:locker_polling_skipped, self) { + { + current_buffer_size: job_buffer.size, + max_buffer_size: job_buffer.maximum_size, + skip_poll_when_buffer_above_threshold: skip_poll_when_buffer_above_threshold, + } + } + + return + end + # Figure out what job priorities we have to fill. priorities = job_buffer.available_priorities - # Only poll when there are workers ready for jobs. - return if priorities.empty? - all_metajobs = [] pollers.each do |poller| diff --git a/spec/que/locker_spec.rb b/spec/que/locker_spec.rb index 80b8276f..fefc45ce 100644 --- a/spec/que/locker_spec.rb +++ b/spec/que/locker_spec.rb @@ -429,6 +429,39 @@ def run locker.stop! end + + it "should skip polling if the buffer is sufficiently full" do + ids = 12.times.map { BlockJob.enqueue(job_options: { priority: 100 }).que_attrs[:id] } + + locker_settings[:worker_priorities] = [100, 100, 100] + locker_settings[:maximum_buffer_size] = 8 + locker_settings[:skip_poll_when_buffer_above_threshold] = 0.75 + + locker + 3.times { $q1.pop } + + # Should have polled once + locker_polled_events = internal_messages(event: 'poller_polled') + assert_equal 1, locker_polled_events.size + + # Should have locked first 11 only because there are 8 buffer slots and 3 open workers + assert_equal ids[0...11], locked_ids + + # Pretend to have worked 1 job to drop buffer to 7/8 full, should skip polling + $q2.push nil + sleep_until do + locker_polling_skipped_events = internal_messages(event: 'locker_polling_skipped') + locker_polling_skipped_events.any? + end + + # No new polling events should have occurred + locker_polled_events = internal_messages(event: 'poller_polled') + assert_equal 1, locker_polled_events.size + + 3.times { $q2.push nil } + + locker.stop! + end end describe "when receiving a NOTIFY of a new job" do