From af01d50a73879be4a7b3e1d14d912d6755a0a1f7 Mon Sep 17 00:00:00 2001 From: Stephen Binns Date: Tue, 17 May 2022 15:05:01 +0100 Subject: [PATCH 1/4] Improve specs and add test for error handling --- lib/que/active_support/job_middleware.rb | 26 +++++++++ .../que/active_support/job_middleware_spec.rb | 53 +++++++++++++++++++ spec/spec_helper.rb | 1 + 3 files changed, 80 insertions(+) create mode 100644 lib/que/active_support/job_middleware.rb create mode 100644 spec/que/active_support/job_middleware_spec.rb diff --git a/lib/que/active_support/job_middleware.rb b/lib/que/active_support/job_middleware.rb new file mode 100644 index 00000000..e239618b --- /dev/null +++ b/lib/que/active_support/job_middleware.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Que + module ActiveSupport + module JobMiddleware + def self.call(job) + labels = { + job_class: job.que_attrs[:job_class], + priority: job.que_attrs[:priority], + queue: job.que_attrs[:queue], + latency: job.que_attrs[:latency], + } + + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + yield + ensure + ::ActiveSupport::Notifications.publish( + "que_job.worked", + started, + Process.clock_gettime(Process::CLOCK_MONOTONIC), + labels.merge(error: job.que_error.present?), + ) + end + end + end +end diff --git a/spec/que/active_support/job_middleware_spec.rb b/spec/que/active_support/job_middleware_spec.rb new file mode 100644 index 00000000..d9f9a98d --- /dev/null +++ b/spec/que/active_support/job_middleware_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +require 'spec_helper' + +if defined?(::ActiveSupport) + require 'que/active_support/job_middleware' + + describe Que::ActiveSupport::JobMiddleware do + let(:job) { Que::Job.new(**labels) } + + let(:labels) do + { + job_class: "Foo", + priority: 100, + queue: "foo_queue", + latency: 100, + } + end + + it "records metrics when job succeeds" do + called = false + subscriber = ::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels| + assert_equal "que_job.worked", message + assert started != nil + assert finished != nil + assert_equal labels.merge(error: false), metric_labels + called = true + end + + Que::ActiveSupport::JobMiddleware.call(job) { } + + assert_equal true, called + + ::ActiveSupport::Notifications.unsubscribe(subscriber) + end + + it "records metrics when job fails" do + called = false + subscriber = ::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels| + assert_equal "que_job.worked", message + assert started != nil + assert finished != nil + assert_equal labels.merge(error: true), metric_labels + called = true + end + + Que::ActiveSupport::JobMiddleware.call(job) { job.que_error = "error" } + + assert_equal true, called + ::ActiveSupport::Notifications.unsubscribe(subscriber) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index dc775c82..1fe948be 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -8,6 +8,7 @@ # in some spec runs. if ENV['USE_RAILS'] == 'true' require 'active_record' + require 'active_support/notifications' require 'active_job' ActiveJob::Base.queue_adapter = :que From a389b207d0d563a97883bb08ad87a94f6c6f146f Mon Sep 17 00:00:00 2001 From: Stephen Binns Date: Mon, 16 May 2022 15:50:25 +0100 Subject: [PATCH 2/4] Expose latency in polling --- lib/que/poller.rb | 2 ++ spec/que/poller_spec.rb | 1 + 2 files changed, 3 insertions(+) diff --git a/lib/que/poller.rb b/lib/que/poller.rb index 520879fd..1035396b 100644 --- a/lib/que/poller.rb +++ b/lib/que/poller.rb @@ -63,6 +63,7 @@ class Poller SELECT (j).*, l.locked, + extract(epoch from (now() - (j).run_at)) as latency, l.remaining_priorities FROM ( SELECT j @@ -81,6 +82,7 @@ class Poller SELECT (j).*, l.locked, + extract(epoch from (now() - (j).run_at)) as latency, l.remaining_priorities FROM ( SELECT diff --git a/spec/que/poller_spec.rb b/spec/que/poller_spec.rb index 57d39bd8..d9bf5aa7 100644 --- a/spec/que/poller_spec.rb +++ b/spec/que/poller_spec.rb @@ -53,6 +53,7 @@ def poll( metajobs.each do |metajob| # Make sure we pull in run_at timestamps in iso8601 format. assert_match(Que::TIME_REGEX, metajob.job[:run_at]) + assert metajob.job[:latency].to_f > 0 end returned_job_ids = metajobs.map(&:id) From b5a22ac72be826d9e71f2df48f717b6c36586ccc Mon Sep 17 00:00:00 2001 From: Stephen Binns Date: Mon, 16 May 2022 15:50:39 +0100 Subject: [PATCH 3/4] Job metrics and subscribers --- docs/README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/docs/README.md b/docs/README.md index 31a72b50..4d6cd19f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -798,6 +798,32 @@ Que.job_middleware.push( ) ``` +#### Existing Middleware + +Que ships with middleware to expose job metrics using ActiveSupport notifications to subscribe to it you can implelent the following + +```ruby +::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, labels| + # do something with notification. +end +``` + +`started` and `finished` are numeric values representing a monotonic clock so can +be used for timing calculations without concerning ourselves with the system clock. + +`labels` is a hash containing the following keys + +* `job_class` - the class of the job. +* `queue` - the queue this job was queued into. +* `priority` - the priority of this job. +* `latency` - the amount of time this job was waiting in the queue for. + +To use this middleware you will have to initialize it with Que + +```ruby +Que.job_middleware.push(Que::ActiveSupport::JobMiddleware) +``` + ### Defining Middleware For SQL statements SQL middleware wraps queries that Que executes, or which you might decide to execute via Que.execute(). You can use hook this into NewRelic or a similar service to instrument how long SQL queries take, for example. From a32ec578a19e692a4f09ce63d061c58b7b01c41a Mon Sep 17 00:00:00 2001 From: Matt Brown Date: Tue, 30 Aug 2022 17:24:01 +0200 Subject: [PATCH 4/4] make the current running Locker retrievable via Que.locker --- lib/que/locker.rb | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/que/locker.rb b/lib/que/locker.rb index 013393b6..d16ec870 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -7,6 +7,10 @@ require 'set' module Que + class << self + attr_accessor :locker + end + Listener::MESSAGE_FORMATS[:job_available] = { queue: String, @@ -71,6 +75,11 @@ def initialize( Que.assert Array, worker_priorities worker_priorities.each { |p| Que.assert([Integer, NilClass], p) } + # We assign this globally because we only ever expect on locker to be + # created per worker process. This can be used by middleware or external + # code to access the locker during runtime. + Que.locker = self + # We use a JobBuffer to track jobs and pass them to workers, and a # ResultQueue to receive messages from workers. @job_buffer = JobBuffer.new(