Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/ha-core-notes-169953.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# HA Core Notes: Axis Websocket SSL Regression (Issue #169953)

## Summary

Home Assistant Core 2026.5 enabled websocket event usage for Axis devices when supported.
In HTTPS setups that use self-signed or private-CA certificates, websocket startup could fail with certificate verification errors and repeatedly retry without falling back to RTSP event transport.

## Root Cause

The websocket transport created a separate aiohttp session for websocket connections.
That path did not reliably inherit SSL/certificate behavior from the configured Axis device session used by the rest of the integration.

## Changes in This Patch

1. Websocket now reuses the existing configured aiohttp session.
2. Runtime websocket connect failures are classified as either SSL certificate verification failures or other failures.
3. Certificate verification failures disable websocket for the remainder of the runtime and fall back to RTSP event transport (unless websocket is forced).
4. Websocket force mode remains authoritative and does not auto-downgrade.

## Expected Runtime Behavior

- If websocket startup succeeds, websocket event transport is used.
- If websocket startup fails due to SSL certificate verification and websocket is not forced, Axis falls back to RTSP event stream behavior for the remainder of the runtime.
- If websocket is forced, retries continue on websocket as configured.

## Integration Guidance for Home Assistant Core

1. Preserve current user-facing SSL semantics in config flows and options.
2. Consider exposing a repair/diagnostic message when fallback is caused by certificate verification failure.
3. Include host and reason in diagnostics to aid troubleshooting of private CA trust chains.
4. Keep websocket-force behavior opt-in and explicit.

## Validation

Targeted validation used for this patch:

- `uv run pytest tests/test_websocket.py tests/test_stream_manager.py`
- `uv run ruff check axis tests`
- `uv run ruff format --check axis tests`
26 changes: 26 additions & 0 deletions axis/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, device: AxisDevice) -> None:
self.background_tasks: set[asyncio.Task[None]] = set()
self.retry_timer: asyncio.TimerHandle | None = None
self._starting = False
self._websocket_temporarily_disabled = False

@property
def stream_url(self) -> str:
Expand Down Expand Up @@ -84,13 +85,37 @@ def use_websocket(self) -> bool:
"""Use websocket transport when event websocket API is available."""
if not self.event:
return False
if (
self._websocket_temporarily_disabled
and not self.device.config.websocket_force
):
return False
if self.device.config.websocket_force:
return True
return (
self.device.config.websocket_enabled
and WebSocketClient.supported_by_device(self.device)
)

def _handle_websocket_failure(self) -> None:
    """Switch websocket off for this runtime after a certificate failure.

    Nothing happens when websocket is forced in the device config, when there
    is no current stream, or when the stream does not report
    ``should_disable_runtime_websocket``. Otherwise the runtime-disable flag
    is set, and a warning naming the host is logged the first time only.
    """
    if self.device.config.websocket_force:
        return

    stream = self.stream
    # Streams lacking the attribute are treated as "do not disable".
    if stream is None or not getattr(
        stream, "should_disable_runtime_websocket", False
    ):
        return

    first_failure = not self._websocket_temporarily_disabled
    if first_failure:
        _LOGGER.warning(
            "Disabling websocket events for %s until restart after certificate verification failure",
            self.device.config.host,
        )

    self._websocket_temporarily_disabled = True

@property
def _is_stream_stopped(self) -> bool:
"""Return True when stream is missing or currently stopped."""
Expand Down Expand Up @@ -124,6 +149,7 @@ def session_callback(self, signal: Signal) -> None:
self.device.event.handler(self.data)

elif signal == Signal.FAILED:
self._handle_websocket_failure()
self.retry()

if signal in (Signal.PLAYING, Signal.FAILED):
Expand Down
80 changes: 69 additions & 11 deletions axis/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

import asyncio
from collections import deque
import enum
import logging
import ssl
from time import time
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -48,6 +50,46 @@
BUFFER_SIZE = 200


class WebSocketFailureReason(enum.StrEnum):
"""Classified websocket startup failure reason."""

NONE = "none"
SSL_CERTIFICATE = "ssl_certificate"
OTHER = "other"


