diff --git a/talker/reactor.py b/talker/reactor.py index 0ad8ce3..0f7a20e 100644 --- a/talker/reactor.py +++ b/talker/reactor.py @@ -8,7 +8,7 @@ import redis.exceptions from easypy.concurrency import _check_exiting, concurrent, _run_with_exception_logging, raise_in_main_thread -from easypy.timing import wait, Timer +from easypy.timing import wait, Timer, TimeoutException from easypy.units import MINUTE from talker.errors import NoResponseForRedisCommand, RedisConnectionError, RedisTimeoutError @@ -19,17 +19,15 @@ class TalkerReactor(): ASYNC_COMMANDS = {'rpush', 'expire'} BLOCKING_COMMANDS = {'blpop', 'brpop', 'get'} - CmdItem = namedtuple("CmdItem", "cmd_idx cmd_id cmd args kwargs callback event results") + CmdItem = namedtuple("CmdItem", "cmd_id cmd args kwargs callback event results") def __init__(self, talker): self.talker = talker self._max_workers = 5 self._executors = ThreadPoolExecutor(max_workers=self._max_workers) self._commands_queue = Queue() - self._commands = dict() self._lock = RLock() self._current_workers = Semaphore(self._max_workers) - self._cmd_idx = 0 reactor_loop = _logger.context(host="TLKR-reactor-loop")(self._get_main_loop) self._main_loop = concurrent(func=reactor_loop, threadname="TLKR-reactor") @@ -86,39 +84,30 @@ def _send_data(self, items): self._current_workers.release() def send(self, cmd, *args, _async=False, _callback=None, _cmd_id=None, **kwargs): - with self._lock: - self._cmd_idx += 1 - cmd_idx = self._cmd_idx - event = None if cmd in self.ASYNC_COMMANDS else Event() item = self.CmdItem( - cmd_idx=cmd_idx, cmd_id=_cmd_id, cmd=cmd, + cmd_id=_cmd_id, cmd=cmd, args=args, kwargs=kwargs, callback=_callback, event=event, results=[]) - if cmd not in self.ASYNC_COMMANDS: - self._commands[cmd_idx] = item - if cmd not in self.BLOCKING_COMMANDS: self._log_cmd(cmd, _cmd_id) self._commands_queue.put(item) - if _async: - return cmd_idx - if event is not None: - return self.get_response(cmd_idx) - - def get_response(self, cmd_idx): - item = self._commands.pop(cmd_idx) + if event is not None: # not async + return self.get_response(item) + def get_response(self, item): def has_response(): _check_exiting() - if not item.event.wait(timeout=0.5): - raise NoResponseForRedisCommand(talker=self.talker, **item._asdict()) - return True + return item.event.wait(timeout=0.5) + + try: + wait(REDIS_SOCKET_TIMEOUT + MINUTE, has_response, message=False, progressbar=False, sleep=0) + except TimeoutException: + raise NoResponseForRedisCommand(talker=self.talker, **item._asdict()) - wait(REDIS_SOCKET_TIMEOUT + MINUTE, has_response, message=False, progressbar=False, sleep=0) [response] = item.results return response