Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,32 @@ Que.job_middleware.push(
)
```

#### Existing Middleware

Que ships with middleware to expose job metrics using ActiveSupport notifications to subscribe to it you can implelent the following
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*implement ;)


```ruby
::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, labels|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just looked this up, and it seems that .subscribe doesn't have monotonic timestamps; .monotonic_subscribe does. Whichever you want to use here is fine, but let's document it correctly.

Copy link
Member

@ZimbiX ZimbiX May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, you are passing monotonic times from the middleware... I guess I don't understand how this works then 😅

# do something with notification.
end
```

`started` and `finished` are numeric values representing a monotonic clock so can
be used for timing calculations without concerning ourselves with the system clock.

`labels` is a hash containing the following keys

* `job_class` - the class of the job.
* `queue` - the queue this job was queued into.
* `priority` - the priority of this job.
* `latency` - the amount of time this job was waiting in the queue for.

To use this middleware you will have to initialize it with Que

```ruby
Que.job_middleware.push(Que::ActiveSupport::JobMiddleware)
```

### Defining Middleware For SQL statements

SQL middleware wraps queries that Que executes, or which you might decide to execute via Que.execute(). You can use hook this into NewRelic or a similar service to instrument how long SQL queries take, for example.
Expand Down
26 changes: 26 additions & 0 deletions lib/que/active_support/job_middleware.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module Que
module ActiveSupport
module JobMiddleware
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some usage documentation would be in order, I reckon.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have added to readme do you think its better here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nah, I just wanted to start a thread so it'd be able to be marked resolved 😆

Cheers for adding that; looks good

def self.call(job)
labels = {
job_class: job.que_attrs[:job_class],
priority: job.que_attrs[:priority],
queue: job.que_attrs[:queue],
latency: job.que_attrs[:latency],

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be also useful to have a duration published, so we know both how long the job was waiting in the queue, and how long it took to execute. WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execution duration can be calculated using start and end currently, we could make this more intuitive by doing that for consumers

}

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
yield
ensure
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like a test for the exception case. But is publishing a que_job.worked notification even upon an exception really what you want? I'd have thought that'd mean the job is complete 🤔 There may be an example of this elsewhere..

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good will add one shortly, I think so in terms of the metric were more concerned with how Que is performing rather than if the Jobs were executed correctly.

::ActiveSupport::Notifications.publish(
"que_job.worked",
Copy link

@ameykusurkar ameykusurkar May 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"que_job.worked",
"que_job.job_worked",

Nitpick: what about prefixing the event with job_? This allows us to have different types of events

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

que.job_worked? =P

started,
Process.clock_gettime(Process::CLOCK_MONOTONIC),
labels.merge(error: job.que_error.present?),
)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/que/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class Poller
SELECT
(j).*,
l.locked,
extract(epoch from (now() - (j).run_at)) as latency,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB, using now() - (j).run_at as latency/delay can be misleading. If a job errors and retries with a backoff then the run_at changes. It would be best to add an "initial_run_at" column to the main jobs table, but that would be a better change....

@hlascelles This is a good point. I think the intention here is to permit ascertaining worker contention - by allowing you to analyse at the delay between when a job is desired to be run at (or re-run at), and when it's actually picked up to be run. In terms of that desire, I think this metric is more useful. But I'm sure there's also merit in #325.

l.remaining_priorities
FROM (
SELECT j
Expand All @@ -81,6 +82,7 @@ class Poller
SELECT
(j).*,
l.locked,
extract(epoch from (now() - (j).run_at)) as latency,
l.remaining_priorities
FROM (
SELECT
Expand Down
53 changes: 53 additions & 0 deletions spec/que/active_support/job_middleware_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

require 'spec_helper'

if defined?(::ActiveSupport)
require 'que/active_support/job_middleware'

describe Que::ActiveSupport::JobMiddleware do
let(:job) { Que::Job.new(**labels) }

let(:labels) do
{
job_class: "Foo",
priority: 100,
queue: "foo_queue",
latency: 100,
}
end

it "records metrics when job succeeds" do
called = false
subscriber = ::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels|
assert_equal "que_job.worked", message
assert started != nil
assert finished != nil
assert_equal labels.merge(error: false), metric_labels
called = true
end

Que::ActiveSupport::JobMiddleware.call(job) { }

assert_equal true, called

::ActiveSupport::Notifications.unsubscribe(subscriber)
end

it "records metrics when job fails" do
called = false
subscriber = ::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels|
assert_equal "que_job.worked", message
assert started != nil
assert finished != nil
assert_equal labels.merge(error: true), metric_labels
called = true
end

Que::ActiveSupport::JobMiddleware.call(job) { job.que_error = "error" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I was thinking you'd need to raise within the block here, but you're right - an exception from a job does not bubble up to middleware.

Testing job failure is good, but I was meaning that you test that it's in an ensure - when it encounters an exception. If not caused by the job, perhaps by a subsequent middleware.

For realistic data, I think job.que_error should also be set to an instance of an exception, not that it would have any effect here.

Although.. I've just looked up the term job_worked in the code, and found that logging makes a distinction: job_worked vs job_errored. So maybe this middleware should do the same? Or find a more generic term - something like job_attempted?


assert_equal true, called
::ActiveSupport::Notifications.unsubscribe(subscriber)
end
end
end
1 change: 1 addition & 0 deletions spec/que/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def poll(
metajobs.each do |metajob|
# Make sure we pull in run_at timestamps in iso8601 format.
assert_match(Que::TIME_REGEX, metajob.job[:run_at])
assert metajob.job[:latency].to_f > 0
end

returned_job_ids = metajobs.map(&:id)
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# in some spec runs.
if ENV['USE_RAILS'] == 'true'
require 'active_record'
require 'active_support/notifications'
require 'active_job'

ActiveJob::Base.queue_adapter = :que
Expand Down