Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f19bc17
Move some responsibilities from Meta to Meta::Metadata.
emmanuel Mar 18, 2011
5680616
Move timestamp responsibility out of Meta into new Timestamps plugin.
emmanuel Mar 18, 2011
79cf5b1
Fix a bug I introduced.
emmanuel Mar 19, 2011
f63c7d1
Oops, not going far without that require.
emmanuel Mar 19, 2011
c310752
Update Metadata.store interface.
emmanuel Mar 19, 2011
08388be
Strip Metadata down to the minimum, put more responsibility on Timest…
emmanuel Mar 19, 2011
975e42b
Stylistic quibbling (variable rename).
emmanuel Mar 19, 2011
c0e506f
Introduce JobIdentity plugin.
emmanuel Mar 22, 2011
64851d0
Update Meta plugin to use JobIdentity.
emmanuel Mar 22, 2011
3044f14
Fix bug I introduced with expiry on non-Timestamp metadata jobs.
emmanuel Mar 22, 2011
76747a3
Clean up and update Timestamps plugin.
emmanuel Mar 22, 2011
4d93e10
Shuffle module_function declaration in Resque::Plugins::Meta.
emmanuel Mar 22, 2011
657a52e
Slight cleanup in test_helper.
emmanuel Mar 22, 2011
514bcd2
Update tests to work with recent changes.
emmanuel Mar 22, 2011
1888a89
Fix TimestampsTest.
emmanuel Mar 22, 2011
3f05d82
Tighten test timings a little to shorten test suite runtime.
emmanuel Mar 22, 2011
68465de
Move Meta#enqueue to JobIdentity, added before_enqueue hooks, and sim…
emmanuel Mar 22, 2011
a94dce5
Rename JobIdentity#job_id to #job_identity, and changed the signature.
emmanuel Mar 22, 2011
8130518
Clean up module_function and public in Meta.
emmanuel Mar 22, 2011
138cc74
Remove unneeded attr_accessor :enqueued_at in Timestamps::Metadata.
emmanuel Mar 22, 2011
6c76e67
Remove debugger cruft from TimestampsTest.
emmanuel Mar 22, 2011
1bee3b1
Fix tests I broke by changing the signature of #enqueue.
emmanuel Mar 22, 2011
022aa61
Move Metadata.get to the top.
emmanuel Apr 21, 2011
8392a5c
Convert lookup keys with to_s, and add delegated #include?.
emmanuel Apr 21, 2011
5b20224
Label the many ends in metadata.rb and timestamps.rb.
emmanuel Apr 21, 2011
0eb7b32
Replace Metadata#from_time_format_str with Timestamps::Metadata#get_t…
emmanuel Apr 21, 2011
1c74653
Replace Metadata#to_time_format_str with Timestamps::Metadata#set_tim…
emmanuel Apr 21, 2011
e4c438a
Rename Timestamps::Metadata -> Timestamps::MetadataExtensions.
emmanuel Apr 21, 2011
f982aee
Symbols everywhere!
emmanuel Apr 21, 2011
cef2d0b
Group methods in Timestamps by type.
emmanuel Apr 21, 2011
f4c6c98
Streamline comment in Timestamps.
emmanuel Apr 21, 2011
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/resque-timestamps.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
require 'resque/plugins/timestamps'
37 changes: 37 additions & 0 deletions lib/resque/plugins/job_identity.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require 'digest/sha1'

module Resque
module Plugin
def before_enqueue_hooks(job)
job.methods.grep(/^before_enqueue/).sort
end
end

module Plugins
module JobIdentity
# Enqueue a job in Resque and return the associated job_id.
# The returned job_id can be used to refer to the job in the future.
# prepends enqueued job_id to args
def enqueue(*args)
job_id = job_identity(args)

before_enqueue_hooks = Resque::Plugin.before_enqueue_hooks(self)
before_enqueue_hooks.each do |hook|
send(hook, job_id, *args)
end

