|
1 |
| -import pickle |
2 |
| -from abc import abstractmethod |
3 | 1 | from logging import getLogger
|
4 | 2 | from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
|
5 | 3 |
|
@@ -49,68 +47,66 @@ def __init__(
|
49 | 47 |
|
50 | 48 | async def shutdown(self) -> None:
|
51 | 49 | """Closes redis connection pool."""
|
| 50 | + await super().shutdown() |
52 | 51 | await self.connection_pool.disconnect()
|
53 | 52 |
|
54 |
| - async def listen(self) -> AsyncGenerator[BrokerMessage, None]: |
55 |
| - """ |
56 |
| - Listen redis queue for new messages. |
57 | 53 |
|
58 |
| - This function listens to the queue |
59 |
| - and yields new messages if they have BrokerMessage type. |
| 54 | +class PubSubBroker(BaseRedisBroker): |
| 55 | + """Broker that works with Redis and broadcasts tasks to all workers.""" |
60 | 56 |
|
61 |
| - :yields: broker messages. |
62 |
| - """ |
63 |
| - async for message in self._listen_to_raw_messages(): |
64 |
| - try: |
65 |
| - redis_message = pickle.loads(message) |
66 |
| - if isinstance(redis_message, BrokerMessage): |
67 |
| - yield redis_message |
68 |
| - except ( |
69 |
| - TypeError, |
70 |
| - AttributeError, |
71 |
| - pickle.UnpicklingError, |
72 |
| - ) as exc: |
73 |
| - logger.debug( |
74 |
| - "Cannot read broker message %s", |
75 |
| - exc, |
76 |
| - exc_info=True, |
77 |
| - ) |
78 |
| - |
79 |
| - @abstractmethod |
80 |
| - async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]: |
| 57 | + async def kick(self, message: BrokerMessage) -> None: |
81 | 58 | """
|
82 |
| - Generator for reading raw data from Redis. |
| 59 | + Publish message over PUBSUB channel. |
83 | 60 |
|
84 |
| - :yields: raw data. |
| 61 | + :param message: message to send. |
85 | 62 | """
|
86 |
| - yield # type: ignore |
87 |
| - |
| 63 | + async with Redis(connection_pool=self.connection_pool) as redis_conn: |
| 64 | + await redis_conn.publish(self.queue_name, message.message) |
88 | 65 |
|
89 |
| -class PubSubBroker(BaseRedisBroker): |
90 |
| - """Broker that works with Redis and broadcasts tasks to all workers.""" |
| 66 | + async def listen(self) -> AsyncGenerator[bytes, None]: |
| 67 | + """ |
| 68 | + Listen redis queue for new messages. |
91 | 69 |
|
92 |
| - async def kick(self, message: BrokerMessage) -> None: # noqa: D102 |
93 |
| - async with Redis(connection_pool=self.connection_pool) as redis_conn: |
94 |
| - await redis_conn.publish(self.queue_name, pickle.dumps(message)) |
| 70 | + This function listens to the pubsub channel |
| 71 | + and yields all messages with proper types. |
95 | 72 |
|
96 |
| - async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]: |
| 73 | + :yields: broker messages. |
| 74 | + """ |
97 | 75 | async with Redis(connection_pool=self.connection_pool) as redis_conn:
|
98 | 76 | redis_pubsub_channel = redis_conn.pubsub()
|
99 | 77 | await redis_pubsub_channel.subscribe(self.queue_name)
|
100 | 78 | async for message in redis_pubsub_channel.listen():
|
101 | 79 | if not message:
|
102 | 80 | continue
|
| 81 | + if message["type"] != "message": |
| 82 | + logger.debug("Received non-message from redis: %s", message) |
| 83 | + continue |
103 | 84 | yield message["data"]
|
104 | 85 |
|
105 | 86 |
|
106 | 87 | class ListQueueBroker(BaseRedisBroker):
|
107 | 88 | """Broker that works with Redis and distributes tasks between workers."""
|
108 | 89 |
|
109 |
| - async def kick(self, message: BrokerMessage) -> None: # noqa: D102 |
| 90 | + async def kick(self, message: BrokerMessage) -> None: |
| 91 | + """ |
| 92 | + Put a message in a list. |
| 93 | +
|
| 94 | + This method appends a message to the list of all messages. |
| 95 | +
|
| 96 | + :param message: message to append. |
| 97 | + """ |
110 | 98 | async with Redis(connection_pool=self.connection_pool) as redis_conn:
|
111 |
| - await redis_conn.lpush(self.queue_name, pickle.dumps(message)) |
| 99 | + await redis_conn.lpush(self.queue_name, message.message) |
112 | 100 |
|
113 |
| - async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]: |
| 101 | + async def listen(self) -> AsyncGenerator[bytes, None]: |
| 102 | + """ |
| 103 | + Listen redis queue for new messages. |
| 104 | +
|
| 105 | + This function listens to the queue |
| 106 | + and yields new messages if they have BrokerMessage type. |
| 107 | +
|
| 108 | + :yields: broker messages. |
| 109 | + """ |
114 | 110 | redis_brpop_data_position = 1
|
115 | 111 | async with Redis(connection_pool=self.connection_pool) as redis_conn:
|
116 | 112 | while True: # noqa: WPS457
|
|
0 commit comments