Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions talker/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import redis.exceptions

from easypy.concurrency import _check_exiting, concurrent, _run_with_exception_logging, raise_in_main_thread
from easypy.timing import wait, Timer
from easypy.timing import wait, Timer, TimeoutException
from easypy.units import MINUTE

from talker.errors import NoResponseForRedisCommand, RedisConnectionError, RedisTimeoutError
Expand All @@ -19,17 +19,15 @@ class TalkerReactor():
ASYNC_COMMANDS = {'rpush', 'expire'}
BLOCKING_COMMANDS = {'blpop', 'brpop', 'get'}

CmdItem = namedtuple("CmdItem", "cmd_idx cmd_id cmd args kwargs callback event results")
CmdItem = namedtuple("CmdItem", "cmd_id cmd args kwargs callback event results")

def __init__(self, talker):
self.talker = talker
self._max_workers = 5
self._executors = ThreadPoolExecutor(max_workers=self._max_workers)
self._commands_queue = Queue()
self._commands = dict()
self._lock = RLock()
self._current_workers = Semaphore(self._max_workers)
self._cmd_idx = 0

reactor_loop = _logger.context(host="TLKR-reactor-loop")(self._get_main_loop)
self._main_loop = concurrent(func=reactor_loop, threadname="TLKR-reactor")
Expand Down Expand Up @@ -86,39 +84,30 @@ def _send_data(self, items):
self._current_workers.release()

def send(self, cmd, *args, _async=False, _callback=None, _cmd_id=None, **kwargs):
with self._lock:
self._cmd_idx += 1
cmd_idx = self._cmd_idx

event = None if cmd in self.ASYNC_COMMANDS else Event()
item = self.CmdItem(
cmd_idx=cmd_idx, cmd_id=_cmd_id, cmd=cmd,
cmd_id=_cmd_id, cmd=cmd,
args=args, kwargs=kwargs, callback=_callback,
event=event, results=[])

if cmd not in self.ASYNC_COMMANDS:
self._commands[cmd_idx] = item

if cmd not in self.BLOCKING_COMMANDS:
self._log_cmd(cmd, _cmd_id)

self._commands_queue.put(item)

if _async:
return cmd_idx
if event is not None:
return self.get_response(cmd_idx)

def get_response(self, cmd_idx):
item = self._commands.pop(cmd_idx)
if event is not None: # not async
return self.get_response(item)

def get_response(self, item):
def has_response():
_check_exiting()
if not item.event.wait(timeout=0.5):
raise NoResponseForRedisCommand(talker=self.talker, **item._asdict())
return True
return item.event.wait(timeout=0.5)

try:
wait(REDIS_SOCKET_TIMEOUT + MINUTE, has_response, message=False, progressbar=False, sleep=0)
except TimeoutException:
raise NoResponseForRedisCommand(talker=self.talker, **item._asdict())

wait(REDIS_SOCKET_TIMEOUT + MINUTE, has_response, message=False, progressbar=False, sleep=0)
[response] = item.results
return response

Expand Down