Resque.enqueue(self, job_id, *args)

job_id
end

# Override in your job to control the job id. It is passed the same
# arguments as `perform`, that is, your job's payload.
# NOTE: expects a single args array, not a list of args!
def job_identity(args)
Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join)
end

end
end
end
82 changes: 18 additions & 64 deletions lib/resque/plugins/meta.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require 'digest/sha1'
require 'resque'
require 'resque/plugins/job_identity'
require 'resque/plugins/meta/version'
require 'resque/plugins/meta/metadata'

Expand All @@ -15,90 +15,44 @@ module Plugins
# class MyJob
# extend Resque::Plugins::Meta
#
# def self.perform(meta_id, *args)
# def self.perform(job_id, *args)
# heavy_lifting
# end
# end
#
# meta0 = MyJob.enqueue('stuff')
# meta0.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010'
# meta0.meta_id # => '03c9e1a045ad012dd20500264a19273c'
# meta0['foo'] = 'bar' # => 'bar'
# meta0.save
# job_id = MyJob.enqueue('stuff') # => '03c9e1a045ad012dd20500264a19273c'
# meta0 = MyJob.get_meta(job_id)
# meta0.job_id # => '03c9e1a045ad012dd20500264a19273c'
# meta0['foo'] = 'bar' # => 'bar'
# meta0.save # => meta0
#
# # later
# meta1 = MyJob.get_meta('03c9e1a045ad012dd20500264a19273c')
# meta1.job_class # => MyJob
# meta1.enqueued_at # => 'Wed May 19 13:42:41 -0600 2010'
# meta1['foo'] # => 'bar'
# meta1.job_class # => MyJob
# meta1['foo'] # => 'bar'
module Meta
include JobIdentity

# Override in your job to control the metadata id. It is
# passed the same arguments as `perform`, that is, your job's
# payload.
def meta_id(*args)
Digest::SHA1.hexdigest([ Time.now.to_f, rand, self, args ].join)
# Enqueues a job in Resque and return the associated metadata.
# The job_id in the returned object can be used to fetch the
# metadata again in the future.
def before_enqueue_create_metadata(job_id, *args)
Metadata.new(job_id, self).save
end

# Override in your job to control the how many seconds a job's
# Override in your job to control the many seconds a job's
# metadata will live after it finishes. Defaults to 24 hours.
# Return nil or 0 to set them to never expire.
def expire_meta_in
24 * 60 * 60
end

# Enqueues a job in Resque and return the association metadata.
# The meta_id in the returned object can be used to fetch the
# metadata again in the future.
def enqueue(*args)
meta = Metadata.new({'meta_id' => meta_id(args), 'job_class' => self.to_s})
meta.save
Resque.enqueue(self, meta.meta_id, *args)
meta
end

def store_meta(meta)
key = "meta:#{meta.meta_id}"
json = Resque.encode(meta.data)
Resque.redis.set(key, json)
Resque.redis.expireat("resque:#{key}", meta.expire_at) if meta.expire_at > 0
meta
end

# Retrieve the metadata for a given job. If you call this
# from a class that extends Meta, then the metadata will
# only be returned if the metadata for that id is for the
# same class. Explicitly, calling Meta.get_meta(some_id)
# will return the metadata for a job of any type.
def get_meta(meta_id)
key = "meta:#{meta_id}"
if json = Resque.redis.get(key)
data = Resque.decode(json)
if self == Meta || self.to_s == data['job_class']
Metadata.new(data)
end
end
def get_meta(job_id)
Metadata.get(job_id, self)
end
module_function :get_meta
public :get_meta

def before_perform_meta(meta_id, *args)
if meta = get_meta(meta_id)
meta.start!
end
end

def after_perform_meta(meta_id, *args)
if meta = get_meta(meta_id)
meta.finish!
end
end

