From 385e93c6fcedf1311576aafc19a6c22eca38f08f Mon Sep 17 00:00:00 2001 From: Joseph Dorn <79379455+quartzar@users.noreply.github.com> Date: Thu, 17 Apr 2025 16:13:11 +0100 Subject: [PATCH 1/3] refactor(deprecation): Replace deprecated datetime.datetime.utcnow() with datetime.UTC --- bokeh_django/consumers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bokeh_django/consumers.py b/bokeh_django/consumers.py index 4bbebaa..8d65f62 100644 --- a/bokeh_django/consumers.py +++ b/bokeh_django/consumers.py @@ -231,7 +231,7 @@ async def connect(self): self.close() raise RuntimeError("No token received in subprotocol header") - now = calendar.timegm(dt.datetime.utcnow().utctimetuple()) + now = calendar.timegm(dt.datetime.now(dt.UTC).utctimetuple()) payload = get_token_payload(token) if 'session_expiry' not in payload: self.close() From 963ba630d9d0567dab06e9c8653c4532c53c0516 Mon Sep 17 00:00:00 2001 From: Joseph Dorn <79379455+quartzar@users.noreply.github.com> Date: Thu, 17 Apr 2025 16:15:29 +0100 Subject: [PATCH 2/3] refactor: Await un-awaited coroutines --- bokeh_django/consumers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bokeh_django/consumers.py b/bokeh_django/consumers.py index 8d65f62..0e3c387 100644 --- a/bokeh_django/consumers.py +++ b/bokeh_django/consumers.py @@ -223,21 +223,21 @@ async def connect(self): subprotocols = self.scope["subprotocols"] if len(subprotocols) != 2 or subprotocols[0] != 'bokeh': - self.close() + await self.close() raise RuntimeError("Subprotocol header is not 'bokeh'") token = subprotocols[1] if token is None: - self.close() + await self.close() raise RuntimeError("No token received in subprotocol header") now = calendar.timegm(dt.datetime.now(dt.UTC).utctimetuple()) payload = get_token_payload(token) if 'session_expiry' not in payload: - self.close() + await self.close() raise RuntimeError("Session expiry has not been provided") elif now >= payload['session_expiry']: - self.close() + await self.close() raise RuntimeError("Token is expired.") elif not check_token_signature(token, signed=False, @@ -292,7 +292,7 @@ async def _async_open(self, token: str) -> None: except Exception as e: log.error("Could not create new server session, reason: %s", e) - self.close() + await self.close() raise e msg = self.connection.protocol.create('ACK') From 3eaa8435ed898a5f955ac5002652adcd79150f9e Mon Sep 17 00:00:00 2001 From: Joseph Dorn <79379455+quartzar@users.noreply.github.com> Date: Thu, 17 Apr 2025 16:34:45 +0100 Subject: [PATCH 3/3] fix(websockets): Explicitly initialise the io_loop if it doesn't exist (fixes #15) This fixes an issue in multi-worker environments where io_loop may be initialised on a worker other than the worker that originally handled the WebSocket connection --- bokeh_django/consumers.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/bokeh_django/consumers.py b/bokeh_django/consumers.py index 0e3c387..adf385d 100644 --- a/bokeh_django/consumers.py +++ b/bokeh_django/consumers.py @@ -214,8 +214,11 @@ def application_context(self) -> ApplicationContext: if self._application_context is None: self._application_context = self.scope["url_route"]["kwargs"]["app_context"] - if self._application_context.io_loop is None: - raise RuntimeError("io_loop should already been set") + # Explicitly set io_loop here (likely running in multi-worker environment) + if self._application_context._loop is None: + self._application_context._loop = IOLoop.current() + log.debug("io_loop has been re-set") + return self._application_context async def connect(self): @@ -252,7 +255,7 @@ def on_fully_opened(future): # this isn't really an error (unless we have a # bug), it just means a client disconnected # immediately, most likely. - log.debug("Failed to fully open connlocksection %r", e) + log.debug("Failed to fully open connection %r", e) future = self._async_open(token) @@ -263,7 +266,9 @@ def on_fully_opened(future): await self.accept("bokeh") async def disconnect(self, close_code): - self.connection.session.destroy() + if hasattr(self, "connection"): + self.connection.session.destroy() + await super().disconnect(close_code) async def receive(self, text_data) -> None: fragment = text_data @@ -277,8 +282,19 @@ async def receive(self, text_data) -> None: async def _async_open(self, token: str) -> None: try: session_id = get_session_id(token) - await self.application_context.create_session_if_needed(session_id, self.request, token) - session = self.application_context.get_session(session_id) + + # Ensure io_loop is set before creating session (likely running in multi-worker environment) + if self._application_context._loop is None: + self._application_context._loop = IOLoop.current() + log.debug("io_loop has been re-set") + + # Try to create or get session + try: + session = await self.application_context.create_session_if_needed(session_id, self.request, token) + + except Exception as e: + log.error("Error creating session: %s", e) + raise e protocol = Protocol() self.receiver = Receiver(protocol)