From 39af8d14d8303f95c70c5c7eb32bb032fab90656 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Thu, 7 May 2026 10:09:07 +0200 Subject: [PATCH 1/4] Improve websocket SSL handling and runtime fallback --- axis/stream_manager.py | 26 ++++++++++++++ axis/websocket.py | 80 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 95 insertions(+), 11 deletions(-) diff --git a/axis/stream_manager.py b/axis/stream_manager.py index de5b4edf..5c329b69 100644 --- a/axis/stream_manager.py +++ b/axis/stream_manager.py @@ -37,6 +37,7 @@ def __init__(self, device: AxisDevice) -> None: self.background_tasks: set[asyncio.Task[None]] = set() self.retry_timer: asyncio.TimerHandle | None = None self._starting = False + self._websocket_temporarily_disabled = False @property def stream_url(self) -> str: @@ -84,6 +85,11 @@ def use_websocket(self) -> bool: """Use websocket transport when event websocket API is available.""" if not self.event: return False + if ( + self._websocket_temporarily_disabled + and not self.device.config.websocket_force + ): + return False if self.device.config.websocket_force: return True return ( @@ -91,6 +97,25 @@ def use_websocket(self) -> bool: and WebSocketClient.supported_by_device(self.device) ) + def _handle_websocket_failure(self) -> None: + """Disable websocket for runtime when TLS certificate validation fails.""" + if self.device.config.websocket_force: + return + + if self.stream is None: + return + + if not getattr(self.stream, "should_disable_runtime_websocket", False): + return + + if not self._websocket_temporarily_disabled: + _LOGGER.warning( + "Disabling websocket events for %s until restart after certificate verification failure", + self.device.config.host, + ) + + self._websocket_temporarily_disabled = True + @property def _is_stream_stopped(self) -> bool: """Return True when stream is missing or currently stopped.""" @@ -124,6 +149,7 @@ def session_callback(self, signal: Signal) -> None: self.device.event.handler(self.data) elif signal == Signal.FAILED: + self._handle_websocket_failure() self.retry() if signal in (Signal.PLAYING, Signal.FAILED): diff --git a/axis/websocket.py b/axis/websocket.py index 222efc44..f302908a 100644 --- a/axis/websocket.py +++ b/axis/websocket.py @@ -13,7 +13,9 @@ import asyncio from collections import deque +import enum import logging +import ssl from time import time from typing import TYPE_CHECKING, Any @@ -48,6 +50,46 @@ BUFFER_SIZE = 200 +class WebSocketFailureReason(enum.StrEnum): + """Classified websocket startup failure reason.""" + + NONE = "none" + SSL_CERTIFICATE = "ssl_certificate" + OTHER = "other" + + +def _walk_exception_chain(err: BaseException) -> list[BaseException]: + """Return exceptions in causal chain for robust error classification.""" + chain: list[BaseException] = [] + seen: set[int] = set() + current: BaseException | None = err + + while current is not None and id(current) not in seen: + chain.append(current) + seen.add(id(current)) + current = current.__cause__ or current.__context__ + + return chain + + +def _classify_connect_error(err: BaseException) -> WebSocketFailureReason: + """Classify websocket connect failure for fallback decisions.""" + for exc in _walk_exception_chain(err): + if isinstance( + exc, + ( + ssl.SSLCertVerificationError, + aiohttp.ClientConnectorCertificateError, + ), + ): + return WebSocketFailureReason.SSL_CERTIFICATE + + if "CERTIFICATE_VERIFY_FAILED" in str(exc): + return WebSocketFailureReason.SSL_CERTIFICATE + + return WebSocketFailureReason.OTHER + + def _parse_ws_notification(notification: dict[str, Any]) -> dict[str, Any]: """Parse a VAPIX events:notify notification into the internal event dict format. @@ -135,12 +177,14 @@ def __init__( self._data: deque[dict[str, Any]] = deque(maxlen=BUFFER_SIZE) self._ws_session: aiohttp.ClientSession | None = None + self._owns_ws_session = False self._ws: aiohttp.ClientWebSocketResponse | None = None self._receiver_task: asyncio.Task[None] | None = None self._close_task: asyncio.Task[None] | None = None self._stopped = False self._starting = False self._start_time: float | None = None + self._last_failure_reason = WebSocketFailureReason.NONE @classmethod def supported_by_device(cls, device: AxisDevice) -> bool: @@ -155,6 +199,11 @@ def data(self) -> dict[str, Any]: except IndexError: return {} + @property + def should_disable_runtime_websocket(self) -> bool: + """Return true if websocket should be disabled for this runtime.""" + return self._last_failure_reason == WebSocketFailureReason.SSL_CERTIFICATE + async def _get_session_token(self) -> str | None: """Obtain a short-lived session token for websocket authentication. @@ -178,32 +227,39 @@ async def start(self) -> None: self._stopped = False self.session.state = State.STARTING self._start_time = time() + self._last_failure_reason = WebSocketFailureReason.NONE try: if self._close_task is not None: await asyncio.shield(self._close_task) token = await self._get_session_token() + self._ws_session = self.device.config.session + self._owns_ws_session = False + + ws_connect_kwargs: dict[str, Any] = { + "heartbeat": HEARTBEAT_INTERVAL, + "timeout": self._ws_timeout, + } + if not self.device.config.verify_ssl: + ws_connect_kwargs["ssl"] = False + if token: connect_url = f"{self.url}&wssession={token}" - self._ws_session = aiohttp.ClientSession() else: # Fall back to HTTP Basic auth in the upgrade handshake. connect_url = self.url - self._ws_session = aiohttp.ClientSession( - auth=aiohttp.BasicAuth( - self.device.config.username, - self.device.config.password, - ), + ws_connect_kwargs["auth"] = aiohttp.BasicAuth( + self.device.config.username, + self.device.config.password, ) self._ws = await self._ws_session.ws_connect( - connect_url, - heartbeat=HEARTBEAT_INTERVAL, - timeout=self._ws_timeout, + connect_url, **ws_connect_kwargs ) except (aiohttp.ClientError, TimeoutError, OSError) as err: _LOGGER.warning("Websocket connect failed: %s", err) + self._last_failure_reason = _classify_connect_error(err) await self._close() self.session.state = State.STOPPED self._signal(Signal.FAILED) @@ -345,9 +401,11 @@ async def _close(self) -> None: await self._ws.close() self._ws = None - if self._ws_session is not None: + if self._ws_session is not None and self._owns_ws_session: await self._ws_session.close() - self._ws_session = None + + self._ws_session = None + self._owns_ws_session = False def _signal(self, signal: Signal) -> None: """Invoke the signal callback, swallowing any exceptions.""" From 7c66f7fbf797e3ca7fecaa38a3b9b1ba75b78278 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Thu, 7 May 2026 10:09:27 +0200 Subject: [PATCH 2/4] Add websocket SSL fallback regression tests --- tests/test_stream_manager.py | 41 +++++++++++++++++++ tests/test_websocket.py | 78 ++++++++++++++++++++++-------------- 2 files changed, 89 insertions(+), 30 deletions(-) diff --git a/tests/test_stream_manager.py b/tests/test_stream_manager.py index 6e09b886..77ab6a9c 100644 --- a/tests/test_stream_manager.py +++ b/tests/test_stream_manager.py @@ -11,6 +11,7 @@ from axis.models.api_discovery import ApiId from axis.rtsp import Signal, State from axis.stream_manager import RETRY_TIMER, StreamManager +from axis.websocket import WebSocketClient, WebSocketFailureReason from .conftest import HOST from .event_fixtures import AUDIO_INIT @@ -262,3 +263,43 @@ async def test_retry_without_active_stream_does_not_call_stop(stream_manager): existing_stream.stop.assert_not_called() assert stream_manager.stream is None mock_loop.call_later.assert_called_once_with(RETRY_TIMER, stream_manager.start) + + +async def test_failed_websocket_cert_error_disables_websocket_runtime(stream_manager): + """Verify certificate failures disable websocket for runtime fallback.""" + stream_manager.event = True + stream_manager.device.config.websocket_enabled = True + stream_manager.device.vapix.api_discovery._items[ + ApiId.EVENT_STREAMING_OVER_WEBSOCKET + ] = MagicMock() + + ws_client = object.__new__(WebSocketClient) + ws_client._last_failure_reason = WebSocketFailureReason.SSL_CERTIFICATE + ws_client.session = SimpleNamespace(state=State.STOPPED) + stream_manager.stream = ws_client + + mock_loop = MagicMock() + with patch("axis.stream_manager.asyncio.get_running_loop", return_value=mock_loop): + stream_manager.session_callback(Signal.FAILED) + + assert stream_manager._websocket_temporarily_disabled is True + assert stream_manager.use_websocket is False + + +async def test_failed_websocket_cert_error_keeps_websocket_when_forced(stream_manager): + """Verify forced websocket mode ignores runtime disable on cert failure.""" + stream_manager.event = True + stream_manager.device.config.websocket_enabled = True + stream_manager.device.config.websocket_force = True + + ws_client = object.__new__(WebSocketClient) + ws_client._last_failure_reason = WebSocketFailureReason.SSL_CERTIFICATE + ws_client.session = SimpleNamespace(state=State.STOPPED) + stream_manager.stream = ws_client + + mock_loop = MagicMock() + with patch("axis.stream_manager.asyncio.get_running_loop", return_value=mock_loop): + stream_manager.session_callback(Signal.FAILED) + + assert stream_manager._websocket_temporarily_disabled is False + assert stream_manager.use_websocket is True diff --git a/tests/test_websocket.py b/tests/test_websocket.py index cbec5bae..7f5cae30 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -4,6 +4,7 @@ """ import asyncio +import ssl from types import SimpleNamespace from unittest.mock import ANY, AsyncMock, MagicMock, patch @@ -151,12 +152,10 @@ async def test_websocket_stream_receives_data(axis_device): ], ) - ws_session = AsyncMock() - ws_session.ws_connect.return_value = ws - axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock(return_value=ws) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -178,9 +177,10 @@ async def test_websocket_stream_receives_data(axis_device): assert client.session.state == State.STOPPED ws.send_json.assert_called_once_with(client._configure_payload) - ws_session.ws_connect.assert_called_once_with( + ws_connect.assert_called_once_with( "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events&wssession=token123", heartbeat=15, + ssl=False, timeout=ANY, ) @@ -189,12 +189,10 @@ async def test_websocket_configure_failure(axis_device): """Verify websocket client reports failed configure.""" callback = MagicMock() ws = MockWebSocket(_configure_error_msg(), []) - ws_session = AsyncMock() - ws_session.ws_connect.return_value = ws - axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock(return_value=ws) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -214,11 +212,10 @@ async def test_websocket_binary_configure_frame_is_rejected(axis_device): [], ) - ws_session = AsyncMock() - ws_session.ws_connect.return_value = ws axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock(return_value=ws) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -234,11 +231,10 @@ async def test_websocket_stop_is_idempotent(axis_device): """Verify stop() can be called repeatedly without failed callback.""" callback = MagicMock() ws = BlockingWebSocket(_configure_ok_msg()) - ws_session = AsyncMock() - ws_session.ws_connect.return_value = ws axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock(return_value=ws) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -262,11 +258,10 @@ async def test_websocket_fallback_to_basic_auth_when_no_token(axis_device): _configure_ok_msg(), [SimpleNamespace(type=aiohttp.WSMsgType.CLOSED, data=None)], ) - ws_session = AsyncMock() - ws_session.ws_connect.return_value = ws axis_device.vapix.request = AsyncMock(side_effect=RuntimeError("no token")) + ws_connect = AsyncMock(return_value=ws) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -275,9 +270,14 @@ async def test_websocket_fallback_to_basic_auth_when_no_token(axis_device): await client.start() await client._receiver_task - ws_session.ws_connect.assert_called_once_with( + ws_connect.assert_called_once_with( "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", + auth=aiohttp.BasicAuth( + axis_device.config.username, + axis_device.config.password, + ), heartbeat=15, + ssl=False, timeout=ANY, ) @@ -303,10 +303,10 @@ async def test_websocket_supported_by_device_and_empty_data(axis_device): async def test_websocket_start_guard_returns_early(axis_device): """Verify start() short-circuits if already starting or non-stopped state.""" callback = MagicMock() - ws_session = AsyncMock() axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock() - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -321,7 +321,7 @@ async def test_websocket_start_guard_returns_early(axis_device): await client.start() axis_device.vapix.request.assert_not_called() - ws_session.ws_connect.assert_not_called() + ws_connect.assert_not_called() async def test_websocket_start_awaits_close_task(axis_device): @@ -331,11 +331,10 @@ async def test_websocket_start_awaits_close_task(axis_device): _configure_ok_msg(), [SimpleNamespace(type=aiohttp.WSMsgType.CLOSED, data=None)], ) - ws_session = AsyncMock() - ws_session.ws_connect.return_value = ws axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock(return_value=ws) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -348,18 +347,16 @@ async def test_websocket_start_awaits_close_task(axis_device): await client._receiver_task assert close_task.done() - ws_session.ws_connect.assert_called_once() + ws_connect.assert_called_once() async def test_websocket_connect_failure(axis_device): """Verify websocket client reports failed connect errors.""" callback = MagicMock() - ws_session = AsyncMock() - ws_session.ws_connect.side_effect = aiohttp.ClientError("boom") - axis_device.vapix.request = AsyncMock(return_value=b"token123") + ws_connect = AsyncMock(side_effect=aiohttp.ClientError("boom")) - with patch("axis.websocket.aiohttp.ClientSession", return_value=ws_session): + with patch.object(axis_device.config.session, "ws_connect", ws_connect): client = WebSocketClient( axis_device, "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", @@ -373,6 +370,27 @@ async def test_websocket_connect_failure(axis_device): assert client._ws_session is None +async def test_websocket_connect_failure_sets_ssl_cert_reason(axis_device): + """Verify certificate verification failures are marked for websocket fallback.""" + callback = MagicMock() + cert_err = ssl.SSLCertVerificationError( + "[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed" + ) + ws_connect = AsyncMock(side_effect=cert_err) + axis_device.vapix.request = AsyncMock(return_value=b"token123") + + with patch.object(axis_device.config.session, "ws_connect", ws_connect): + client = WebSocketClient( + axis_device, + "wss://127.0.0.1:443/vapix/ws-data-stream?sources=events", + callback, + ) + await client.start() + + callback.assert_called_once_with(Signal.FAILED) + assert client.should_disable_runtime_websocket + + async def test_websocket_configure_with_no_ws(axis_device): """Verify configure returns cleanly when websocket is missing.""" client = WebSocketClient( From b588251766cb3a99df7bdff16d130034c818780d Mon Sep 17 00:00:00 2001 From: Kane610 Date: Thu, 7 May 2026 10:09:52 +0200 Subject: [PATCH 3/4] Add HA Core notes for Axis SSL websocket regression --- .github/ha-core-notes-169953.md | 39 +++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 .github/ha-core-notes-169953.md diff --git a/.github/ha-core-notes-169953.md b/.github/ha-core-notes-169953.md new file mode 100644 index 00000000..daf589f2 --- /dev/null +++ b/.github/ha-core-notes-169953.md @@ -0,0 +1,39 @@ +# HA Core Notes: Axis Websocket SSL Regression (Issue #169953) + +## Summary + +Home Assistant Core 2026.5 enabled websocket event usage for Axis devices when supported. +In HTTPS setups that use self-signed or private-CA certificates, websocket startup could fail with certificate verification errors and repeatedly retry without falling back to RTSP event transport. + +## Root Cause + +The websocket transport created a separate aiohttp session for websocket connections. +That path did not reliably inherit SSL/certificate behavior from the configured Axis device session used by the rest of the integration. + +## Changes in This Patch + +1. Websocket now reuses the existing configured aiohttp session. +2. Runtime websocket connect failures are classified. +3. Certificate verification failures trigger runtime websocket disable and fallback to RTSP event transport (unless websocket is forced). +4. Websocket force mode remains authoritative and does not auto-downgrade. + +## Expected Runtime Behavior + +- If websocket startup succeeds, websocket event transport is used. +- If websocket startup fails due to SSL certificate verification and websocket is not forced, Axis falls back to RTSP event stream behavior for the remainder of the runtime. +- If websocket is forced, retries continue on websocket as configured. + +## Integration Guidance for Home Assistant Core + +1. Preserve current user-facing SSL semantics in config flows and options. +2. Consider exposing a repair/diagnostic message when fallback is caused by certificate verification failure. +3. Include host and reason in diagnostics to aid troubleshooting of private CA trust chains. +4. Keep websocket-force behavior opt-in and explicit. + +## Validation + +Targeted validation used for this patch: + +- `uv run pytest tests/test_websocket.py tests/test_stream_manager.py` +- `uv run ruff check axis tests` +- `uv run ruff format --check axis tests` From 7954f07d520743da38bf835647429fa520805169 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Thu, 7 May 2026 11:31:21 +0200 Subject: [PATCH 4/4] Increase coverage for websocket SSL fallback branches --- tests/test_stream_manager.py | 34 +++++++++++++++++++++++++ tests/test_websocket.py | 49 ++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/tests/test_stream_manager.py b/tests/test_stream_manager.py index 77ab6a9c..fb3e57bd 100644 --- a/tests/test_stream_manager.py +++ b/tests/test_stream_manager.py @@ -303,3 +303,37 @@ async def test_failed_websocket_cert_error_keeps_websocket_when_forced(stream_ma assert stream_manager._websocket_temporarily_disabled is False assert stream_manager.use_websocket is True + + +async def test_failed_signal_without_stream_does_not_disable_websocket(stream_manager): + """Verify missing stream branch does not toggle runtime websocket disable.""" + stream_manager.stream = None + + mock_loop = MagicMock() + with patch("axis.stream_manager.asyncio.get_running_loop", return_value=mock_loop): + stream_manager.session_callback(Signal.FAILED) + + assert stream_manager._websocket_temporarily_disabled is False + + +async def test_failed_signal_without_ssl_reason_does_not_disable_websocket( + stream_manager, +): + """Verify non-SSL websocket failures do not disable websocket runtime usage.""" + stream_manager.event = True + stream_manager.device.config.websocket_enabled = True + stream_manager.device.vapix.api_discovery._items[ + ApiId.EVENT_STREAMING_OVER_WEBSOCKET + ] = MagicMock() + + ws_client = object.__new__(WebSocketClient) + ws_client._last_failure_reason = WebSocketFailureReason.OTHER + ws_client.session = SimpleNamespace(state=State.STOPPED) + stream_manager.stream = ws_client + + mock_loop = MagicMock() + with patch("axis.stream_manager.asyncio.get_running_loop", return_value=mock_loop): + stream_manager.session_callback(Signal.FAILED) + + assert stream_manager._websocket_temporarily_disabled is False + assert stream_manager.use_websocket is True diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 7f5cae30..b1b9acff 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -391,6 +391,28 @@ async def test_websocket_connect_failure_sets_ssl_cert_reason(axis_device): assert client.should_disable_runtime_websocket +async def test_websocket_connect_failure_sets_ssl_reason_from_error_string(axis_device): + """Verify fallback SSL classification also works from error text.""" + callback = MagicMock() + ws_connect = AsyncMock( + side_effect=OSError( + "[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed" + ) + ) + axis_device.vapix.request = AsyncMock(return_value=b"token123") + + with patch.object(axis_device.config.session, "ws_connect", ws_connect): + client = WebSocketClient( + axis_device, + "wss://127.0.0.1:443/vapix/ws-data-stream?sources=events", + callback, + ) + await client.start() + + callback.assert_called_once_with(Signal.FAILED) + assert client.should_disable_runtime_websocket + + async def test_websocket_configure_with_no_ws(axis_device): """Verify configure returns cleanly when websocket is missing.""" client = WebSocketClient( @@ -402,6 +424,17 @@ async def test_websocket_configure_with_no_ws(axis_device): await client._configure() +async def test_websocket_send_configure_payload_with_no_ws(axis_device): + """Verify direct configure send helper returns when websocket is missing.""" + client = WebSocketClient( + axis_device, + "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", + MagicMock(), + ) + client._ws = None + await client._send_configure_payload({}) + + async def test_websocket_configure_unexpected_message_type(axis_device): """Verify configure fails on non-text response.""" ws = AsyncMock() @@ -556,3 +589,19 @@ async def test_websocket_close_with_no_session(axis_device): assert client._ws is None assert client._ws_session is None + + +async def test_websocket_close_owned_session_is_closed(axis_device): + """Verify close() shuts down owned websocket sessions.""" + client = WebSocketClient( + axis_device, + "ws://127.0.0.1:80/vapix/ws-data-stream?sources=events", + MagicMock(), + ) + owned_session = AsyncMock() + client._ws_session = owned_session + client._owns_ws_session = True + + await client._close() + + owned_session.close.assert_awaited_once()