diff --git a/lib/resque-timestamps.rb b/lib/resque-timestamps.rb new file mode 100644 index 0000000..9e3a544 --- /dev/null +++ b/lib/resque-timestamps.rb @@ -0,0 +1 @@ +require 'resque/plugins/timestamps' diff --git a/lib/resque/plugins/job_identity.rb b/lib/resque/plugins/job_identity.rb new file mode 100644 index 0000000..eb3ea24 --- /dev/null +++ b/lib/resque/plugins/job_identity.rb @@ -0,0 +1,37 @@ +require 'digest/sha1' + +module Resque + module Plugin + def before_enqueue_hooks(job) + job.methods.grep(/^before_enqueue/).sort + end + end + + module Plugins + module JobIdentity + # Enqueue a job in Resque and return the associated job_id. + # The returned job_id can be used to refer to the job in the future. + # prepends enqueued job_id to args + def enqueue(*args) + job_id = job_identity(args) + + before_enqueue_hooks = Resque::Plugin.before_enqueue_hooks(self) + before_enqueue_hooks.each do |hook| + send(hook, job_id, *args) + end + + Resque.enqueue(self, job_id, *args) + + job_id + end + + # Override in your job to control the job id. It is passed the same + # arguments as `perform`, that is, your job's payload. + # NOTE: expects a single args array, not a list of args! + def job_identity(args) + Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join) + end + + end + end +end \ No newline at end of file diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index 1294a89..7bb4f47 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -1,5 +1,5 @@ -require 'digest/sha1' require 'resque' +require 'resque/plugins/job_identity' require 'resque/plugins/meta/version' require 'resque/plugins/meta/metadata' @@ -15,90 +15,44 @@ module Plugins # class MyJob # extend Resque::Plugins::Meta # - # def self.perform(meta_id, *args) + # def self.perform(job_id, *args) # heavy_lifting # end # end # - # meta0 = MyJob.enqueue('stuff') - # meta0.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' - # meta0.meta_id # => '03c9e1a045ad012dd20500264a19273c' - # meta0['foo'] = 'bar' # => 'bar' - # meta0.save + # job_id = MyJob.enqueue('stuff') # => '03c9e1a045ad012dd20500264a19273c' + # meta0 = MyJob.get_meta(job_id) + # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' + # meta0['foo'] = 'bar' # => 'bar' + # meta0.save # => meta0 # # # later # meta1 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') - # meta1.job_class # => MyJob - # meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' - # meta1['foo'] # => 'bar' + # meta1.job_class # => MyJob + # meta1['foo'] # => 'bar' module Meta + include JobIdentity - # Override in your job to control the metadata id. It is - # passed the same arguments as `perform`, that is, your job's - # payload. - def meta_id(*args) - Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join) + # Enqueues a job in Resque and return the associated metadata. + # The job_id in the returned object can be used to fetch the + # metadata again in the future. + def before_enqueue_create_metadata(job_id, *args) + Metadata.new(job_id, self).save end - # Override in your job to control the how many seconds a job's + # Override in your job to control the many seconds a job's # metadata will live after it finishes. Defaults to 24 hours. # Return nil or 0 to set them to never expire. def expire_meta_in 24 * 60 * 60 end - # Enqueues a job in Resque and return the association metadata. - # The meta_id in the returned object can be used to fetch the - # metadata again in the future. - def enqueue(*args) - meta = Metadata.new({'meta_id' => meta_id(args), 'job_class' => self.to_s}) - meta.save - Resque.enqueue(self, meta.meta_id, *args) - meta - end - - def store_meta(meta) - key = "meta:#{meta.meta_id}" - json = Resque.encode(meta.data) - Resque.redis.set(key, json) - Resque.redis.expireat("resque:#{key}", meta.expire_at) if meta.expire_at > 0 - meta - end - - # Retrieve the metadata for a given job. If you call this - # from a class that extends Meta, then the metadata will - # only be returned if the metadata for that id is for the - # same class. Explicitly, calling Meta.get_meta(some_id) - # will return the metadata for a job of any type. - def get_meta(meta_id) - key = "meta:#{meta_id}" - if json = Resque.redis.get(key) - data = Resque.decode(json) - if self == Meta || self.to_s == data['job_class'] - Metadata.new(data) - end - end + def get_meta(job_id) + Metadata.get(job_id, self) end module_function :get_meta public :get_meta - def before_perform_meta(meta_id, *args) - if meta = get_meta(meta_id) - meta.start! - end - end - - def after_perform_meta(meta_id, *args) - if meta = get_meta(meta_id) - meta.finish! - end - end - - def on_failure_meta(e, meta_id, *args) - if meta = get_meta(meta_id) - meta.fail! - end - end end end end diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 9a495a0..e6fb0d0 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -4,124 +4,85 @@ module Resque module Plugins module Meta class Metadata - attr_reader :job_class, :meta_id, :data, :enqueued_at, :expire_in + extend Resque::Helpers + + attr_reader :job_id, :job_class, :data, :expire_in + + # Retrieve the metadata for a given job. If you call this + # from a class that extends Meta, then the metadata will + # only be returned if the metadata for that id is for the + # same class. Explicitly calling Metadata.get(some_id) + # will return the metadata for a job of any type. + def self.get(job_id, job_class = nil) + if data = load(job_id, job_class) + Metadata.new(job_id, data["job_class"], data) + end + end - def initialize(data_hash) - data_hash['enqueued_at'] ||= to_time_format_str(Time.now) - @data = data_hash - @meta_id = data_hash['meta_id'].dup - @enqueued_at = from_time_format_str('enqueued_at') - @job_class = data_hash['job_class'] - if @job_class.is_a?(String) - @job_class = Resque.constantize(data_hash['job_class']) - else - data_hash['job_class'] = @job_class.to_s + def self.store(job_id, data, expire_at) + key = "meta:#{job_id}" + redis.set(key, encode(data)) + redis.expireat("resque:#{key}", expire_at) if expire_at > 0 + end + + def self.load(job_id, job_class = nil) + key = "meta:#{job_id}" + if json = redis.get(key) + data = decode(json) + if !job_class || Meta == job_class || job_class.to_s == data['job_class'] + data + end end + end + + def initialize(job_id, job_class, data = nil) + @job_id = job_id + @job_class = job_class.is_a?(String) ? self.class.constantize(job_class) : job_class @expire_in = @job_class.expire_meta_in || 0 + + @data = data || {} + @data["job_class"] = @job_class.to_s end # Reload the metadata from the store def reload! - if new_meta = job_class.get_meta(meta_id) - @data = new_meta.data + if data = self.class.load(job_id, job_class) + @data = data end self end - # Save the metadata + # Save the metadata. returns self def save - job_class.store_meta(self) + self.class.store(job_id, data, expire_at) self end def [](key) - data[key] + data[key.to_s] end def []=(key, val) data[key.to_s] = val end - def start! - @started_at = Time.now - self['started_at'] = to_time_format_str(@started_at) - save - end - - def started_at - @started_at ||= from_time_format_str('started_at') - end - - def finish! - data['succeeded'] = true unless data.has_key?('succeeded') - @finished_at = Time.now - self['finished_at'] = to_time_format_str(@finished_at) - save - end - - def finished_at - @finished_at ||= from_time_format_str('finished_at') - end - - def expire_at - if finished? && expire_in > 0 - finished_at.to_i + expire_in - else - 0 - end - end - - def enqueued? - !started? + def include?(key) + data.include?(key.to_s) end - def working? - started? && !finished? - end - - def started? - started_at ? true :false - end - - def finished? - finished_at ? true : false - end - - def fail! - self['succeeded'] = false - finish! - end - - def succeeded? - finished? ? self['succeeded'] : nil - end - - def failed? - finished? ? !self['succeeded'] : nil - end - - def seconds_enqueued - (started_at || Time.now).to_f - enqueued_at.to_f - end - - def seconds_processing - if started? - (finished_at || Time.now).to_f - started_at.to_f - else - 0 + # methods in modules can be easily overridden or extended later (with more modules) + module OverridableMethods + def expire_at + if @expire_in && 0 < @expire_in + Time.now.to_i + @expire_in + else + 0 + end end end + include OverridableMethods - protected - - def from_time_format_str(key) - (t = self[key]) && Time.parse(t) - end - - def to_time_format_str(time) - time.utc.iso8601(6) - end - end - end - end -end + end # class Metadata + end # module Meta + end # module Plugins +end # module Resque diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb new file mode 100644 index 0000000..5db8e0a --- /dev/null +++ b/lib/resque/plugins/timestamps.rb @@ -0,0 +1,163 @@ +require 'resque/plugins/meta' + +module Resque + module Plugins + # extend Resque::Plugins::Timestamps to get start/finish/fail timestamps + # and lifecycle query methods on Resque::Plugins::Meta::Metadata + # + # For example: + # + # require 'resque-meta' + # + # class MyJob + # extend Resque::Plugins::Timestamps + # + # def self.perform(job_id, *args) + # heavy_lifting + # end + # end + # + # job_id = MyJob.enqueue('stuff') + # meta0 = MyJob.get_meta(job_id) + # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' + # meta0.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' + # meta0.started_at # => nil + # + # # later + # meta1 = MyJob.get_meta(job_id) + # meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' + # meta1.started_at # => 'Wed May 19 13:42:51 -0600 2010' + # + # # later still + # meta2 = MyJob.get_meta(job_id) + # meta2.started_at # => 'Wed May 19 13:42:51 -0600 2010' + # meta2.finished_at # => 'Wed May 19 13:43:01 -0600 2010' + module Timestamps + include Meta + + def self.extended(base) + Meta::Metadata.send(:include, Timestamps::MetadataExtensions) + end + + def after_enqueue_timestamp(job_id, *args) + meta = get_meta(job_id) and meta.enqueued! + end + + def before_perform_timestamp(job_id, *args) + meta = get_meta(job_id) and meta.started! + end + + def after_perform_timestamp(job_id, *args) + meta = get_meta(job_id) and meta.finished! + end + + def on_failure_timestamp(e, job_id, *args) + meta = get_meta(job_id) and meta.failed! + end + + module MetadataExtensions + def enqueued? + !started? + end + + def working? + started? && !finished? + end + + def started? + !!started_at + end + + def finished? + !!finished_at + end + + def succeeded? + finished? ? self[:succeeded] : nil + end + + def failed? + finished? ? !self[:succeeded] : nil + end + + def enqueued! + self.enqueued_at = Time.now + save + end + + def started! + self.started_at = Time.now + save + end + + def finished! + self[:succeeded] = true unless include?(:succeeded) + self.finished_at = Time.now + save + end + + def failed! + self[:succeeded] = false + finished! + end + + def enqueued_at + @enqueued_at ||= get_timestamp(:enqueued_at) + end + + def started_at + @started_at ||= get_timestamp(:started_at) + end + + def finished_at + @finished_at ||= get_timestamp(:finished_at) + end + + def enqueued_at=(time) + set_timestamp(:enqueued_at, time) + end + + def started_at=(time) + set_timestamp(:started_at, time) + end + + def finished_at=(time) + set_timestamp(:finished_at, time) + end + + def seconds_enqueued + (started_at || Time.now).to_f - enqueued_at.to_f + end + + def seconds_processing + if started? + (finished_at || Time.now).to_f - started_at.to_f + else + 0 + end + end + + # override default implementation to base expiry on finish time + def expire_at + if finished? && @expire_in > 0 + finished_at.to_i + @expire_in + else + super + end + end + + private + + def get_timestamp(name) + time_string = self[name] and Time.parse(time_string) + end + + def set_timestamp(name, time) + self.instance_variable_set("@#{name}", time) + self[name] = time.utc.iso8601(6) + end + + end # module MetadataExtensions + end # module Timestamps + end # module Plugins +end # module Resque diff --git a/test/meta_test.rb b/test/meta_test.rb index df9d73e..5e1803b 100644 --- a/test/meta_test.rb +++ b/test/meta_test.rb @@ -1,4 +1,4 @@ -require File.dirname(__FILE__) + '/test_helper' +require File.expand_path('../test_helper', __FILE__) require 'resque-meta' class MetaJob @@ -9,8 +9,8 @@ def self.expire_meta_in 1 end - def self.perform(meta_id, key, val) - meta = get_meta(meta_id) + def self.perform(job_id, key, val) + meta = get_meta(job_id) meta[key] = val meta.save end @@ -20,7 +20,7 @@ class AnotherJob extend Resque::Plugins::Meta @queue = :test - def self.perform(meta_id) + def self.perform(job_id) end end @@ -32,20 +32,11 @@ def self.expire_meta_in 1 end - def self.perform(meta_id, key, val) - meta = get_meta(meta_id) + def self.perform(job_id, key, val) + meta = get_meta(job_id) meta[key] = val meta.save - sleep 1 - end -end - -class FailingJob - extend Resque::Plugins::Meta - @queue = :test - - def self.perform(*args) - raise 'boom' + sleep 0.5 end end @@ -72,98 +63,69 @@ def test_resque_version def test_enqueued_metadata now = Time.now - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') + assert_not_nil job_id + meta = MetaJob.get_meta(job_id) assert_not_nil meta - assert_not_nil meta.meta_id - assert meta.enqueued_at.to_f > now.to_f, "#{meta.enqueued_at} should be after #{now}" - assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" - assert meta.enqueued? - assert !meta.started? - assert_equal 0, meta.seconds_processing - assert !meta.finished? + assert_not_nil meta.job_id assert_nil meta['foo'] assert_equal Resque::Plugins::Meta::Metadata, meta.class assert_equal MetaJob, meta.job_class end def test_processed_job - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') + meta = MetaJob.get_meta(job_id) assert_nil meta['foo'] worker = Resque::Worker.new(:test) worker.work(0) - meta = MetaJob.get_meta(meta.meta_id) + meta = MetaJob.get_meta(job_id) assert_equal MetaJob, meta.job_class - assert meta.started? - assert meta.finished?, 'Job should be finished' - assert meta.succeeded?, 'Job should have succeeded' - assert !meta.enqueued? - assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" - assert meta.seconds_processing > 0.0, "seconds_processing should be greater than zero" assert_equal 'bar', meta['foo'], "'foo' not found in #{meta.inspect}" end def test_wrong_id_for_class - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') - assert_nil AnotherJob.get_meta(meta.meta_id) - assert_not_nil Resque::Plugins::Meta.get_meta(meta.meta_id) + assert_nil AnotherJob.get_meta(job_id) + assert_not_nil Resque::Plugins::Meta.get_meta(job_id) + assert_not_nil Resque::Plugins::Meta::Metadata.get(job_id) end def test_expired_metadata - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') worker = Resque::Worker.new(:test) worker.work(0) sleep 2 - meta = MetaJob.get_meta(meta.meta_id) - assert_nil meta + assert_nil MetaJob.get_meta(job_id) end def test_slow_job - meta = SlowJob.enqueue('foo', 'bar') + job_id = SlowJob.enqueue('foo', 'bar') worker = Resque::Worker.new(:test) thread = Thread.new { worker.work(0) } sleep 0.1 - meta = SlowJob.get_meta(meta.meta_id) - assert !meta.enqueued? - assert meta.started? - assert meta.working? - assert !meta.finished? + meta = SlowJob.get_meta(job_id) + assert_not_nil meta thread.join # job should be done meta.reload! - assert !meta.enqueued? - assert meta.started? - assert !meta.working? - assert meta.finished? - assert meta.succeeded? - assert !meta.failed? - sleep 2 - assert_nil Resque::Plugins::Meta.get_meta(meta.meta_id) - end - - def test_failing_job - meta = FailingJob.enqueue() - assert_nil meta.failed? - worker = Resque::Worker.new(:test) - worker.work(0) - - meta.reload! - assert meta.finished? - assert meta.failed? - assert !meta.succeeded? + sleep 2 # metadata should be expired + assert_nil Resque::Plugins::Meta.get_meta(job_id) end def test_saving_additional_metadata - meta = MetaJob.enqueue('stuff') + job_id = MetaJob.enqueue('stuff') + meta = MetaJob.get_meta(job_id) meta['foo'] = 'bar' meta.save # later - meta = MetaJob.get_meta(meta.meta_id) + meta = MetaJob.get_meta(job_id) assert_equal 'bar', meta['foo'] end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 061f1a3..3735f58 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,5 +1,5 @@ -dir = File.dirname(File.expand_path(__FILE__)) -$LOAD_PATH.unshift dir + '/../lib' +test_dir = File.dirname(__FILE__) +$LOAD_PATH << File.expand_path('../lib', test_dir) require 'test/unit' require 'rubygems' require 'resque' @@ -27,11 +27,11 @@ pid = `ps -A -o pid,command | grep [r]edis-test`.split(" ")[0] puts "Killing test redis server[#{pid}]..." - `rm -f #{dir}/dump.rdb` + `rm -f #{test_dir}/dump.rdb` Process.kill("KILL", pid.to_i) exit exit_code end puts "Starting redis for testing at localhost:9736..." -`redis-server #{dir}/redis-test.conf` +`redis-server #{test_dir}/redis-test.conf` Resque.redis = 'localhost:9736' diff --git a/test/timestamps_test.rb b/test/timestamps_test.rb new file mode 100644 index 0000000..64f015e --- /dev/null +++ b/test/timestamps_test.rb @@ -0,0 +1,130 @@ +require File.expand_path('../test_helper', __FILE__) +require 'resque-timestamps' + +class TimestampsTest < Test::Unit::TestCase + class TimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.perform(job_id, key, val) + meta = get_meta(job_id) + meta[key] = val + meta.save + end + end + + class AnotherTimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.perform(job_id) + end + end + + class SlowTimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.expire_meta_in + 1 + end + + def self.perform(job_id, key, val) + meta = get_meta(job_id) + meta[key] = val + meta.save + sleep 0.5 + end + end + + class FailingTimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.perform(*args) + raise 'boom' + end + end + + + def setup + Resque.redis.flushall + end + + def test_lint + assert_nothing_raised do + Resque::Plugin.lint(Resque::Plugins::Timestamps) + end + end + + def test_enqueued_metadata + now = Time.now + job_id = TimedJob.enqueue('foo', 'bar') + meta = TimedJob.get_meta(job_id) + assert meta.enqueued_at.to_f > now.to_f, "#{meta.enqueued_at} should be after #{now}" + assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" + assert meta.enqueued? + assert !meta.started? + assert_equal 0, meta.seconds_processing + assert !meta.finished? + assert_equal Resque::Plugins::Meta::Metadata, meta.class + assert_equal TimedJob, meta.job_class + end + + def test_processed_job + job_id = TimedJob.enqueue('foo', 'bar') + meta = TimedJob.get_meta(job_id) + assert_nil meta['foo'] + worker = Resque::Worker.new(:timed) + worker.work(0) + + meta = TimedJob.get_meta(job_id) + assert_equal TimedJob, meta.job_class + assert meta.started?, 'Job should have started' + assert meta.finished?, 'Job should be finished' + assert meta.succeeded?, 'Job should have succeeded' + assert !meta.enqueued?, 'Job should have been removed from the queue' + assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" + assert meta.seconds_processing > 0.0, "seconds_processing should be greater than zero" + end + + def test_slow_job + job_id = SlowTimedJob.enqueue('foo', 'bar') + worker = Resque::Worker.new(:timed) + thread = Thread.new { worker.work(0) } + + sleep 0.2 + meta = SlowTimedJob.get_meta(job_id) + assert !meta.enqueued? + assert meta.started? + assert meta.working? + assert !meta.finished? + + thread.join # job should be done + meta.reload! + assert !meta.enqueued? + assert meta.started? + assert !meta.working? + assert meta.finished? + assert meta.succeeded? + assert !meta.failed? + + sleep 2 + assert_nil Resque::Plugins::Meta.get_meta(job_id) + end + + def test_failing_job + job_id = FailingTimedJob.enqueue() + meta = FailingTimedJob.get_meta(job_id) + assert_nil meta.failed? + worker = Resque::Worker.new(:timed) + worker.work(0) + + sleep 0.5 + meta.reload! + assert meta.finished? + assert meta.failed? + assert !meta.succeeded? + end + +end