def _walk_exception_chain(err: BaseException) -> list[BaseException]:
    """Return *err* followed by the exceptions it was raised from.

    Starting at ``err``, each step follows ``__cause__`` (explicit
    ``raise ... from``) when it is set and ``__context__`` (implicit
    chaining) otherwise. The walk ends at the first exception with neither
    link, or as soon as an exception is met a second time, so a cyclic chain
    cannot loop forever.

    Args:
        err: Exception to start from.

    Returns:
        The visited exceptions, outermost first.
    """
    chain: list[BaseException] = []
    visited: set[int] = set()
    current: BaseException | None = err

    while current is not None and id(current) not in visited:
        chain.append(current)
        visited.add(id(current))
        # Compare against None instead of relying on truthiness: an exception
        # class may define __bool__ or __len__ and evaluate as falsy, which
        # would make the walk skip a cause that is really there.
        cause = current.__cause__
        current = cause if cause is not None else current.__context__

    return chain


def _classify_connect_error(err: BaseException) -> WebSocketFailureReason:
    """Map a websocket connect error to a failure reason.

    Every exception returned by ``_walk_exception_chain(err)`` is inspected.
    The result is ``SSL_CERTIFICATE`` as soon as one of them is a certificate
    verification error type or carries ``CERTIFICATE_VERIFY_FAILED`` in its
    message; otherwise it is ``OTHER``.
    """
    certificate_error_types = (
        ssl.SSLCertVerificationError,
        aiohttp.ClientConnectorCertificateError,
    )

    def _is_certificate_failure(exc: BaseException) -> bool:
        # The message check catches verification errors whose type is not
        # one of the two listed above.
        return (
            isinstance(exc, certificate_error_types)
            or "CERTIFICATE_VERIFY_FAILED" in str(exc)
        )

    if any(_is_certificate_failure(exc) for exc in _walk_exception_chain(err)):
        return WebSocketFailureReason.SSL_CERTIFICATE

    return WebSocketFailureReason.OTHER


def _parse_ws_notification(notification: dict[str, Any]) -> dict[str, Any]:
"""Parse a VAPIX events:notify notification into the internal event dict format.

Expand Down Expand Up @@ -135,12 +177,14 @@ def __init__(
self._data: deque[dict[str, Any]] = deque(maxlen=BUFFER_SIZE)

self._ws_session: aiohttp.ClientSession | None = None
self._owns_ws_session = False
self._ws: aiohttp.ClientWebSocketResponse | None = None
self._receiver_task: asyncio.Task[None] | None = None
self._close_task: asyncio.Task[None] | None = None
self._stopped = False
self._starting = False
self._start_time: float | None = None
self._last_failure_reason = WebSocketFailureReason.NONE

@classmethod
def supported_by_device(cls, device: AxisDevice) -> bool:
Expand All @@ -155,6 +199,11 @@ def data(self) -> dict[str, Any]:
except IndexError:
return {}

@property
def should_disable_runtime_websocket(self) -> bool:
    """Tell whether the recorded failure reason is a certificate failure.

    True only while ``_last_failure_reason`` equals
    ``WebSocketFailureReason.SSL_CERTIFICATE``.
    """
    last_reason = self._last_failure_reason
    return last_reason == WebSocketFailureReason.SSL_CERTIFICATE

async def _get_session_token(self) -> str | None:
"""Obtain a short-lived session token for websocket authentication.

Expand All @@ -178,32 +227,39 @@ async def start(self) -> None:
self._stopped = False
self.session.state = State.STARTING
self._start_time = time()
self._last_failure_reason = WebSocketFailureReason.NONE

try:
if self._close_task is not None:
await asyncio.shield(self._close_task)

token = await self._get_session_token()
self._ws_session = self.device.config.session
self._owns_ws_session = False

ws_connect_kwargs: dict[str, Any] = {
"heartbeat": HEARTBEAT_INTERVAL,
"timeout": self._ws_timeout,
}
if not self.device.config.verify_ssl:
ws_connect_kwargs["ssl"] = False

