From 654ba8a956722e387e0b867a5d2707bc95981f7e Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Tue, 23 Dec 2025 13:56:43 +0200 Subject: [PATCH 1/4] Fixed potential race condition between call_later() and run_forever() --- redis/auth/token_manager.py | 53 +++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/redis/auth/token_manager.py b/redis/auth/token_manager.py index dd8d16233d..a4474add7f 100644 --- a/redis/auth/token_manager.py +++ b/redis/auth/token_manager.py @@ -145,20 +145,36 @@ def start( except RuntimeError: # Run loop in a separate thread to unblock main thread. loop = asyncio.new_event_loop() - thread = threading.Thread( - target=_start_event_loop_in_thread, args=(loop,), daemon=True - ) + + # Use threading.Event to signal when loop is ready + loop_ready = threading.Event() + + def start_loop(): + asyncio.set_event_loop(loop) + loop_ready.set() # Signal that loop is ready + loop.run_forever() + + thread = threading.Thread(target=start_loop, daemon=True) thread.start() - # Event to block for initial execution. - init_event = asyncio.Event() - self._init_timer = loop.call_later( - 0, self._renew_token, skip_initial, init_event - ) + # Wait for the loop to be ready before scheduling + loop_ready.wait() + + # Use thread-safe Event for cross-thread synchronization + init_done = threading.Event() + + def renew_with_callback(): + try: + self._renew_token(skip_initial) + finally: + init_done.set() + + # Schedule using call_soon_threadsafe for thread-safe scheduling + loop.call_soon_threadsafe(renew_with_callback) logger.info("Token manager started") - # Blocks in thread-safe manner. - asyncio.run_coroutine_threadsafe(init_event.wait(), loop).result() + # Blocks using thread-safe Event + init_done.wait() return self.stop async def start_async( @@ -248,7 +264,7 @@ def _delay_for_ratio_refresh(self, expire_date: float, issue_date: float): ) def _renew_token( - self, skip_initial: bool = False, init_event: asyncio.Event = None + self, skip_initial: bool = False ): """ Task to renew token from identity provider. @@ -289,9 +305,6 @@ def _renew_token( raise e self._listener.on_error(e) - finally: - if init_event: - init_event.set() async def _renew_token_async( self, skip_initial: bool = False, init_event: asyncio.Event = None @@ -356,15 +369,3 @@ def wrapped(): asyncio.ensure_future(coro_func(*args, **kwargs), loop=loop) return wrapped - - -def _start_event_loop_in_thread(event_loop: asyncio.AbstractEventLoop): - """ - Starts event loop in a thread. - Used to be able to schedule tasks using loop.call_later. - - :param event_loop: - :return: - """ - asyncio.set_event_loop(event_loop) - event_loop.run_forever() From 9d6f4ec7b877c3a5e106ae4956a8f73371897464 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Tue, 23 Dec 2025 14:00:40 +0200 Subject: [PATCH 2/4] Codestyle changes --- redis/auth/token_manager.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/redis/auth/token_manager.py b/redis/auth/token_manager.py index a4474add7f..d79a86c451 100644 --- a/redis/auth/token_manager.py +++ b/redis/auth/token_manager.py @@ -263,9 +263,7 @@ def _delay_for_ratio_refresh(self, expire_date: float, issue_date: float): - (datetime.now(timezone.utc).timestamp() * 1000) ) - def _renew_token( - self, skip_initial: bool = False - ): + def _renew_token(self, skip_initial: bool = False): """ Task to renew token from identity provider. Schedules renewal tasks based on token TTL. From 6ac3639085c173770cce4c31946c20e194fb97d1 Mon Sep 17 00:00:00 2001 From: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> Date: Tue, 23 Dec 2025 14:02:55 +0200 Subject: [PATCH 3/4] Update redis/auth/token_manager.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- redis/auth/token_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/auth/token_manager.py b/redis/auth/token_manager.py index d79a86c451..91651d6adc 100644 --- a/redis/auth/token_manager.py +++ b/redis/auth/token_manager.py @@ -170,7 +170,7 @@ def renew_with_callback(): init_done.set() # Schedule using call_soon_threadsafe for thread-safe scheduling - loop.call_soon_threadsafe(renew_with_callback) + self._init_timer = loop.call_soon_threadsafe(renew_with_callback) logger.info("Token manager started") # Blocks using thread-safe Event From ff4360963ddbde58bfce7fd1a4fccf8b8504b0cc Mon Sep 17 00:00:00 2001 From: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> Date: Tue, 23 Dec 2025 14:03:49 +0200 Subject: [PATCH 4/4] Update redis/auth/token_manager.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- redis/auth/token_manager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/redis/auth/token_manager.py b/redis/auth/token_manager.py index 91651d6adc..8b62e6f83f 100644 --- a/redis/auth/token_manager.py +++ b/redis/auth/token_manager.py @@ -150,8 +150,12 @@ def start( loop_ready = threading.Event() def start_loop(): + # This runs in the background thread. First, bind the event loop to + # this thread, then signal that the loop is ready so the calling + # thread can safely schedule work (via call_soon_threadsafe) before + # we block in run_forever(). asyncio.set_event_loop(loop) - loop_ready.set() # Signal that loop is ready + loop_ready.set() # Signal that loop is ready for cross-thread use loop.run_forever() thread = threading.Thread(target=start_loop, daemon=True)