From f19bc178b111d5cfcd3f63dd74104e487a409820 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Fri, 18 Mar 2011 12:24:44 -0700 Subject: [PATCH 01/31] Move some responsibilities from Meta to Meta::Metadata. --- lib/resque/plugins/meta.rb | 23 ++---------------- lib/resque/plugins/meta/metadata.rb | 36 ++++++++++++++++++++++++----- test/meta_test.rb | 1 + 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index 1294a89..c9a87df 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -51,33 +51,14 @@ def expire_meta_in # The meta_id in the returned object can be used to fetch the # metadata again in the future. def enqueue(*args) - meta = Metadata.new({'meta_id' => meta_id(args), 'job_class' => self.to_s}) + meta = Metadata.new('meta_id' => meta_id(args), 'job_class' => self.to_s) meta.save Resque.enqueue(self, meta.meta_id, *args) meta end - def store_meta(meta) - key = "meta:#{meta.meta_id}" - json = Resque.encode(meta.data) - Resque.redis.set(key, json) - Resque.redis.expireat("resque:#{key}", meta.expire_at) if meta.expire_at > 0 - meta - end - - # Retrieve the metadata for a given job. If you call this - # from a class that extends Meta, then the metadata will - # only be returned if the metadata for that id is for the - # same class. Explicitly, calling Meta.get_meta(some_id) - # will return the metadata for a job of any type. def get_meta(meta_id) - key = "meta:#{meta_id}" - if json = Resque.redis.get(key) - data = Resque.decode(json) - if self == Meta || self.to_s == data['job_class'] - Metadata.new(data) - end - end + Metadata.get(meta_id, self) end module_function :get_meta public :get_meta diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 9a495a0..50e19df 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -6,6 +6,29 @@ module Meta class Metadata attr_reader :job_class, :meta_id, :data, :enqueued_at, :expire_in + def self.store(meta) + key = "meta:#{meta.meta_id}" + json = Resque.encode(meta.data) + Resque.redis.set(key, json) + Resque.redis.expireat("resque:#{key}", meta.expire_at) if meta.expire_at > 0 + meta + end + + # Retrieve the metadata for a given job. If you call this + # from a class that extends Meta, then the metadata will + # only be returned if the metadata for that id is for the + # same class. Explicitly, calling Metadata.get(some_id) + # will return the metadata for a job of any type. + def self.get(meta_id, job_class = nil) + key = "meta:#{meta_id}" + if json = Resque.redis.get(key) + data = Resque.decode(json) + if !job_class || Meta == job_class || job_class.to_s == data['job_class'] + Metadata.new(data) + end + end + end + def initialize(data_hash) data_hash['enqueued_at'] ||= to_time_format_str(Time.now) @data = data_hash @@ -22,15 +45,15 @@ def initialize(data_hash) # Reload the metadata from the store def reload! - if new_meta = job_class.get_meta(meta_id) + if new_meta = self.class.get(meta_id, job_class.to_s) @data = new_meta.data end self end - # Save the metadata + # Save the metadata. returns self def save - job_class.store_meta(self) + self.class.store(self) self end @@ -80,11 +103,11 @@ def working? end def started? - started_at ? true :false + !!started_at end def finished? - finished_at ? true : false + !!finished_at end def fail! @@ -112,7 +135,7 @@ def seconds_processing end end - protected + protected def from_time_format_str(key) (t = self[key]) && Time.parse(t) @@ -121,6 +144,7 @@ def from_time_format_str(key) def to_time_format_str(time) time.utc.iso8601(6) end + end end end diff --git a/test/meta_test.rb b/test/meta_test.rb index df9d73e..f4b2a0b 100644 --- a/test/meta_test.rb +++ b/test/meta_test.rb @@ -108,6 +108,7 @@ def test_wrong_id_for_class assert_nil AnotherJob.get_meta(meta.meta_id) assert_not_nil Resque::Plugins::Meta.get_meta(meta.meta_id) + assert_not_nil Resque::Plugins::Meta::Metadata.get(meta.meta_id) end def test_expired_metadata From 5680616fa55f0bf0e050d696514083f89a1eb8e9 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Fri, 18 Mar 2011 12:35:45 -0700 Subject: [PATCH 02/31] Move timestamp responsibility out of Meta into new Timestamps plugin. Currently extending Meta with the new Timestamps plugin, so no API change for now. Down the road, this implies an additional line ('extend Resque::Plugins::Timestamps') for users who want Timestamps, but it makes them optional, which is nice since they add 3 redis calls per job execution. --- lib/resque/plugins/meta.rb | 19 ++----------------- lib/resque/plugins/timestamps.rb | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 17 deletions(-) create mode 100644 lib/resque/plugins/timestamps.rb diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index c9a87df..ce00acc 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -32,6 +32,8 @@ module Plugins # meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' # meta1['foo'] # => 'bar' module Meta + # drop this extend and allow users to opt-in for timestamps + extend Timestamps # Override in your job to control the metadata id. It is # passed the same arguments as `perform`, that is, your job's @@ -63,23 +65,6 @@ def get_meta(meta_id) module_function :get_meta public :get_meta - def before_perform_meta(meta_id, *args) - if meta = get_meta(meta_id) - meta.start! - end - end - - def after_perform_meta(meta_id, *args) - if meta = get_meta(meta_id) - meta.finish! - end - end - - def on_failure_meta(e, meta_id, *args) - if meta = get_meta(meta_id) - meta.fail! - end - end end end end diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb new file mode 100644 index 0000000..daa30a8 --- /dev/null +++ b/lib/resque/plugins/timestamps.rb @@ -0,0 +1,27 @@ +# After extending Resque::Plugins::Meta, +# extend Resque::Plugins::Timestamps to get start/finish/fail timestamps +module Resque + module Plugins + module Timestamps + + def before_perform_meta(meta_id, *args) + if meta = Metadata.get(meta_id, self) + meta.start! + end + end + + def after_perform_meta(meta_id, *args) + if meta = Metadata.get(meta_id, self) + meta.finish! + end + end + + def on_failure_meta(e, meta_id, *args) + if meta = Metadata.get(meta_id, self) + meta.fail! + end + end + + end + end +end \ No newline at end of file From 79cf5b1338103381e648d2cde872038cbc86eac5 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Sat, 19 Mar 2011 02:36:33 -0700 Subject: [PATCH 03/31] Fix a bug I introduced. --- lib/resque/plugins/meta/metadata.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 50e19df..275cec5 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -45,7 +45,7 @@ def initialize(data_hash) # Reload the metadata from the store def reload! - if new_meta = self.class.get(meta_id, job_class.to_s) + if new_meta = self.class.get(meta_id, job_class) @data = new_meta.data end self From f63c7d12b6a330802f6671c0647e02a8a35cee0f Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Sat, 19 Mar 2011 03:33:28 -0700 Subject: [PATCH 04/31] Oops, not going far without that require. --- lib/resque/plugins/meta.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index ce00acc..9cd690e 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -2,6 +2,7 @@ require 'resque' require 'resque/plugins/meta/version' require 'resque/plugins/meta/metadata' +require 'resque/plugins/timestamps' module Resque module Plugins From c310752077aa312655e7afe5dbd639bf1ac26b5f Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Sat, 19 Mar 2011 03:34:19 -0700 Subject: [PATCH 05/31] Update Metadata.store interface. --- lib/resque/plugins/meta/metadata.rb | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 275cec5..3436280 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -6,12 +6,10 @@ module Meta class Metadata attr_reader :job_class, :meta_id, :data, :enqueued_at, :expire_in - def self.store(meta) - key = "meta:#{meta.meta_id}" - json = Resque.encode(meta.data) - Resque.redis.set(key, json) - Resque.redis.expireat("resque:#{key}", meta.expire_at) if meta.expire_at > 0 - meta + def self.store(meta_id, data, expire_at) + key = "meta:#{meta_id}" + Resque.redis.set(key, Resque.encode(data)) + Resque.redis.expireat("resque:#{key}", expire_at) if expire_at > 0 end # Retrieve the metadata for a given job. If you call this @@ -53,7 +51,7 @@ def reload! # Save the metadata. returns self def save - self.class.store(self) + self.class.store(meta_id, data, expire_at) self end From 08388be2a0a7e26b10220a8d394c2a17af2efd8b Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Sat, 19 Mar 2011 03:35:53 -0700 Subject: [PATCH 06/31] Strip Metadata down to the minimum, put more responsibility on Timestamps. --- lib/resque/plugins/meta/metadata.rb | 71 ++------------------------ lib/resque/plugins/timestamps.rb | 78 +++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 67 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 3436280..412627b 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -63,73 +63,10 @@ def []=(key, val) data[key.to_s] = val end - def start! - @started_at = Time.now - self['started_at'] = to_time_format_str(@started_at) - save - end - - def started_at - @started_at ||= from_time_format_str('started_at') - end - - def finish! - data['succeeded'] = true unless data.has_key?('succeeded') - @finished_at = Time.now - self['finished_at'] = to_time_format_str(@finished_at) - save - end - - def finished_at - @finished_at ||= from_time_format_str('finished_at') - end - - def expire_at - if finished? && expire_in > 0 - finished_at.to_i + expire_in - else - 0 - end - end - - def enqueued? - !started? - end - - def working? - started? && !finished? - end - - def started? - !!started_at - end - - def finished? - !!finished_at - end - - def fail! - self['succeeded'] = false - finish! - end - - def succeeded? - finished? ? self['succeeded'] : nil - end - - def failed? - finished? ? !self['succeeded'] : nil - end - - def seconds_enqueued - (started_at || Time.now).to_f - enqueued_at.to_f - end - - def seconds_processing - if started? - (finished_at || Time.now).to_f - started_at.to_f - else - 0 + # methods in modules can be easily overridden or extended later (with more modules) + include module InstanceMethods + def expire_at + Time.now.to_i + expire_in end end diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index daa30a8..37c3132 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -1,5 +1,6 @@ # After extending Resque::Plugins::Meta, # extend Resque::Plugins::Timestamps to get start/finish/fail timestamps +# and lifecycle query methods on Resque::Plugins::Meta::Metadata module Resque module Plugins module Timestamps @@ -22,6 +23,83 @@ def on_failure_meta(e, meta_id, *args) end end + module Metadata + module InstanceMethods + def start! + @started_at = Time.now + self['started_at'] = to_time_format_str(@started_at) + save + end + + def started_at + @started_at ||= from_time_format_str('started_at') + end + + def finish! + data['succeeded'] = true unless data.has_key?('succeeded') + @finished_at = Time.now + self['finished_at'] = to_time_format_str(@finished_at) + save + end + + def finished_at + @finished_at ||= from_time_format_str('finished_at') + end + + def expire_at + if finished? && expire_in > 0 + finished_at.to_i + expire_in + else + 0 + end + end + + def enqueued? + !started? + end + + def working? + started? && !finished? + end + + def started? + !!started_at + end + + def finished? + !!finished_at + end + + def fail! + self['succeeded'] = false + finish! + end + + def succeeded? + finished? ? self['succeeded'] : nil + end + + def failed? + finished? ? !self['succeeded'] : nil + end + + def seconds_enqueued + (started_at || Time.now).to_f - enqueued_at.to_f + end + + def seconds_processing + if started? + (finished_at || Time.now).to_f - started_at.to_f + else + 0 + end + end + + end + end + end + + Meta::Metadata.send(:include, Timestamps::Metadata::InstanceMethods) end end \ No newline at end of file From 975e42b488a597b23d0d1a4639c7f370f359c30d Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Sat, 19 Mar 2011 03:37:00 -0700 Subject: [PATCH 07/31] Stylistic quibbling (variable rename). --- lib/resque/plugins/meta/metadata.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 412627b..8ba2066 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -15,7 +15,7 @@ def self.store(meta_id, data, expire_at) # Retrieve the metadata for a given job. If you call this # from a class that extends Meta, then the metadata will # only be returned if the metadata for that id is for the - # same class. Explicitly, calling Metadata.get(some_id) + # same class. Explicitly calling Metadata.get(some_id) # will return the metadata for a job of any type. def self.get(meta_id, job_class = nil) key = "meta:#{meta_id}" @@ -43,8 +43,8 @@ def initialize(data_hash) # Reload the metadata from the store def reload! - if new_meta = self.class.get(meta_id, job_class) - @data = new_meta.data + if loaded = self.class.get(meta_id, job_class) + @data = loaded.data end self end From c0e506f3ec380da332d6b1d710091994431e911f Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:09:46 -0700 Subject: [PATCH 08/31] Introduce JobIdentity plugin. Separated out in case someone wants to have uniquely identifiable jobs, but doesn't want metadata, or in case this gets merged upstream. --- lib/resque/plugins/job_identity.rb | 23 +++++++++++++ lib/resque/plugins/meta/metadata.rb | 51 +++++++++++++++-------------- 2 files changed, 50 insertions(+), 24 deletions(-) create mode 100644 lib/resque/plugins/job_identity.rb diff --git a/lib/resque/plugins/job_identity.rb b/lib/resque/plugins/job_identity.rb new file mode 100644 index 0000000..061b477 --- /dev/null +++ b/lib/resque/plugins/job_identity.rb @@ -0,0 +1,23 @@ +require 'digest/sha1' + +module Resque + module Plugins + module JobIdentity + # Enqueue a job in Resque and return the associated job_id. + # The returned job_id can be used to refer to the job in the future. + def enqueue(*args) + job_id = job_id(args) + yield(job_id) if block_given? + Resque.enqueue(self, job_id, *args) + job_id + end + + # Override in your job to control the job id. It is passed the same + # arguments as `perform`, that is, your job's payload. + def job_id(*args) + Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join) + end + + end + end +end \ No newline at end of file diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 8ba2066..8f94a09 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -4,12 +4,14 @@ module Resque module Plugins module Meta class Metadata - attr_reader :job_class, :meta_id, :data, :enqueued_at, :expire_in + extend Resque::Helpers - def self.store(meta_id, data, expire_at) - key = "meta:#{meta_id}" - Resque.redis.set(key, Resque.encode(data)) - Resque.redis.expireat("resque:#{key}", expire_at) if expire_at > 0 + attr_reader :job_id, :job_class, :data, :expire_in + + def self.store(job_id, data, expire_at) + key = "meta:#{job_id}" + redis.set(key, encode(data)) + redis.expireat("resque:#{key}", expire_at) if expire_at > 0 end # Retrieve the metadata for a given job. If you call this @@ -17,41 +19,42 @@ def self.store(meta_id, data, expire_at) # only be returned if the metadata for that id is for the # same class. Explicitly calling Metadata.get(some_id) # will return the metadata for a job of any type. - def self.get(meta_id, job_class = nil) - key = "meta:#{meta_id}" - if json = Resque.redis.get(key) - data = Resque.decode(json) + def self.get(job_id, job_class = nil) + if data = load(job_id, job_class) + Metadata.new(job_id, data["job_class"], data) + end + end + + def self.load(job_id, job_class = nil) + key = "meta:#{job_id}" + if json = redis.get(key) + data = decode(json) if !job_class || Meta == job_class || job_class.to_s == data['job_class'] - Metadata.new(data) + data end end end - def initialize(data_hash) - data_hash['enqueued_at'] ||= to_time_format_str(Time.now) - @data = data_hash - @meta_id = data_hash['meta_id'].dup - @enqueued_at = from_time_format_str('enqueued_at') - @job_class = data_hash['job_class'] - if @job_class.is_a?(String) - @job_class = Resque.constantize(data_hash['job_class']) - else - data_hash['job_class'] = @job_class.to_s - end + def initialize(job_id, job_class, data = nil) + @job_id = job_id + @job_class = job_class.is_a?(String) ? self.class.constantize(job_class) : job_class @expire_in = @job_class.expire_meta_in || 0 + + @data = data || {} + @data["job_class"] = @job_class.to_s end # Reload the metadata from the store def reload! - if loaded = self.class.get(meta_id, job_class) - @data = loaded.data + if data = self.class.load(job_id, job_class) + @data = data end self end # Save the metadata. returns self def save - self.class.store(meta_id, data, expire_at) + self.class.store(job_id, data, expire_at) self end From 64851d0e8b6c260202e462b710e9c280a503cf6f Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:12:45 -0700 Subject: [PATCH 09/31] Update Meta plugin to use JobIdentity. --- lib/resque/plugins/meta.rb | 41 ++++++++++++++++---------------------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index 9cd690e..040a5d9 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -1,8 +1,7 @@ -require 'digest/sha1' require 'resque' +require 'resque/plugins/job_identity' require 'resque/plugins/meta/version' require 'resque/plugins/meta/metadata' -require 'resque/plugins/timestamps' module Resque module Plugins @@ -16,50 +15,44 @@ module Plugins # class MyJob # extend Resque::Plugins::Meta # - # def self.perform(meta_id, *args) + # def self.perform(job_id, *args) # heavy_lifting # end # end # # meta0 = MyJob.enqueue('stuff') - # meta0.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' - # meta0.meta_id # => '03c9e1a045ad012dd20500264a19273c' + # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' # meta0['foo'] = 'bar' # => 'bar' # meta0.save # # # later # meta1 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') # meta1.job_class # => MyJob - # meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' # meta1['foo'] # => 'bar' module Meta - # drop this extend and allow users to opt-in for timestamps - extend Timestamps + include JobIdentity - # Override in your job to control the metadata id. It is - # passed the same arguments as `perform`, that is, your job's - # payload. - def meta_id(*args) - Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join) + # Enqueues a job in Resque and return the association metadata. + # The job_id in the returned object can be used to fetch the + # metadata again in the future. + def enqueue(*args) + metadata = nil + # need to save the metadata before queueing the job, hence the block + super do |job_id| + metadata = Metadata.new(job_id, self) + metadata.save + end + # metadata is updated by the after_enqueue hook + metadata.reload! end - # Override in your job to control the how many seconds a job's + # Override in your job to control the many seconds a job's # metadata will live after it finishes. Defaults to 24 hours. # Return nil or 0 to set them to never expire. def expire_meta_in 24 * 60 * 60 end - # Enqueues a job in Resque and return the association metadata. - # The meta_id in the returned object can be used to fetch the - # metadata again in the future. - def enqueue(*args) - meta = Metadata.new('meta_id' => meta_id(args), 'job_class' => self.to_s) - meta.save - Resque.enqueue(self, meta.meta_id, *args) - meta - end - def get_meta(meta_id) Metadata.get(meta_id, self) end From 3044f149fdb308f15e3a0acbd35027ddd2ea5caf Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:14:13 -0700 Subject: [PATCH 10/31] Fix bug I introduced with expiry on non-Timestamp metadata jobs. Also rename Meta::Metadata::InstanceMethods -> OverridableMethods to clarify intent. --- lib/resque/plugins/meta/metadata.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 8f94a09..920ee77 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -67,11 +67,16 @@ def []=(key, val) end # methods in modules can be easily overridden or extended later (with more modules) - include module InstanceMethods + module OverridableMethods def expire_at - Time.now.to_i + expire_in + if @expire_in && 0 < @expire_in + Time.now.to_i + @expire_in + else + 0 + end end end + include OverridableMethods protected From 76747a303fda2603a814654d0a37471fd84d0136 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:14:55 -0700 Subject: [PATCH 11/31] Clean up and update Timestamps plugin. --- lib/resque/plugins/timestamps.rb | 184 +++++++++++++++++++------------ 1 file changed, 111 insertions(+), 73 deletions(-) diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 37c3132..6fbc691 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -1,105 +1,143 @@ -# After extending Resque::Plugins::Meta, -# extend Resque::Plugins::Timestamps to get start/finish/fail timestamps -# and lifecycle query methods on Resque::Plugins::Meta::Metadata +require 'resque/plugins/meta' + module Resque module Plugins + # extend Resque::Plugins::Timestamps to get start/finish/fail timestamps + # and lifecycle query methods on Resque::Plugins::Meta::Metadata + # + # For example: + # + # require 'resque-meta' + # + # class MyJob + # extend Resque::Plugins::Timestamps + # + # def self.perform(job_id, *args) + # heavy_lifting + # end + # end + # + # meta0 = MyJob.enqueue('stuff') + # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' + # meta0.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' + # meta0.started_at # => nil + # + # # later + # meta1 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') + # meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' + # meta1.started_at # => 'Wed May 19 13:42:51 -0600 2010' + # + # # later still + # meta2 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') + # meta2.started_at # => 'Wed May 19 13:42:51 -0600 2010' + # meta2.finished_at # => 'Wed May 19 13:43:01 -0600 2010' module Timestamps + include Meta - def before_perform_meta(meta_id, *args) - if meta = Metadata.get(meta_id, self) - meta.start! - end + def self.extended(base) + Resque::Plugins::Meta::Metadata.send(:include, Resque::Plugins::Timestamps::Metadata) end - def after_perform_meta(meta_id, *args) - if meta = Metadata.get(meta_id, self) - meta.finish! - end + def after_enqueue_timestamp(job_id, *args) + meta = get_meta(job_id) and meta.enqueued! end - def on_failure_meta(e, meta_id, *args) - if meta = Metadata.get(meta_id, self) - meta.fail! - end + def before_perform_timestamp(job_id, *args) + meta = get_meta(job_id) and meta.started! + end + + def after_perform_timestamp(job_id, *args) + meta = get_meta(job_id) and meta.finished! + end + + def on_failure_timestamp(e, job_id, *args) + meta = get_meta(job_id) and meta.failed! end module Metadata - module InstanceMethods - def start! - @started_at = Time.now - self['started_at'] = to_time_format_str(@started_at) - save - end + attr_accessor :enqueued_at - def started_at - @started_at ||= from_time_format_str('started_at') - end + def enqueued! + @enqueued_at = Time.now + self['enqueued_at'] = to_time_format_str(@enqueued_at) + save + end - def finish! - data['succeeded'] = true unless data.has_key?('succeeded') - @finished_at = Time.now - self['finished_at'] = to_time_format_str(@finished_at) - save - end + def enqueued_at + @enqueued_at ||= from_time_format_str('enqueued_at') + end - def finished_at - @finished_at ||= from_time_format_str('finished_at') - end + def started! + @started_at = Time.now + self['started_at'] = to_time_format_str(@started_at) + save + end - def expire_at - if finished? && expire_in > 0 - finished_at.to_i + expire_in - else - 0 - end - end + def started_at + @started_at ||= from_time_format_str('started_at') + end - def enqueued? - !started? - end + def finished! + data['succeeded'] = true unless data.has_key?('succeeded') + @finished_at = Time.now + self['finished_at'] = to_time_format_str(@finished_at) + save + end - def working? - started? && !finished? - end + def finished_at + @finished_at ||= from_time_format_str('finished_at') + end - def started? - !!started_at - end + def failed! + self['succeeded'] = false + finished! + end - def finished? - !!finished_at - end + def enqueued? + !started? + end - def fail! - self['succeeded'] = false - finish! - end + def working? + started? && !finished? + end - def succeeded? - finished? ? self['succeeded'] : nil - end + def started? + !!started_at + end - def failed? - finished? ? !self['succeeded'] : nil - end + def finished? + !!finished_at + end - def seconds_enqueued - (started_at || Time.now).to_f - enqueued_at.to_f - end + def succeeded? + finished? ? self['succeeded'] : nil + end + + def failed? + finished? ? !self['succeeded'] : nil + end - def seconds_processing - if started? - (finished_at || Time.now).to_f - started_at.to_f - else - 0 - end + def expire_at + if finished? && @expire_in > 0 + finished_at.to_i + @expire_in + else + super end + end + def seconds_enqueued + (started_at || Time.now).to_f - enqueued_at.to_f + end + + def seconds_processing + if started? + (finished_at || Time.now).to_f - started_at.to_f + else + 0 + end end end end - - Meta::Metadata.send(:include, Timestamps::Metadata::InstanceMethods) end end \ No newline at end of file From 4d93e10084c4183ace4568c40f749dc49c22f481 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:15:15 -0700 Subject: [PATCH 12/31] Shuffle module_function declaration in Resque::Plugins::Meta. --- lib/resque/plugins/meta.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index 040a5d9..bf8a772 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -46,17 +46,19 @@ def enqueue(*args) metadata.reload! end + module_function + # Override in your job to control the many seconds a job's # metadata will live after it finishes. Defaults to 24 hours. # Return nil or 0 to set them to never expire. def expire_meta_in 24 * 60 * 60 end + public :expire_meta_in - def get_meta(meta_id) - Metadata.get(meta_id, self) + def get_meta(job_id) + Metadata.get(job_id, self) end - module_function :get_meta public :get_meta end From 657a52e11a82fb3d9363240703387a2d9aa9a633 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:15:38 -0700 Subject: [PATCH 13/31] Slight cleanup in test_helper. --- test/test_helper.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/test_helper.rb b/test/test_helper.rb index 061f1a3..3735f58 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,5 +1,5 @@ -dir = File.dirname(File.expand_path(__FILE__)) -$LOAD_PATH.unshift dir + '/../lib' +test_dir = File.dirname(__FILE__) +$LOAD_PATH << File.expand_path('../lib', test_dir) require 'test/unit' require 'rubygems' require 'resque' @@ -27,11 +27,11 @@ pid = `ps -A -o pid,command | grep [r]edis-test`.split(" ")[0] puts "Killing test redis server[#{pid}]..." - `rm -f #{dir}/dump.rdb` + `rm -f #{test_dir}/dump.rdb` Process.kill("KILL", pid.to_i) exit exit_code end puts "Starting redis for testing at localhost:9736..." -`redis-server #{dir}/redis-test.conf` +`redis-server #{test_dir}/redis-test.conf` Resque.redis = 'localhost:9736' From 514bcd2a4d4e1528fe8d827c777c0f2778ada718 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:15:51 -0700 Subject: [PATCH 14/31] Update tests to work with recent changes. --- test/meta_test.rb | 77 +++++----------------- test/timestamps_test.rb | 139 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 60 deletions(-) create mode 100644 test/timestamps_test.rb diff --git a/test/meta_test.rb b/test/meta_test.rb index f4b2a0b..8b75123 100644 --- a/test/meta_test.rb +++ b/test/meta_test.rb @@ -1,4 +1,4 @@ -require File.dirname(__FILE__) + '/test_helper' +require File.expand_path('../test_helper', __FILE__) require 'resque-meta' class MetaJob @@ -9,8 +9,8 @@ def self.expire_meta_in 1 end - def self.perform(meta_id, key, val) - meta = get_meta(meta_id) + def self.perform(job_id, key, val) + meta = get_meta(job_id) meta[key] = val meta.save end @@ -20,7 +20,7 @@ class AnotherJob extend Resque::Plugins::Meta @queue = :test - def self.perform(meta_id) + def self.perform(job_id) end end @@ -32,23 +32,14 @@ def self.expire_meta_in 1 end - def self.perform(meta_id, key, val) - meta = get_meta(meta_id) + def self.perform(job_id, key, val) + meta = get_meta(job_id) meta[key] = val meta.save sleep 1 end end -class FailingJob - extend Resque::Plugins::Meta - @queue = :test - - def self.perform(*args) - raise 'boom' - end -end - class MetaTest < Test::Unit::TestCase def setup Resque.redis.flushall @@ -74,13 +65,7 @@ def test_enqueued_metadata now = Time.now meta = MetaJob.enqueue('foo', 'bar') assert_not_nil meta - assert_not_nil meta.meta_id - assert meta.enqueued_at.to_f > now.to_f, "#{meta.enqueued_at} should be after #{now}" - assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" - assert meta.enqueued? - assert !meta.started? - assert_equal 0, meta.seconds_processing - assert !meta.finished? + assert_not_nil meta.job_id assert_nil meta['foo'] assert_equal Resque::Plugins::Meta::Metadata, meta.class assert_equal MetaJob, meta.job_class @@ -92,23 +77,17 @@ def test_processed_job worker = Resque::Worker.new(:test) worker.work(0) - meta = MetaJob.get_meta(meta.meta_id) + meta = MetaJob.get_meta(meta.job_id) assert_equal MetaJob, meta.job_class - assert meta.started? - assert meta.finished?, 'Job should be finished' - assert meta.succeeded?, 'Job should have succeeded' - assert !meta.enqueued? - assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" - assert meta.seconds_processing > 0.0, "seconds_processing should be greater than zero" assert_equal 'bar', meta['foo'], "'foo' not found in #{meta.inspect}" end def test_wrong_id_for_class meta = MetaJob.enqueue('foo', 'bar') - assert_nil AnotherJob.get_meta(meta.meta_id) - assert_not_nil Resque::Plugins::Meta.get_meta(meta.meta_id) - assert_not_nil Resque::Plugins::Meta::Metadata.get(meta.meta_id) + assert_nil AnotherJob.get_meta(meta.job_id) + assert_not_nil Resque::Plugins::Meta.get_meta(meta.job_id) + assert_not_nil Resque::Plugins::Meta::Metadata.get(meta.job_id) end def test_expired_metadata @@ -117,8 +96,8 @@ def test_expired_metadata worker.work(0) sleep 2 - meta = MetaJob.get_meta(meta.meta_id) - assert_nil meta + reloaded = MetaJob.get_meta(meta.job_id) + assert_nil reloaded end def test_slow_job @@ -127,35 +106,13 @@ def test_slow_job thread = Thread.new { worker.work(0) } sleep 0.1 - meta = SlowJob.get_meta(meta.meta_id) - assert !meta.enqueued? - assert meta.started? - assert meta.working? - assert !meta.finished? + meta = SlowJob.get_meta(meta.job_id) thread.join # job should be done meta.reload! - assert !meta.enqueued? - assert meta.started? - assert !meta.working? - assert meta.finished? - assert meta.succeeded? - assert !meta.failed? - sleep 2 - assert_nil Resque::Plugins::Meta.get_meta(meta.meta_id) - end - - def test_failing_job - meta = FailingJob.enqueue() - assert_nil meta.failed? - worker = Resque::Worker.new(:test) - worker.work(0) - - meta.reload! - assert meta.finished? - assert meta.failed? - assert !meta.succeeded? + sleep 2 # metadata should be expired + assert_nil Resque::Plugins::Meta.get_meta(meta.job_id) end def test_saving_additional_metadata @@ -164,7 +121,7 @@ def test_saving_additional_metadata meta.save # later - meta = MetaJob.get_meta(meta.meta_id) + meta = MetaJob.get_meta(meta.job_id) assert_equal 'bar', meta['foo'] end end diff --git a/test/timestamps_test.rb b/test/timestamps_test.rb new file mode 100644 index 0000000..f89eca9 --- /dev/null +++ b/test/timestamps_test.rb @@ -0,0 +1,139 @@ +require File.expand_path('../test_helper', __FILE__) +require 'resque-meta' + +class TimestampsTest < Test::Unit::TestCase + class TimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.perform(job_id, key, val) + meta = get_meta(job_id) + meta[key] = val + meta.save + end + end + + class AnotherTimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.perform(job_id) + end + end + + class SlowTimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.expire_meta_in + 1 + end + + def self.perform(job_id, key, val) + meta = get_meta(job_id) + meta[key] = val + meta.save + sleep 1 + end + end + + class FailingTimedJob + extend Resque::Plugins::Timestamps + @queue = :timed + + def self.perform(*args) + raise 'boom' + end + end + + + def setup + Resque.redis.flushall + end + + def test_lint + assert_nothing_raised do + Resque::Plugin.lint(Resque::Plugins::Timestamps) + end + end + + def test_enqueued_metadata + now = Time.now + meta = TimedJob.enqueue('foo', 'bar') + assert meta.enqueued_at.to_f > now.to_f, "#{meta.enqueued_at} should be after #{now}" + assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" + assert meta.enqueued? + assert !meta.started? + assert_equal 0, meta.seconds_processing + assert !meta.finished? + assert_equal Resque::Plugins::Meta::Metadata, meta.class + assert_equal TimedJob, meta.job_class + end + + def test_processed_job + meta = TimedJob.enqueue('foo', 'bar') + assert_nil meta['foo'] + worker = Resque::Worker.new(:timed) + worker.work(0) + + meta = TimedJob.get_meta(meta.job_id) + assert_equal TimedJob, meta.job_class + assert meta.started?, 'Job should have started' + assert meta.finished?, 'Job should be finished' + assert meta.succeeded?, 'Job should have succeeded' + assert !meta.enqueued?, 'Job should have been removed from the queue' + assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" + assert meta.seconds_processing > 0.0, "seconds_processing should be greater than zero" + end + + def test_expired_metadata + meta = MetaJob.enqueue('foo', 'bar') + worker = Resque::Worker.new(:timed) + worker.work(0) + + sleep 2 + meta = MetaJob.get_meta(meta.job_id) + assert_nil meta + end + + def test_slow_job + meta = SlowTimedJob.enqueue('foo', 'bar') + worker = Resque::Worker.new(:timed) + thread = Thread.new { worker.work(0) } + + sleep 0.1 + meta = SlowTimedJob.get_meta(meta.job_id) + assert !meta.enqueued? + assert meta.started? + assert meta.working? + assert !meta.finished? + + thread.join # job should be done + meta.reload! + assert !meta.enqueued? + assert meta.started? + assert !meta.working? + assert meta.finished? + assert meta.succeeded? + assert !meta.failed? + + sleep 2 + assert_nil Resque::Plugins::Meta.get_meta(meta.job_id) + end + + def test_failing_job + meta = FailingTimedJob.enqueue() + assert_nil meta.failed? + worker = Resque::Worker.new(:timed) + # debugger + worker.work(0) + + sleep 0.5 + # debugger + meta.reload! + assert meta.finished? + assert meta.failed? + assert !meta.succeeded? + end + +end From 1888a8980f9af7e26b3878c4e80e1bd6bd1a183f Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:31:41 -0700 Subject: [PATCH 15/31] Fix TimestampsTest. --- lib/resque-timestamps.rb | 1 + test/timestamps_test.rb | 12 +----------- 2 files changed, 2 insertions(+), 11 deletions(-) create mode 100644 lib/resque-timestamps.rb diff --git a/lib/resque-timestamps.rb b/lib/resque-timestamps.rb new file mode 100644 index 0000000..9e3a544 --- /dev/null +++ b/lib/resque-timestamps.rb @@ -0,0 +1 @@ +require 'resque/plugins/timestamps' diff --git a/test/timestamps_test.rb b/test/timestamps_test.rb index f89eca9..95d48d3 100644 --- a/test/timestamps_test.rb +++ b/test/timestamps_test.rb @@ -1,5 +1,5 @@ require File.expand_path('../test_helper', __FILE__) -require 'resque-meta' +require 'resque-timestamps' class TimestampsTest < Test::Unit::TestCase class TimedJob @@ -86,16 +86,6 @@ def test_processed_job assert meta.seconds_processing > 0.0, "seconds_processing should be greater than zero" end - def test_expired_metadata - meta = MetaJob.enqueue('foo', 'bar') - worker = Resque::Worker.new(:timed) - worker.work(0) - - sleep 2 - meta = MetaJob.get_meta(meta.job_id) - assert_nil meta - end - def test_slow_job meta = SlowTimedJob.enqueue('foo', 'bar') worker = Resque::Worker.new(:timed) From 3f05d82dffeee03f5c6efe73b752e8f6b1e36e0b Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 02:31:58 -0700 Subject: [PATCH 16/31] Tighten test timings a little to shorten test suite runtime. --- test/meta_test.rb | 2 +- test/timestamps_test.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/meta_test.rb b/test/meta_test.rb index 8b75123..b0a124c 100644 --- a/test/meta_test.rb +++ b/test/meta_test.rb @@ -36,7 +36,7 @@ def self.perform(job_id, key, val) meta = get_meta(job_id) meta[key] = val meta.save - sleep 1 + sleep 0.5 end end diff --git a/test/timestamps_test.rb b/test/timestamps_test.rb index 95d48d3..48082bb 100644 --- a/test/timestamps_test.rb +++ b/test/timestamps_test.rb @@ -33,7 +33,7 @@ def self.perform(job_id, key, val) meta = get_meta(job_id) meta[key] = val meta.save - sleep 1 + sleep 0.5 end end @@ -91,7 +91,7 @@ def test_slow_job worker = Resque::Worker.new(:timed) thread = Thread.new { worker.work(0) } - sleep 0.1 + sleep 0.2 meta = SlowTimedJob.get_meta(meta.job_id) assert !meta.enqueued? assert meta.started? From 68465dee6330682b265838c737b9aad1d5224477 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 12:33:52 -0700 Subject: [PATCH 17/31] Move Meta#enqueue to JobIdentity, added before_enqueue hooks, and simplified Meta. WARNING: this changes the signature of #enqueue, which no longer returns the new Metadata instance, and instance returns the new job_id. The Metadata instance is created before #enqueue returns, but it must be looked up separately if needed immediately. --- lib/resque/plugins/job_identity.rb | 13 ++++++++++++- lib/resque/plugins/meta.rb | 26 ++++++++++---------------- lib/resque/plugins/timestamps.rb | 3 ++- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/lib/resque/plugins/job_identity.rb b/lib/resque/plugins/job_identity.rb index 061b477..3a1857e 100644 --- a/lib/resque/plugins/job_identity.rb +++ b/lib/resque/plugins/job_identity.rb @@ -1,13 +1,24 @@ require 'digest/sha1' module Resque + module Plugin + def before_enqueue_hooks(job) + job.methods.grep(/^before_enqueue/).sort + end + end + module Plugins module JobIdentity # Enqueue a job in Resque and return the associated job_id. # The returned job_id can be used to refer to the job in the future. + # prepends enqueued job_id to args def enqueue(*args) job_id = job_id(args) - yield(job_id) if block_given? + before_enqueue_hooks = Resque::Plugin.before_enqueue_hooks(self) + before_enqueue_hooks.each do |hook| + send(hook, job_id, *args) + end + Resque.enqueue(self, job_id, *args) job_id end diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index bf8a772..fb0f3f3 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -20,30 +20,24 @@ module Plugins # end # end # - # meta0 = MyJob.enqueue('stuff') - # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' - # meta0['foo'] = 'bar' # => 'bar' - # meta0.save + # job_id = MyJob.enqueue('stuff') # => '03c9e1a045ad012dd20500264a19273c' + # meta0 = MyJob.get_meta(job_id) + # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' + # meta0['foo'] = 'bar' # => 'bar' + # meta0.save # => meta0 # # # later # meta1 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') - # meta1.job_class # => MyJob - # meta1['foo'] # => 'bar' + # meta1.job_class # => MyJob + # meta1['foo'] # => 'bar' module Meta include JobIdentity - # Enqueues a job in Resque and return the association metadata. + # Enqueues a job in Resque and return the associated metadata. # The job_id in the returned object can be used to fetch the # metadata again in the future. - def enqueue(*args) - metadata = nil - # need to save the metadata before queueing the job, hence the block - super do |job_id| - metadata = Metadata.new(job_id, self) - metadata.save - end - # metadata is updated by the after_enqueue hook - metadata.reload! + def before_enqueue_create_metadata(job_id, *args) + Metadata.new(job_id, self).save end module_function diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 6fbc691..71462d0 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -17,7 +17,8 @@ module Plugins # end # end # - # meta0 = MyJob.enqueue('stuff') + # job_id = MyJob.enqueue('stuff') + # meta0 = MyJob.get_meta(job_id) # meta0.job_id # => '03c9e1a045ad012dd20500264a19273c' # meta0.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' # meta0.started_at # => nil From a94dce5f0e672a890ebb51beb587f9f15ae861a9 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 12:36:45 -0700 Subject: [PATCH 18/31] Rename JobIdentity#job_id to #job_identity, and changed the signature. Now it takes a single argument: an array of args, instead of a list of args, as previously. --- lib/resque/plugins/job_identity.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/resque/plugins/job_identity.rb b/lib/resque/plugins/job_identity.rb index 3a1857e..eb3ea24 100644 --- a/lib/resque/plugins/job_identity.rb +++ b/lib/resque/plugins/job_identity.rb @@ -13,19 +13,22 @@ module JobIdentity # The returned job_id can be used to refer to the job in the future. # prepends enqueued job_id to args def enqueue(*args) - job_id = job_id(args) + job_id = job_identity(args) + before_enqueue_hooks = Resque::Plugin.before_enqueue_hooks(self) before_enqueue_hooks.each do |hook| send(hook, job_id, *args) end Resque.enqueue(self, job_id, *args) + job_id end # Override in your job to control the job id. It is passed the same # arguments as `perform`, that is, your job's payload. - def job_id(*args) + # NOTE: expects a single args array, not a list of args! + def job_identity(args) Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join) end From 81305186152fefeec6d5a325bedd5df841d9a4b4 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 12:37:05 -0700 Subject: [PATCH 19/31] Clean up module_function and public in Meta. --- lib/resque/plugins/meta.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/resque/plugins/meta.rb b/lib/resque/plugins/meta.rb index fb0f3f3..7bb4f47 100644 --- a/lib/resque/plugins/meta.rb +++ b/lib/resque/plugins/meta.rb @@ -40,19 +40,17 @@ def before_enqueue_create_metadata(job_id, *args) Metadata.new(job_id, self).save end - module_function - # Override in your job to control the many seconds a job's # metadata will live after it finishes. Defaults to 24 hours. # Return nil or 0 to set them to never expire. def expire_meta_in 24 * 60 * 60 end - public :expire_meta_in def get_meta(job_id) Metadata.get(job_id, self) end + module_function :get_meta public :get_meta end From 138cc74a968e654c90699fd72cddf348e4a3b319 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 12:37:26 -0700 Subject: [PATCH 20/31] Remove unneeded attr_accessor :enqueued_at in Timestamps::Metadata. --- lib/resque/plugins/timestamps.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 71462d0..8d10e85 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -56,8 +56,6 @@ def on_failure_timestamp(e, job_id, *args) end module Metadata - attr_accessor :enqueued_at - def enqueued! @enqueued_at = Time.now self['enqueued_at'] = to_time_format_str(@enqueued_at) From 6c76e6724f35af73f56e670f0c6c8811ddeb6328 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 12:37:42 -0700 Subject: [PATCH 21/31] Remove debugger cruft from TimestampsTest. --- test/timestamps_test.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/timestamps_test.rb b/test/timestamps_test.rb index 48082bb..f4dd2b7 100644 --- a/test/timestamps_test.rb +++ b/test/timestamps_test.rb @@ -115,11 +115,9 @@ def test_failing_job meta = FailingTimedJob.enqueue() assert_nil meta.failed? worker = Resque::Worker.new(:timed) - # debugger worker.work(0) sleep 0.5 - # debugger meta.reload! assert meta.finished? assert meta.failed? From 1bee3b1701400bc2e3ed152cb99bc12a215962ce Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Tue, 22 Mar 2011 12:44:11 -0700 Subject: [PATCH 22/31] Fix tests I broke by changing the signature of #enqueue. --- test/meta_test.rb | 34 +++++++++++++++++++--------------- test/timestamps_test.rb | 17 ++++++++++------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/test/meta_test.rb b/test/meta_test.rb index b0a124c..5e1803b 100644 --- a/test/meta_test.rb +++ b/test/meta_test.rb @@ -63,7 +63,9 @@ def test_resque_version def test_enqueued_metadata now = Time.now - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') + assert_not_nil job_id + meta = MetaJob.get_meta(job_id) assert_not_nil meta assert_not_nil meta.job_id assert_nil meta['foo'] @@ -72,56 +74,58 @@ def test_enqueued_metadata end def test_processed_job - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') + meta = MetaJob.get_meta(job_id) assert_nil meta['foo'] worker = Resque::Worker.new(:test) worker.work(0) - meta = MetaJob.get_meta(meta.job_id) + meta = MetaJob.get_meta(job_id) assert_equal MetaJob, meta.job_class assert_equal 'bar', meta['foo'], "'foo' not found in #{meta.inspect}" end def test_wrong_id_for_class - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') - assert_nil AnotherJob.get_meta(meta.job_id) - assert_not_nil Resque::Plugins::Meta.get_meta(meta.job_id) - assert_not_nil Resque::Plugins::Meta::Metadata.get(meta.job_id) + assert_nil AnotherJob.get_meta(job_id) + assert_not_nil Resque::Plugins::Meta.get_meta(job_id) + assert_not_nil Resque::Plugins::Meta::Metadata.get(job_id) end def test_expired_metadata - meta = MetaJob.enqueue('foo', 'bar') + job_id = MetaJob.enqueue('foo', 'bar') worker = Resque::Worker.new(:test) worker.work(0) sleep 2 - reloaded = MetaJob.get_meta(meta.job_id) - assert_nil reloaded + assert_nil MetaJob.get_meta(job_id) end def test_slow_job - meta = SlowJob.enqueue('foo', 'bar') + job_id = SlowJob.enqueue('foo', 'bar') worker = Resque::Worker.new(:test) thread = Thread.new { worker.work(0) } sleep 0.1 - meta = SlowJob.get_meta(meta.job_id) + meta = SlowJob.get_meta(job_id) + assert_not_nil meta thread.join # job should be done meta.reload! sleep 2 # metadata should be expired - assert_nil Resque::Plugins::Meta.get_meta(meta.job_id) + assert_nil Resque::Plugins::Meta.get_meta(job_id) end def test_saving_additional_metadata - meta = MetaJob.enqueue('stuff') + job_id = MetaJob.enqueue('stuff') + meta = MetaJob.get_meta(job_id) meta['foo'] = 'bar' meta.save # later - meta = MetaJob.get_meta(meta.job_id) + meta = MetaJob.get_meta(job_id) assert_equal 'bar', meta['foo'] end end diff --git a/test/timestamps_test.rb b/test/timestamps_test.rb index f4dd2b7..64f015e 100644 --- a/test/timestamps_test.rb +++ b/test/timestamps_test.rb @@ -59,7 +59,8 @@ def test_lint def test_enqueued_metadata now = Time.now - meta = TimedJob.enqueue('foo', 'bar') + job_id = TimedJob.enqueue('foo', 'bar') + meta = TimedJob.get_meta(job_id) assert meta.enqueued_at.to_f > now.to_f, "#{meta.enqueued_at} should be after #{now}" assert meta.seconds_enqueued > 0.0, "seconds_enqueued should be greater than zero" assert meta.enqueued? @@ -71,12 +72,13 @@ def test_enqueued_metadata end def test_processed_job - meta = TimedJob.enqueue('foo', 'bar') + job_id = TimedJob.enqueue('foo', 'bar') + meta = TimedJob.get_meta(job_id) assert_nil meta['foo'] worker = Resque::Worker.new(:timed) worker.work(0) - meta = TimedJob.get_meta(meta.job_id) + meta = TimedJob.get_meta(job_id) assert_equal TimedJob, meta.job_class assert meta.started?, 'Job should have started' assert meta.finished?, 'Job should be finished' @@ -87,12 +89,12 @@ def test_processed_job end def test_slow_job - meta = SlowTimedJob.enqueue('foo', 'bar') + job_id = SlowTimedJob.enqueue('foo', 'bar') worker = Resque::Worker.new(:timed) thread = Thread.new { worker.work(0) } sleep 0.2 - meta = SlowTimedJob.get_meta(meta.job_id) + meta = SlowTimedJob.get_meta(job_id) assert !meta.enqueued? assert meta.started? assert meta.working? @@ -108,11 +110,12 @@ def test_slow_job assert !meta.failed? sleep 2 - assert_nil Resque::Plugins::Meta.get_meta(meta.job_id) + assert_nil Resque::Plugins::Meta.get_meta(job_id) end def test_failing_job - meta = FailingTimedJob.enqueue() + job_id = FailingTimedJob.enqueue() + meta = FailingTimedJob.get_meta(job_id) assert_nil meta.failed? worker = Resque::Worker.new(:timed) worker.work(0) From 022aa61bc56046a0668c452f69b93a3d089ee25b Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:04:52 -0700 Subject: [PATCH 23/31] Move Metadata.get to the top. --- lib/resque/plugins/meta/metadata.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 920ee77..ab404a0 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -8,12 +8,6 @@ class Metadata attr_reader :job_id, :job_class, :data, :expire_in - def self.store(job_id, data, expire_at) - key = "meta:#{job_id}" - redis.set(key, encode(data)) - redis.expireat("resque:#{key}", expire_at) if expire_at > 0 - end - # Retrieve the metadata for a given job. If you call this # from a class that extends Meta, then the metadata will # only be returned if the metadata for that id is for the @@ -25,6 +19,12 @@ def self.get(job_id, job_class = nil) end end + def self.store(job_id, data, expire_at) + key = "meta:#{job_id}" + redis.set(key, encode(data)) + redis.expireat("resque:#{key}", expire_at) if expire_at > 0 + end + def self.load(job_id, job_class = nil) key = "meta:#{job_id}" if json = redis.get(key) From 8392a5cff3fe4f97645efbdae686955ea7049e72 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:06:07 -0700 Subject: [PATCH 24/31] Convert lookup keys with to_s, and add delegated #include?. --- lib/resque/plugins/meta/metadata.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index ab404a0..18b4584 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -59,13 +59,17 @@ def save end def [](key) - data[key] + data[key.to_s] end def []=(key, val) data[key.to_s] = val end + def include?(key) + data.include?(key.to_s) + end + # methods in modules can be easily overridden or extended later (with more modules) module OverridableMethods def expire_at From 5b20224cf59af6edc4bf40e5339fdd28862e2a71 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:06:39 -0700 Subject: [PATCH 25/31] Label the many ends in metadata.rb and timestamps.rb. --- lib/resque/plugins/meta/metadata.rb | 8 ++++---- lib/resque/plugins/timestamps.rb | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 18b4584..94f8b48 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -92,7 +92,7 @@ def to_time_format_str(time) time.utc.iso8601(6) end - end - end - end -end + end # class Metadata + end # module Meta + end # module Plugins +end # module Resque diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 8d10e85..73636f0 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -135,8 +135,8 @@ def seconds_processing 0 end end - end + end # module Metadata - end - end -end \ No newline at end of file + end # module Timestamps + end # module Plugins +end # module Resque From 0eb7b3244fd6971386bec09a56d47091b35cb7ba Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:18:46 -0700 Subject: [PATCH 26/31] Replace Metadata#from_time_format_str with Timestamps::Metadata#get_timestamp. --- lib/resque/plugins/meta/metadata.rb | 4 ---- lib/resque/plugins/timestamps.rb | 14 ++++++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 94f8b48..9babd8f 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -84,10 +84,6 @@ def expire_at protected - def from_time_format_str(key) - (t = self[key]) && Time.parse(t) - end - def to_time_format_str(time) time.utc.iso8601(6) end diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 73636f0..8b4256a 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -63,7 +63,7 @@ def enqueued! end def enqueued_at - @enqueued_at ||= from_time_format_str('enqueued_at') + @enqueued_at ||= get_timestamp(:enqueued_at) end def started! @@ -73,7 +73,7 @@ def started! end def started_at - @started_at ||= from_time_format_str('started_at') + @started_at ||= get_timestamp(:started_at) end def finished! @@ -84,7 +84,7 @@ def finished! end def finished_at - @finished_at ||= from_time_format_str('finished_at') + @finished_at ||= get_timestamp(:finished_at) end def failed! @@ -135,8 +135,14 @@ def seconds_processing 0 end end - end # module Metadata + private + + def get_timestamp(name) + time_string = self[name] and Time.parse(time_string) + end + + end # module Metadata end # module Timestamps end # module Plugins end # module Resque From 1c74653f7e4932e29e3251861ca70dd510c8e1bc Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:22:27 -0700 Subject: [PATCH 27/31] Replace Metadata#to_time_format_str with Timestamps::Metadata#set_timestamp. --- lib/resque/plugins/meta/metadata.rb | 6 ------ lib/resque/plugins/timestamps.rb | 26 ++++++++++++++++++++------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/resque/plugins/meta/metadata.rb b/lib/resque/plugins/meta/metadata.rb index 9babd8f..e6fb0d0 100644 --- a/lib/resque/plugins/meta/metadata.rb +++ b/lib/resque/plugins/meta/metadata.rb @@ -82,12 +82,6 @@ def expire_at end include OverridableMethods - protected - - def to_time_format_str(time) - time.utc.iso8601(6) - end - end # class Metadata end # module Meta end # module Plugins diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 8b4256a..68d50f1 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -57,32 +57,41 @@ def on_failure_timestamp(e, job_id, *args) module Metadata def enqueued! - @enqueued_at = Time.now - self['enqueued_at'] = to_time_format_str(@enqueued_at) + self.enqueued_at = Time.now save end + def enqueued_at=(time) + set_timestamp(:enqueued_at, time) + end + def enqueued_at @enqueued_at ||= get_timestamp(:enqueued_at) end def started! - @started_at = Time.now - self['started_at'] = to_time_format_str(@started_at) + self.started_at = Time.now save end + def started_at=(time) + set_timestamp(:started_at, time) + end + def started_at @started_at ||= get_timestamp(:started_at) end def finished! data['succeeded'] = true unless data.has_key?('succeeded') - @finished_at = Time.now - self['finished_at'] = to_time_format_str(@finished_at) + self.finished_at = Time.now save end + def finished_at=(time) + set_timestamp(:finished_at, time) + end + def finished_at @finished_at ||= get_timestamp(:finished_at) end @@ -142,6 +151,11 @@ def get_timestamp(name) time_string = self[name] and Time.parse(time_string) end + def set_timestamp(name, time) + self.instance_variable_set("@#{name}", time) + self[name] = time.utc.iso8601(6) + end + end # module Metadata end # module Timestamps end # module Plugins From e4c438a7a74a88552702f6ada234be18bb3a8515 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:07:32 -0700 Subject: [PATCH 28/31] Rename Timestamps::Metadata -> Timestamps::MetadataExtensions. --- lib/resque/plugins/timestamps.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 68d50f1..8e56acb 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -36,7 +36,7 @@ module Timestamps include Meta def self.extended(base) - Resque::Plugins::Meta::Metadata.send(:include, Resque::Plugins::Timestamps::Metadata) + Meta::Metadata.send(:include, Timestamps::MetadataExtensions) end def after_enqueue_timestamp(job_id, *args) @@ -55,7 +55,7 @@ def on_failure_timestamp(e, job_id, *args) meta = get_meta(job_id) and meta.failed! end - module Metadata + module MetadataExtensions def enqueued! self.enqueued_at = Time.now save @@ -156,7 +156,7 @@ def set_timestamp(name, time) self[name] = time.utc.iso8601(6) end - end # module Metadata + end # module MetadataExtensions end # module Timestamps end # module Plugins end # module Resque From f982aeef47ee4b3b64230cb02889c8d2c9e0da48 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:25:11 -0700 Subject: [PATCH 29/31] Symbols everywhere! --- lib/resque/plugins/timestamps.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 8e56acb..6d95524 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -83,7 +83,7 @@ def started_at end def finished! - data['succeeded'] = true unless data.has_key?('succeeded') + self[:succeeded] = true unless include?(:succeeded) self.finished_at = Time.now save end @@ -97,7 +97,7 @@ def finished_at end def failed! - self['succeeded'] = false + self[:succeeded] = false finished! end @@ -118,11 +118,11 @@ def finished? end def succeeded? - finished? ? self['succeeded'] : nil + finished? ? self[:succeeded] : nil end def failed? - finished? ? !self['succeeded'] : nil + finished? ? !self[:succeeded] : nil end def expire_at From cef2d0ba3dd2d45fb96ff25660a62305eca2332a Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 00:59:44 -0700 Subject: [PATCH 30/31] Group methods in Timestamps by type. --- lib/resque/plugins/timestamps.rb | 83 ++++++++++++++++---------------- 1 file changed, 42 insertions(+), 41 deletions(-) diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 6d95524..16b88c3 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -56,44 +56,44 @@ def on_failure_timestamp(e, job_id, *args) end module MetadataExtensions - def enqueued! - self.enqueued_at = Time.now - save + def enqueued? + !started? end - def enqueued_at=(time) - set_timestamp(:enqueued_at, time) + def working? + started? && !finished? end - def enqueued_at - @enqueued_at ||= get_timestamp(:enqueued_at) + def started? + !!started_at end - def started! - self.started_at = Time.now - save + def finished? + !!finished_at end - def started_at=(time) - set_timestamp(:started_at, time) + def succeeded? + finished? ? self[:succeeded] : nil end - def started_at - @started_at ||= get_timestamp(:started_at) + def failed? + finished? ? !self[:succeeded] : nil end - def finished! - self[:succeeded] = true unless include?(:succeeded) - self.finished_at = Time.now + def enqueued! + self.enqueued_at = Time.now save end - def finished_at=(time) - set_timestamp(:finished_at, time) + def started! + self.started_at = Time.now + save end - def finished_at - @finished_at ||= get_timestamp(:finished_at) + def finished! + self[:succeeded] = true unless include?(:succeeded) + self.finished_at = Time.now + save end def failed! @@ -101,36 +101,28 @@ def failed! finished! end - def enqueued? - !started? - end - - def working? - started? && !finished? + def enqueued_at + @enqueued_at ||= get_timestamp(:enqueued_at) end - def started? - !!started_at + def started_at + @started_at ||= get_timestamp(:started_at) end - def finished? - !!finished_at + def finished_at + @finished_at ||= get_timestamp(:finished_at) end - def succeeded? - finished? ? self[:succeeded] : nil + def enqueued_at=(time) + set_timestamp(:enqueued_at, time) end - def failed? - finished? ? !self[:succeeded] : nil + def started_at=(time) + set_timestamp(:started_at, time) end - def expire_at - if finished? && @expire_in > 0 - finished_at.to_i + @expire_in - else - super - end + def finished_at=(time) + set_timestamp(:finished_at, time) end def seconds_enqueued @@ -145,6 +137,15 @@ def seconds_processing end end + # override default implementation to base expiry on finish time + def expire_at + if finished? && @expire_in > 0 + finished_at.to_i + @expire_in + else + super + end + end + private def get_timestamp(name) From f4c6c981cad1d2d77f6b4786e759cbff1463d1a6 Mon Sep 17 00:00:00 2001 From: Emmanuel Gomez Date: Thu, 21 Apr 2011 01:25:10 -0700 Subject: [PATCH 31/31] Streamline comment in Timestamps. --- lib/resque/plugins/timestamps.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/resque/plugins/timestamps.rb b/lib/resque/plugins/timestamps.rb index 16b88c3..5db8e0a 100644 --- a/lib/resque/plugins/timestamps.rb +++ b/lib/resque/plugins/timestamps.rb @@ -24,12 +24,12 @@ module Plugins # meta0.started_at # => nil # # # later - # meta1 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') + # meta1 = MyJob.get_meta(job_id) # meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010' # meta1.started_at # => 'Wed May 19 13:42:51 -0600 2010' # # # later still - # meta2 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c') + # meta2 = MyJob.get_meta(job_id) # meta2.started_at # => 'Wed May 19 13:42:51 -0600 2010' # meta2.finished_at # => 'Wed May 19 13:43:01 -0600 2010' module Timestamps