diff --git a/contract.txt b/contract.txt index 9b885cf..d8e224a 100644 --- a/contract.txt +++ b/contract.txt @@ -199,6 +199,7 @@ Job: // id = generated uuid MULTI RPUSH feed.ids:[feed] [id] + INCR feed.publishes:[feed] HSET feed.items:[feed] [id] [item] ZADD feed.published:[feed] [utc epoch milliseconds] [id] EXEC @@ -284,4 +285,4 @@ Job: LPUSH feed.ids:[feed] [key] check claimed jobs to see if any have been claimed too long and "Cancel" or "Stall" them - publish stats to a feed \ No newline at end of file + publish stats to a feed diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index c439d78..18d15de 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -90,8 +90,8 @@ def __init__(self, thoonk, feed): self.feed_claimed = 'feed.claimed:%s' % feed self.feed_stalled = 'feed.stalled:%s' % feed self.feed_running = 'feed.running:%s' % feed - - self.job_finish = 'job.finish:%s' % feed + + self.job_finish = 'job.finish:%s' % feed def get_channels(self): return (self.feed_publishes, self.feed_claimed, self.feed_stalled, @@ -127,7 +127,7 @@ def _retract(pipe): pipe.srem(self.feed_stalled, id) pipe.zrem(self.feed_claimed, id) pipe.lrem(self.feed_ids, 1, id) - + self.redis.transaction(_retract, self.feed_items) def put(self, item, priority=False): @@ -172,7 +172,7 @@ def get(self, timeout=0): Arguments: timeout -- Optional time in seconds to wait before raising an exception. - + Returns: id -- The id of the job job -- The job content @@ -188,14 +188,14 @@ def get(self, timeout=0): pipe.hget(self.feed_items, id) pipe.hget(self.feed_cancelled, id) result = pipe.execute() - + self.thoonk._publish(self.feed_claimed, (id,)) return id, result[1], 0 if result[2] is None else int(result[2]) def get_failure_count(self, id): return int(self.redis.hget(self.feed_cancelled, id) or 0) - + NO_RESULT = [] def finish(self, id, result=NO_RESULT): """ @@ -216,7 +216,7 @@ def _finish(pipe): if result is not self.NO_RESULT: self.thoonk._publish(self.job_finish, (id, result), pipe) pipe.hdel(self.feed_items, id) - + self.redis.transaction(_finish, self.feed_claimed) def cancel(self, id): @@ -233,7 +233,7 @@ def _cancel(pipe): pipe.hincrby(self.feed_cancelled, id, 1) pipe.lpush(self.feed_ids, id) pipe.zrem(self.feed_claimed, id) - + self.redis.transaction(_cancel, self.feed_claimed) def stall(self, id): @@ -253,7 +253,7 @@ def _stall(pipe): pipe.hdel(self.feed_cancelled, id) pipe.sadd(self.feed_stalled, id) pipe.zrem(self.feed_published, id) - + self.redis.transaction(_stall, self.feed_claimed) def retry(self, id): @@ -270,7 +270,7 @@ def _retry(pipe): pipe.srem(self.feed_stalled, id) pipe.lpush(self.feed_ids, id) pipe.zadd(self.feed_published, **{id: time.time()}) - + results = self.redis.transaction(_retry, self.feed_stalled) if not results[0]: return # raise exception?