def on_failure_meta(e, meta_id, *args)
if meta = get_meta(meta_id)
meta.fail!
end
end
end
end
end
149 changes: 55 additions & 94 deletions lib/resque/plugins/meta/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,124 +4,85 @@ module Resque
module Plugins
module Meta
class Metadata
attr_reader :job_class, :meta_id, :data, :enqueued_at, :expire_in
extend Resque::Helpers

attr_reader :job_id, :job_class, :data, :expire_in

# Retrieve the metadata for a given job. If you call this
# from a class that extends Meta, then the metadata will
# only be returned if the metadata for that id is for the
# same class. Explicitly calling Metadata.get(some_id)
# will return the metadata for a job of any type.
def self.get(job_id, job_class = nil)
if data = load(job_id, job_class)
Metadata.new(job_id, data["job_class"], data)
end
end

def initialize(data_hash)
data_hash['enqueued_at'] ||= to_time_format_str(Time.now)
@data = data_hash
@meta_id = data_hash['meta_id'].dup
@enqueued_at = from_time_format_str('enqueued_at')
@job_class = data_hash['job_class']
if @job_class.is_a?(String)
@job_class = Resque.constantize(data_hash['job_class'])
else
data_hash['job_class'] = @job_class.to_s
def self.store(job_id, data, expire_at)
key = "meta:#{job_id}"
redis.set(key, encode(data))
redis.expireat("resque:#{key}", expire_at) if expire_at > 0
end

def self.load(job_id, job_class = nil)
key = "meta:#{job_id}"
if json = redis.get(key)
data = decode(json)
if !job_class || Meta == job_class || job_class.to_s == data['job_class']
data
end
end
end

def initialize(job_id, job_class, data = nil)
@job_id = job_id
@job_class = job_class.is_a?(String) ? self.class.constantize(job_class) : job_class
@expire_in = @job_class.expire_meta_in || 0

@data = data || {}
@data["job_class"] = @job_class.to_s
end

# Reload the metadata from the store
def reload!
if new_meta = job_class.get_meta(meta_id)
@data = new_meta.data
if data = self.class.load(job_id, job_class)
@data = data
end
self
end

# Save the metadata
# Save the metadata. returns self
def save
job_class.store_meta(self)
self.class.store(job_id, data, expire_at)
self
end

def [](key)
data[key]
data[key.to_s]
end

def []=(key, val)
data[key.to_s] = val
end

def start!
@started_at = Time.now
self['started_at'] = to_time_format_str(@started_at)
save
end

def started_at
@started_at ||= from_time_format_str('started_at')
end

def finish!
data['succeeded'] = true unless data.has_key?('succeeded')
@finished_at = Time.now
self['finished_at'] = to_time_format_str(@finished_at)
save
end

def finished_at
@finished_at ||= from_time_format_str('finished_at')
end

def expire_at
if finished? && expire_in > 0
finished_at.to_i + expire_in
else
0
end
end

def enqueued?
!started?
def include?(key)
data.include?(key.to_s)
end

def working?
started? && !finished?
end

def started?
started_at ? true :false
end

def finished?
finished_at ? true : false
end

def fail!
self['succeeded'] = false
finish!
end

def succeeded?
finished? ? self['succeeded'] : nil
end

def failed?
finished? ? !self['succeeded'] : nil
end

def seconds_enqueued
(started_at || Time.now).to_f - enqueued_at.to_f
end

def seconds_processing
if started?
(finished_at || Time.now).to_f - started_at.to_f
else
0
# methods in modules can be easily overridden or extended later (with more modules)
module OverridableMethods
def expire_at
if @expire_in && 0 < @expire_in
Time.now.to_i + @expire_in
else
0
end
end
end
include OverridableMethods

protected

def from_time_format_str(key)
(t = self[key]) && Time.parse(t)
end

def to_time_format_str(time)
time.utc.iso8601(6)
end
end
end
end
end
end # class Metadata
end # module Meta
end # module Plugins
end # module Resque
Loading