Skip to content
This repository was archived by the owner on Jan 10, 2019. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion thoonk/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from thoonk import feeds, cache
from thoonk.exceptions import FeedExists, FeedDoesNotExist, NotListening


class Thoonk(object):

"""
Expand Down Expand Up @@ -154,7 +155,6 @@ def startclass(feed, config=None):
pass
return self._feeds[feed]


setattr(self, feedtype, startclass)

def register_handler(self, name, handler):
Expand Down Expand Up @@ -300,19 +300,26 @@ def run(self):
"""
# listener redis object
self._pubsub = self.redis.pubsub()
#if self._pubsub.connection is None:
# self._pubsub.connection = self._pubsub.connection_pool.get_connection(
# 'pubsub'
# )
# subscribe to feed activities channel
self._pubsub.subscribe((self._finish_channel, 'newfeed', 'delfeed', 'conffeed'))
self._pubsub.parse_response()

# subscribe to exist feeds retract and publish
for feed in self.redis.smembers("feeds"):
self._pubsub.subscribe(self.thoonk._feeds[feed].get_channels())
self._pubsub.parse_response()

self.ready.set()
for event in self._pubsub.listen():
type = event.pop("type")
if event["channel"] == self._finish_channel:
if self._pubsub.subscription_count:
self._pubsub.unsubscribe()
self._pubsub.parse_response()
elif type == 'message':
self._handle_message(**event)
elif type == 'pmessage':
Expand All @@ -326,6 +333,7 @@ def _handle_message(self, channel, data, pattern=None):
name, _ = data.split('\x00')
self._pubsub.subscribe(("feed.publish:"+name, "feed.edit:"+name,
"feed.retract:"+name, "feed.position:"+name, "job.finish:"+name))
self._pubsub.parse_response()
self.emit("create", name)

elif channel == 'delfeed':
Expand Down