if token:
connect_url = f"{self.url}&wssession={token}"
self._ws_session = aiohttp.ClientSession()
else:
# Fall back to HTTP Basic auth in the upgrade handshake.
connect_url = self.url
self._ws_session = aiohttp.ClientSession(
auth=aiohttp.BasicAuth(
self.device.config.username,
self.device.config.password,
),
ws_connect_kwargs["auth"] = aiohttp.BasicAuth(
self.device.config.username,
self.device.config.password,
)

self._ws = await self._ws_session.ws_connect(
connect_url,
heartbeat=HEARTBEAT_INTERVAL,
timeout=self._ws_timeout,
connect_url, **ws_connect_kwargs
)
except (aiohttp.ClientError, TimeoutError, OSError) as err:
_LOGGER.warning("Websocket connect failed: %s", err)
self._last_failure_reason = _classify_connect_error(err)
await self._close()
self.session.state = State.STOPPED
self._signal(Signal.FAILED)
Expand Down Expand Up @@ -345,9 +401,11 @@ async def _close(self) -> None:
await self._ws.close()
self._ws = None

if self._ws_session is not None:
if self._ws_session is not None and self._owns_ws_session:
await self._ws_session.close()
self._ws_session = None

self._ws_session = None
self._owns_ws_session = False

def _signal(self, signal: Signal) -> None:
"""Invoke the signal callback, swallowing any exceptions."""
Expand Down
75 changes: 75 additions & 0 deletions tests/test_stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from axis.models.api_discovery import ApiId
from axis.rtsp import Signal, State
from axis.stream_manager import RETRY_TIMER, StreamManager
from axis.websocket import WebSocketClient, WebSocketFailureReason

from .conftest import HOST
from .event_fixtures import AUDIO_INIT
Expand Down Expand Up @@ -262,3 +263,77 @@ async def test_retry_without_active_stream_does_not_call_stop(stream_manager):
existing_stream.stop.assert_not_called()
assert stream_manager.stream is None
mock_loop.call_later.assert_called_once_with(RETRY_TIMER, stream_manager.start)


async def test_failed_websocket_cert_error_disables_websocket_runtime(stream_manager):
    """A certificate failure reported on FAILED turns websocket off for the runtime."""
    # Websocket would normally be selected: enabled and advertised by the device.
    stream_manager.device.vapix.api_discovery._items[
        ApiId.EVENT_STREAMING_OVER_WEBSOCKET
    ] = MagicMock()
    stream_manager.device.config.websocket_enabled = True
    stream_manager.event = True

    # Bare client (no __init__) that reports a certificate failure.
    failed_client = object.__new__(WebSocketClient)
    failed_client.session = SimpleNamespace(state=State.STOPPED)
    failed_client._last_failure_reason = WebSocketFailureReason.SSL_CERTIFICATE
    stream_manager.stream = failed_client

    with patch(
        "axis.stream_manager.asyncio.get_running_loop",
        return_value=MagicMock(),
    ):
        stream_manager.session_callback(Signal.FAILED)

    assert stream_manager._websocket_temporarily_disabled is True
    assert stream_manager.use_websocket is False


async def test_failed_websocket_cert_error_keeps_websocket_when_forced(stream_manager):
    """With websocket forced, a certificate failure leaves websocket selected."""
    stream_manager.device.config.websocket_force = True
    stream_manager.device.config.websocket_enabled = True
    stream_manager.event = True

    # Bare client (no __init__) that reports a certificate failure.
    failed_client = object.__new__(WebSocketClient)
    failed_client.session = SimpleNamespace(state=State.STOPPED)
    failed_client._last_failure_reason = WebSocketFailureReason.SSL_CERTIFICATE
    stream_manager.stream = failed_client

    with patch(
        "axis.stream_manager.asyncio.get_running_loop",
        return_value=MagicMock(),
    ):
        stream_manager.session_callback(Signal.FAILED)

    assert stream_manager._websocket_temporarily_disabled is False
    assert stream_manager.use_websocket is True


async def test_failed_signal_without_stream_does_not_disable_websocket(stream_manager):
    """A FAILED signal with no current stream leaves the runtime-disable flag unset."""
    stream_manager.stream = None

    with patch(
        "axis.stream_manager.asyncio.get_running_loop",
        return_value=MagicMock(),
    ):
        stream_manager.session_callback(Signal.FAILED)

    assert stream_manager._websocket_temporarily_disabled is False


async def test_failed_signal_without_ssl_reason_does_not_disable_websocket(
    stream_manager,
):
    """A failure classified as OTHER keeps websocket selected."""
    # Websocket would normally be selected: enabled and advertised by the device.
    stream_manager.device.vapix.api_discovery._items[
        ApiId.EVENT_STREAMING_OVER_WEBSOCKET
    ] = MagicMock()
    stream_manager.device.config.websocket_enabled = True
    stream_manager.event = True

    # Bare client (no __init__) that reports a non-certificate failure.
    failed_client = object.__new__(WebSocketClient)
    failed_client.session = SimpleNamespace(state=State.STOPPED)
    failed_client._last_failure_reason = WebSocketFailureReason.OTHER
    stream_manager.stream = failed_client

    with patch(
        "axis.stream_manager.asyncio.get_running_loop",
        return_value=MagicMock(),
    ):
        stream_manager.session_callback(Signal.FAILED)

    assert stream_manager._websocket_temporarily_disabled is False
    assert stream_manager.use_websocket is True
Loading