-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathconsumer.py
More file actions
62 lines (50 loc) · 1.58 KB
/
consumer.py
File metadata and controls
62 lines (50 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# -*- coding: utf-8 -*-
"""
@author: Vitor Villar <vitor.luis98@gmail.com>
"""
import urllib
from threading import Thread
import requests
import sseclient
from .message import Message
class Consumer(Thread):
    """
    Thread that listens to a Mercure hub and passes each received event to a callback.

    Use :meth:`start_consumption` to begin listening: it opens the streaming
    connection first and then starts the thread, whose :meth:`run` iterates
    over the incoming server-sent events.
    """
    mercure_hub = None
    topics = None
    sse_client = None
    callback = None

    def __init__(self, mercure_hub, topics, callback):
        """
        :param mercure_hub: URL of the hub; the query string is appended after a ``?``
        :param topics: topic, or sequence of topics, sent as ``topic`` query parameters
        :param callback: callable invoked with one ``Message`` per received event
        """
        super().__init__()
        self.mercure_hub = mercure_hub
        self.topics = topics
        self.callback = callback

    def start_consumption(self):
        """
        Open the connection to the hub and consume its messages in a new thread

        :raises requests.exceptions.HTTPError: if the hub answers with an error status
        :return:
        """
        response = self._create_request()
        self.sse_client = sseclient.SSEClient(response)
        self.start()

    def run(self) -> None:
        """
        Listen for events and call the callback once per event, on this thread
        """
        for event in self.sse_client.events():
            # Create a new message object for each new incoming message
            msg = Message(self.topics, event.data, message_id=event.id, event_type=event.event)
            self.callback(msg)

    def _create_request(self):
        """
        Creates the streaming request needed to get the messages

        :raises requests.exceptions.HTTPError: if the hub answers with a 4xx/5xx status
        :return: the streaming response object returned by ``requests.get``
        """
        url = "{}?{}".format(self.mercure_hub, self._create_consumer_query_string())
        response = requests.get(url, stream=True)
        try:
            # Fail loudly on a rejected subscription instead of feeding an
            # error body to the SSE parser and ending silently with no events.
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # With stream=True the connection stays open until it is released.
            response.close()
            raise
        return response

    def _create_consumer_query_string(self):
        """
        Creates the query string with the topic parameters

        :return str: the url-encoded query string, one ``topic`` pair per topic
        """
        # A bare ``import urllib`` does not load the ``parse`` submodule;
        # import it explicitly rather than rely on another module doing so.
        from urllib.parse import urlencode

        # doseq=True expands a sequence of topics into repeated ``topic`` pairs.
        return urlencode({'topic': self.topics}, doseq=True)