From 58ff6f7f8ce204b2fb9b29d630b36d348b2b2f9f Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Sat, 2 Jul 2011 12:50:19 +0100 Subject: [PATCH 1/8] return number of times a job has been cancelled --- tests/test_job.py | 12 +++++++++--- thoonk/feeds/job.py | 8 +++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/test_job.py b/tests/test_job.py index fa15a8f..03e4ccc 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -36,7 +36,7 @@ def test_10_basic_job(self): #worker testjobworker = self.ps.job("testjob") - id_worker, query_worker = testjobworker.get(timeout=3) + id_worker, query_worker, cancelled = testjobworker.get(timeout=3) result_worker = math.sqrt(float(query_worker)) testjobworker.finish(id_worker, result_worker, True) @@ -51,13 +51,19 @@ def test_20_cancel_job(self): #publisher id = j.put(9.0) #worker claims - id, query = j.get() + id, query, cancelled = j.get() + self.assertEqual(cancelled, 0) #publisher or worker cancels j.cancel(id) - id2, query2 = j.get() + id2, query2, cancelled2 = j.get() + self.assertEqual(cancelled2, 1) self.assertEqual(id, id2) #cancel the work again j.cancel(id) + # check the cancelled increment again + id3, query3, cancelled3 = j.get() + self.assertEqual(cancelled3, 2) + self.assertEqual(id, id3) #cleanup -- remove the job from the queue j.retract(id) self.assertEqual(j.get_ids(), []) diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index 43d06c6..bb6721e 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -178,6 +178,11 @@ def get(self, timeout=0): Arguments: timeout -- Optional time in seconds to wait before raising an exception. + + Returns: + id -- The id of the job + job -- The job content + cancelled -- The number of times the job has been cancelled """ id = self.redis.brpop(self.feed_ids, timeout) if id is None: @@ -187,8 +192,9 @@ def get(self, timeout=0): pipe = self.redis.pipeline() pipe.zadd(self.feed_job_claimed, id, time.time()) pipe.hget(self.feed_items, id) + pipe.hget(self.feed_cancelled, id) result = pipe.execute() - return id, result[1] + return id, result[1], int(result[2]) if isinstance(result[2], basestring) else 0 def finish(self, id, item=None, result=False, timeout=None): """ From 309ec1fc6aee6d7658e3e9059d551be268708679 Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Sat, 2 Jul 2011 12:57:58 +0100 Subject: [PATCH 2/8] reworked cancelled if then statement --- thoonk/feeds/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index bb6721e..087693c 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -194,7 +194,7 @@ def get(self, timeout=0): pipe.hget(self.feed_items, id) pipe.hget(self.feed_cancelled, id) result = pipe.execute() - return id, result[1], int(result[2]) if isinstance(result[2], basestring) else 0 + return id, result[1], 0 if result[2] is None else int(result[2]) def finish(self, id, item=None, result=False, timeout=None): """ From 8442f34f2abe875cb4d90d0f76e3f2b661f5d3d4 Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Sat, 2 Jul 2011 13:21:12 +0100 Subject: [PATCH 3/8] job.get now raises Empty if it times out --- tests/test_job.py | 5 ++++- thoonk/feeds/job.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_job.py b/tests/test_job.py index 03e4ccc..2414ef9 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -68,6 +68,9 @@ def test_20_cancel_job(self): j.retract(id) self.assertEqual(j.get_ids(), []) - + def test_30_no_job(self): + j = self.ps.job("testjob") + self.assertRaises(Exception, j.get, timeout=1) + suite = unittest.TestLoader().loadTestsFromTestCase(TestJob) diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index 087693c..ae38c63 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -8,6 +8,7 @@ from thoonk.exceptions import * from thoonk.feeds import Queue +from thoonk.feeds.queue import Empty class JobDoesNotExist(Exception): @@ -186,7 +187,7 @@ def get(self, timeout=0): """ id = self.redis.brpop(self.feed_ids, timeout) if id is None: - return # raise exception? + raise Empty id = id[1] pipe = self.redis.pipeline() From 0dbbf8fca6d8a967172eda662d773f5378795321 Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Sat, 2 Jul 2011 13:23:57 +0100 Subject: [PATCH 4/8] made assertRaises explicitly catch Empty exception --- tests/test_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_job.py b/tests/test_job.py index 2414ef9..7f2cf28 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -70,7 +70,7 @@ def test_20_cancel_job(self): def test_30_no_job(self): j = self.ps.job("testjob") - self.assertRaises(Exception, j.get, timeout=1) + self.assertRaises(thoonk.feeds.queue.Empty, j.get, timeout=1) suite = unittest.TestLoader().loadTestsFromTestCase(TestJob) From c915e77dbefca932b4b2a3e6a6894e67ccb9c02c Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Mon, 25 Jul 2011 15:44:52 +0100 Subject: [PATCH 5/8] added publish events for job feeds --- tests/test_job.py | 4 +- tests/test_notice.py | 181 +++++++++++++++++++++++++++++++++++++++++++ thoonk/feeds/feed.py | 2 +- thoonk/feeds/job.py | 39 +++++++--- thoonk/pubsub.py | 116 ++++++++++++++++++++++++++- 5 files changed, 327 insertions(+), 15 deletions(-) create mode 100644 tests/test_notice.py diff --git a/tests/test_job.py b/tests/test_job.py index 7f2cf28..95967ef 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -32,7 +32,7 @@ def test_10_basic_job(self): """JOB publish, retrieve, finish, get result""" #publisher testjob = self.ps.job("testjob") - id = testjob.put(9.0) + id = testjob.put('9.0') #worker testjobworker = self.ps.job("testjob") @@ -49,7 +49,7 @@ def test_20_cancel_job(self): """Test cancelling a job""" j = self.ps.job("testjob") #publisher - id = j.put(9.0) + id = j.put('9.0') #worker claims id, query, cancelled = j.get() self.assertEqual(cancelled, 0) diff --git a/tests/test_notice.py b/tests/test_notice.py new file mode 100644 index 0000000..dac741a --- /dev/null +++ b/tests/test_notice.py @@ -0,0 +1,181 @@ +import thoonk +import unittest +import time +from ConfigParser import ConfigParser + +class TestNotice(unittest.TestCase): + + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + + conf = ConfigParser() + conf.read('test.cfg') + if conf.sections() == ['Test']: + self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'), + port=conf.getint('Test', 'port'), + db=conf.getint('Test', 'db'), + listen=True) + self.ps.redis.flushdb() + else: + print 'No test configuration found in test.cfg' + exit() + + + def tearDown(self): + self.ps.close() + + "claimed, cancelled, stalled, finished" + + def test_05_publish_notice(self): + notice_received = [False] + ids = [None, None] + + def received_handler(feed, item, id): + self.assertEqual(feed, "testfeed") + ids[1] = id + notice_received[0] = True + + self.ps.register_handler('publish_notice', received_handler) + + j = self.ps.feed("testfeed") + + self.assertFalse(notice_received[0]) + + #publisher + ids[0] = j.publish('a') + + # block while waiting for notice + i = 0 + while not notice_received[0] and i < 3: + i += 1 + time.sleep(1) + + self.assertEqual(ids[0], ids[1]) + + self.assertTrue(notice_received[0], "Notice not received") + + self.ps.remove_handler('publish_notice', received_handler) + + def test_10_job_notices(self): + notices_received = [False] + ids = [None, None] + + def publish_handler(feed, item, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "publish" + + def claimed_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "claimed" + + def cancelled_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "cancelled" + + def stalled_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "stalled" + + def retried_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "retried" + + def finished_handler(feed, id, result): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "finished" + + def do_wait(): + i = 0 + while not notices_received[-1] and i < 2: + i += 1 + time.sleep(1) + + self.ps.register_handler('publish_notice', publish_handler) + self.ps.register_handler('claimed_notice', claimed_handler) + self.ps.register_handler('cancelled_notice', cancelled_handler) + self.ps.register_handler('stalled_notice', stalled_handler) + self.ps.register_handler('retried_notice', retried_handler) + self.ps.register_handler('finished_notice', finished_handler) + + j = self.ps.job("testjob") + + self.assertFalse(notices_received[0]) + + # create the job + ids[0] = j.put('b') + do_wait() + self.assertEqual(notices_received[0], "publish", "Notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # claim the job + id, job, cancelled = j.get() + self.assertEqual(job, 'b') + self.assertEqual(cancelled, 0) + self.assertEqual(ids[0], id) + do_wait() + self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # cancel the job + j.cancel(id) + do_wait() + self.assertEqual(notices_received[-1], "cancelled", "Cancelled notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # get the job again + id, job, cancelled = j.get() + self.assertEqual(job, 'b') + self.assertEqual(cancelled, 1) + self.assertEqual(ids[0], id) + do_wait() + self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # stall the job + j.stall(id) + do_wait() + self.assertEqual(notices_received[-1], "stalled", "Stalled notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # retry the job + j.retry(id) + do_wait() + self.assertEqual(notices_received[-1], "retried", "Retried notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # get the job again + id, job, cancelled = j.get() + self.assertEqual(job, 'b') + self.assertEqual(cancelled, 0) + self.assertEqual(ids[0], id) + do_wait() + self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # finish the job + j.finish(id) + do_wait() + self.assertEqual(notices_received[-1], "finished", "Finished notice not received") + self.assertEqual(ids[0], ids[-1]) + + self.ps.remove_handler('publish_notice', publish_handler) + self.ps.remove_handler('claimed_notice', claimed_handler) + self.ps.remove_handler('cancelled_notice', cancelled_handler) + self.ps.remove_handler('stalled_notice', stalled_handler) + self.ps.remove_handler('retried_notice', retried_handler) + self.ps.remove_handler('finished_notice', finished_handler) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestNotice) diff --git a/thoonk/feeds/feed.py b/thoonk/feeds/feed.py index ed89e04..ca32152 100644 --- a/thoonk/feeds/feed.py +++ b/thoonk/feeds/feed.py @@ -124,7 +124,7 @@ def config(self): """ with self.config_lock: if not self.config_valid: - conf = self.redis.get(self.feed_config) + conf = self.redis.get(self.feed_config) or "{}" self._config = json.loads(conf) self.config_valid = True return self._config diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index ae38c63..bb4303b 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -5,6 +5,7 @@ import time import uuid +import redis from thoonk.exceptions import * from thoonk.feeds import Queue @@ -56,7 +57,7 @@ class Job(Queue): feed.cancelled:[feed] -- A hash table of cancelled jobs. feed.claimed:[feed] -- A hash table of claimed jobs. feed.stalled:[feed] -- A hash table of stalled jobs. - feeed.funning:[feed] -- A hash table of running jobs. + feed.running:[feed] -- A hash table of running jobs. feed.finished:[feed]\x00[id] -- Temporary queue for receiving job result data. @@ -91,24 +92,30 @@ def __init__(self, thoonk, feed, config=None): """ Queue.__init__(self, thoonk, feed, config=None) - self.feed_published = 'feed.published:%s' % feed + #self.feed_publish = 'feed.publish:%s' % feed self.feed_cancelled = 'feed.cancelled:%s' % feed + self.feed_retried = 'feed.retried:%s' % feed + self.feed_finished = 'feed.finished:%s' % feed self.feed_job_claimed = 'feed.claimed:%s' % feed self.feed_job_stalled = 'feed.stalled:%s' % feed self.feed_job_finished = 'feed.finished:%s\x00%s' % (feed, '%s') self.feed_job_running = 'feed.running:%s' % feed + def get_channels(self): + return (self.feed_publish, self.feed_job_claimed, self.feed_job_stalled, + self.feed_finished, self.feed_cancelled, self.feed_retried) + def get_schemas(self): """Return the set of Redis keys used exclusively by this feed.""" schema = set((self.feed_job_claimed, self.feed_job_stalled, self.feed_job_running, - self.feed_published, + self.feed_publish, self.feed_cancelled)) for id in self.get_ids(): schema.add(self.feed_job_finished % id) - + return schema.union(Queue.get_schemas(self)) def get_ids(self): @@ -128,7 +135,7 @@ def retract(self, id): pipe = self.redis.pipeline() pipe.hdel(self.feed_items, id) pipe.hdel(self.feed_cancelled, id) - pipe.zrem(self.feed_published, id) + pipe.zrem(self.feed_publish, id) pipe.srem(self.feed_job_stalled, id) pipe.zrem(self.feed_job_claimed, id) pipe.lrem(self.feed_ids, 1, id) @@ -165,9 +172,16 @@ def put(self, item, priority=False): pipe.lpush(self.feed_ids, id) pipe.incr(self.feed_publishes) pipe.hset(self.feed_items, id, item) - pipe.zadd(self.feed_published, id, time.time()) + pipe.zadd(self.feed_publish, id, time.time()) results = pipe.execute() + + if results[-1]: + # If zadd was successful + self.thoonk._publish(self.feed_publish, (id, item)) + else: + self.thoonk._publish(self.feed_edit, (id, item)) + return id def get(self, timeout=0): @@ -195,6 +209,9 @@ def get(self, timeout=0): pipe.hget(self.feed_items, id) pipe.hget(self.feed_cancelled, id) result = pipe.execute() + + self.thoonk._publish(self.feed_job_claimed, (id,)) + return id, result[1], 0 if result[2] is None else int(result[2]) def finish(self, id, item=None, result=False, timeout=None): @@ -226,7 +243,8 @@ def finish(self, id, item=None, result=False, timeout=None): pipe.expire(self.feed_job_finished % id, timeout) pipe.hdel(self.feed_items, id) try: - result = pipe.execute() + pipe.execute() + self.thoonk._publish(self.feed_finished, (id, result if result else "")) break except redis.exceptions.WatchError: pass @@ -263,6 +281,7 @@ def cancel(self, id): pipe.zrem(self.feed_job_claimed, id) try: pipe.execute() + self.thoonk._publish(self.feed_cancelled, (id,)) break except redis.exceptions.WatchError: pass @@ -286,9 +305,10 @@ def stall(self, id): pipe.zrem(self.feed_job_claimed, id) pipe.hdel(self.feed_cancelled, id) pipe.sadd(self.feed_job_stalled, id) - pipe.zrem(self.feed_published, id) + pipe.zrem(self.feed_publish, id) try: pipe.execute() + self.thoonk._publish(self.feed_job_stalled, (id,)) break except redis.exceptions.WatchError: pass @@ -309,9 +329,10 @@ def retry(self, id): pipe = self.redis.pipeline() pipe.srem(self.feed_job_stalled, id) pipe.lpush(self.feed_ids, id) - pipe.zadd(self.feed_published, time.time(), id) + pipe.zadd(self.feed_publish, time.time(), id) try: results = pipe.execute() + self.thoonk._publish(self.feed_retried, (id,)) if not results[0]: return # raise exception? break diff --git a/thoonk/pubsub.py b/thoonk/pubsub.py index a060798..7c30566 100644 --- a/thoonk/pubsub.py +++ b/thoonk/pubsub.py @@ -95,7 +95,12 @@ def __init__(self, host='localhost', port=6379, db=0, listen=False): 'delete_notice': [], 'publish_notice': [], 'retract_notice': [], - 'position_notice': []} + 'position_notice': [], + 'stalled_notice': [], + 'retried_notice': [], + 'finished_notice': [], + 'claimed_notice': [], + 'cancelled_notice': []} self.listen_ready = threading.Event() self.listening = listen @@ -206,6 +211,20 @@ def register_handler(self, name, handler): self.handlers[name] = [] self.handlers[name].append(handler) + def remove_handler(self, name, handler): + """ + Unregister a function that was registered via register_handler + + Arguments: + name -- The name of the feed event. + handler -- The function for handling the event. + """ + try: + self.handlers[name].remove(handler) + except (KeyError, ValueError): + pass + + def create_feed(self, feed, config): """ Create a new feed with a given configuration. @@ -273,6 +292,12 @@ def set_config(self, feed, config): self.redis.set(self.feed_config % feed, jconfig) self._publish(self.conf_feed, (feed, self._feed_config.instance)) + def get_config(self, feed): + if not self.feed_exists(feed): + raise FeedDoesNotExist + config = self.redis.get(self.feed_config % feed) + return json.loads(config) + def get_feeds(self): """ Return the set of known feeds. @@ -341,12 +366,36 @@ def listen(self): elif event['channel'].startswith('feed.position'): self.position_notice(event['channel'].split(':', 1)[-1], event['data']) + elif event['channel'].startswith('feed.claimed'): + self.claimed_notice(event['channel'].split(':', 1)[-1], + event['data']) + elif event['channel'].startswith('feed.finished'): + id, data = event['data'].split(':', 1)[-1].split("\x00", 1) + self.finished_notice(event['channel'].split(':', 1)[-1], id, + data) + elif event['channel'].startswith('feed.cancelled'): + self.cancelled_notice(event['channel'].split(':', 1)[-1], + event['data']) + elif event['channel'].startswith('feed.stalled'): + self.stalled_notice(event['channel'].split(':', 1)[-1], + event['data']) + elif event['channel'].startswith('feed.retried'): + self.retried_notice(event['channel'].split(':', 1)[-1], + event['data']) elif event['channel'] == self.new_feed: #feed created event name, instance = event['data'].split('\x00') self.feeds.add(name) - self.lredis.subscribe((self.feed_publish % name, - self.feed_retract % name)) + config = self.get_config(name) + if config["type"] == "job": + self.lredis.subscribe((self.feed_publish % name, + "feed.cancelled:%s" % name, + "feed.claimed:%s" % name, + "feed.finished:%s" % name, + "feed.stalled:%s" % name,)) + else: + self.lredis.subscribe((self.feed_publish % name, + self.feed_retract % name)) self.create_notice(name) elif event['channel'] == self.del_feed: #feed destroyed event @@ -424,3 +473,64 @@ def position_notice(self, feed, id, rel_id): """ for handler in self.handlers['position_notice']: handler(feed, id, rel_id) + + def stalled_notice(self, feed, id): + """ + Generate a notice that a job has been stalled, and + execute any relevant event handlers. + + Arguments: + feed -- The name of the feed. + id -- The ID of the stalled item. + """ + for handler in self.handlers['stalled_notice']: + handler(feed, id) + + def retried_notice(self, feed, id): + """ + Generate a notice that a job has been retried, and + execute any relevant event handlers. + + Arguments: + feed -- The name of the feed. + id -- The ID of the retried item. + """ + for handler in self.handlers['retried_notice']: + handler(feed, id) + + def cancelled_notice(self, feed, id): + """ + Generate a notice that a job has been cancelled, and + execute any relevant event handlers. + + Arguments: + feed -- The name of the feed. + id -- The ID of the stalled item. + """ + for handler in self.handlers['cancelled_notice']: + handler(feed, id) + + def finished_notice(self, feed, id, result): + """ + Generate a notice that a job has finished, and + execute any relevant event handlers. + + Arguments: + feed -- The name of the feed. + id -- The ID of the stalled item. + """ + for handler in self.handlers['finished_notice']: + handler(feed, id, result) + + def claimed_notice(self, feed, id): + """ + Generate a notice that a job has been claimed, and + execute any relevant event handlers. + + Arguments: + feed -- The name of the feed. + id -- The ID of the stalled item. + """ + for handler in self.handlers['claimed_notice']: + handler(feed, id) + \ No newline at end of file From 332b4613b125e5323048c059f10104d6e5b9948a Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Wed, 27 Jul 2011 00:21:37 +0100 Subject: [PATCH 6/8] updated get_feeds to update from redis --- thoonk/pubsub.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/thoonk/pubsub.py b/thoonk/pubsub.py index 7c30566..b8b0ded 100644 --- a/thoonk/pubsub.py +++ b/thoonk/pubsub.py @@ -304,6 +304,7 @@ def get_feeds(self): Returns: set """ + self.feeds.update(self.redis.smembers('feeds')) return self.feeds def feed_exists(self, feed): @@ -346,11 +347,10 @@ def listen(self): self.lredis.subscribe((self.new_feed, self.del_feed, self.conf_feed)) # get set of feeds - self.feeds.update(self.redis.smembers('feeds')) - if self.feeds: - # subscribe to exist feeds retract and publish - for feed in self.feeds: - self.lredis.subscribe(self[feed].get_channels()) + feeds = self.get_feeds() + # subscribe to exist feeds retract and publish + for feed in self.feeds: + self.lredis.subscribe(self[feed].get_channels()) self.listen_ready.set() for event in self.lredis.listen(): From 6defac447f6ea73802c29e08da565d3977839893 Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Wed, 27 Jul 2011 13:10:41 +0100 Subject: [PATCH 7/8] updated gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 488838e..04712b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ *.pyc .*.swp build/ +.project +.pydevproject +.settings From e4809d4b26b8b7b4cdf7cc5db6ce25c7437bb3ac Mon Sep 17 00:00:00 2001 From: Simon Hewitt Date: Wed, 3 Aug 2011 18:00:34 +0100 Subject: [PATCH 8/8] reverted job queue name to publishes to be inline with spec --- thoonk/feeds/job.py | 16 ++++++++-------- thoonk/pubsub.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index bb4303b..c6d7c1d 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -92,7 +92,7 @@ def __init__(self, thoonk, feed, config=None): """ Queue.__init__(self, thoonk, feed, config=None) - #self.feed_publish = 'feed.publish:%s' % feed + self.feed_publishes = 'feed.publishes:%s' % feed self.feed_cancelled = 'feed.cancelled:%s' % feed self.feed_retried = 'feed.retried:%s' % feed self.feed_finished = 'feed.finished:%s' % feed @@ -102,7 +102,7 @@ def __init__(self, thoonk, feed, config=None): self.feed_job_running = 'feed.running:%s' % feed def get_channels(self): - return (self.feed_publish, self.feed_job_claimed, self.feed_job_stalled, + return (self.feed_publishes, self.feed_job_claimed, self.feed_job_stalled, self.feed_finished, self.feed_cancelled, self.feed_retried) def get_schemas(self): @@ -110,7 +110,7 @@ def get_schemas(self): schema = set((self.feed_job_claimed, self.feed_job_stalled, self.feed_job_running, - self.feed_publish, + self.feed_publishes, self.feed_cancelled)) for id in self.get_ids(): @@ -135,7 +135,7 @@ def retract(self, id): pipe = self.redis.pipeline() pipe.hdel(self.feed_items, id) pipe.hdel(self.feed_cancelled, id) - pipe.zrem(self.feed_publish, id) + pipe.zrem(self.feed_publishes, id) pipe.srem(self.feed_job_stalled, id) pipe.zrem(self.feed_job_claimed, id) pipe.lrem(self.feed_ids, 1, id) @@ -172,13 +172,13 @@ def put(self, item, priority=False): pipe.lpush(self.feed_ids, id) pipe.incr(self.feed_publishes) pipe.hset(self.feed_items, id, item) - pipe.zadd(self.feed_publish, id, time.time()) + pipe.zadd(self.feed_publishes, id, time.time()) results = pipe.execute() if results[-1]: # If zadd was successful - self.thoonk._publish(self.feed_publish, (id, item)) + self.thoonk._publish(self.feed_publishes, (id, item)) else: self.thoonk._publish(self.feed_edit, (id, item)) @@ -305,7 +305,7 @@ def stall(self, id): pipe.zrem(self.feed_job_claimed, id) pipe.hdel(self.feed_cancelled, id) pipe.sadd(self.feed_job_stalled, id) - pipe.zrem(self.feed_publish, id) + pipe.zrem(self.feed_publishes, id) try: pipe.execute() self.thoonk._publish(self.feed_job_stalled, (id,)) @@ -329,7 +329,7 @@ def retry(self, id): pipe = self.redis.pipeline() pipe.srem(self.feed_job_stalled, id) pipe.lpush(self.feed_ids, id) - pipe.zadd(self.feed_publish, time.time(), id) + pipe.zadd(self.feed_publishes, time.time(), id) try: results = pipe.execute() self.thoonk._publish(self.feed_retried, (id,)) diff --git a/thoonk/pubsub.py b/thoonk/pubsub.py index b8b0ded..84d0f27 100644 --- a/thoonk/pubsub.py +++ b/thoonk/pubsub.py @@ -388,7 +388,7 @@ def listen(self): self.feeds.add(name) config = self.get_config(name) if config["type"] == "job": - self.lredis.subscribe((self.feed_publish % name, + self.lredis.subscribe(("feed.publishes:%s" % name, "feed.cancelled:%s" % name, "feed.claimed:%s" % name, "feed.finished:%s" % name,