From 7e879e9106eb2c412dcb0884ed3a74cbbc8392a2 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Fri, 1 May 2026 18:43:49 +0200 Subject: [PATCH 1/8] Add event-instance to initialized-event synthesis primitives --- axis/models/event.py | 29 +++++++++++++ axis/models/event_instance.py | 76 +++++++++++++++++++++++++++++++++-- 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/axis/models/event.py b/axis/models/event.py index e15bc829..cd011e60 100644 --- a/axis/models/event.py +++ b/axis/models/event.py @@ -91,6 +91,11 @@ def _missing_(cls, value: object) -> EventTopic: EventTopic.RELAY: "active", } +TOPIC_TO_INACTIVE_STATE = { + EventTopic.LIGHT_STATUS: "OFF", + EventTopic.RELAY: "inactive", +} + EVENT_OPERATION = "operation" EVENT_SOURCE = "source" EVENT_SOURCE_IDX = "source_idx" @@ -152,6 +157,30 @@ class Event: topic: str topic_base: EventTopic + @classmethod + def synthesize_initialized( + cls, + topic: str, + source: str = "", + source_idx: str = "", + value: str | None = None, + ) -> Self: + """Create an initialized event from normalized event-instance fields.""" + topic_base = EventTopic(topic) + state = value + if state in (None, ""): + state = TOPIC_TO_INACTIVE_STATE.get(topic_base, "0") + + return cls._decode_from_dict( + { + EVENT_OPERATION: EventOperation.INITIALIZED, + EVENT_TOPIC: topic, + EVENT_SOURCE: source, + EVENT_SOURCE_IDX: source_idx, + EVENT_VALUE: state, + } + ) + @classmethod def decode(cls, data: bytes | dict[str, Any]) -> Self: """Decode data to an event object.""" diff --git a/axis/models/event_instance.py b/axis/models/event_instance.py index 486132bb..cba6dded 100644 --- a/axis/models/event_instance.py +++ b/axis/models/event_instance.py @@ -6,7 +6,7 @@ import xmltodict from .api import ApiItem, ApiRequest, ApiResponse -from .event import traverse +from .event import Event, traverse EVENT_INSTANCE = ( "http://www.w3.org/2003/05/soap-envelope:Envelope", @@ -50,6 +50,59 @@ def get_events(data: dict[str, Any]) -> list[dict[str, Any]]: return events +def _as_simple_item_list( + data: object, +) -> list[dict[str, Any]]: + """Return a list representation for a simple-item payload.""" + if isinstance(data, list): + return data + if isinstance(data, dict): + return [data] + return [] + + +def _extract_source_values( + source: dict[str, Any] | list[dict[str, Any]], +) -> tuple[str, list[str]]: + """Extract the source name and source values. + + Keep behavior aligned with event stream parsing by selecting the first source item + when multiple source items exist. + """ + source_items = _as_simple_item_list(source) + if not source_items: + return "", [""] + + source_item = source_items[0] + source_name = str(source_item.get("@Name", "")) + values = source_item.get("Value", "") + if isinstance(values, list): + source_values = [str(value) for value in values] + return source_name, source_values or [""] + if values in (None, ""): + return source_name, [""] + return source_name, [str(values)] + + +def _extract_data_value(data: dict[str, Any] | list[dict[str, Any]]) -> str: + """Extract a representative state value from data definition. + + Prefer the "active" item when available to align with Event._decode_from_bytes(). + """ + data_items = _as_simple_item_list(data) + if not data_items: + return "" + + data_item = next( + (item for item in data_items if item.get("@Name", "") == "active"), + data_items[0], + ) + value = data_item.get("Value", "") + if isinstance(value, list): + return str(value[0]) if value else "" + return "" if value is None else str(value) + + @dataclass(frozen=True) class EventInstance(ApiItem): """Events are emitted when the Axis product detects an occurrence of some kind. @@ -108,7 +161,7 @@ class EventInstance(ApiItem): @classmethod def decode(cls, data: dict[str, Any]) -> Self: """Decode dict to class object.""" - message = data["data"]["MessageInstance"] + message = data["data"].get("MessageInstance", {}) return cls( id=data["topic"], topic=data["topic"], @@ -118,12 +171,27 @@ def decode(cls, data: dict[str, Any]) -> Self: is_available=data["data"]["@topic"] == "true", is_application_data=data["data"].get("@isApplicationData") == "true", name=data["data"].get("@NiceName", ""), - stateful=data["data"]["MessageInstance"].get("@isProperty") == "true", - stateless=data["data"]["MessageInstance"].get("@isProperty") != "true", + stateful=message.get("@isProperty") == "true", + stateless=message.get("@isProperty") != "true", source=message.get("SourceInstance", {}).get("SimpleItemInstance", {}), data=message.get("DataInstance", {}).get("SimpleItemInstance", {}), ) + def to_events(self) -> list[Event]: + """Synthesize initialized Event objects from event-instance schema data.""" + source_name, source_values = _extract_source_values(self.source) + state_value = _extract_data_value(self.data) + + return [ + Event.synthesize_initialized( + topic=self.topic, + source=source_name, + source_idx=source_value, + value=state_value, + ) + for source_value in source_values + ] + @dataclass class ListEventInstancesRequest(ApiRequest): From d634b335a5b6ad5157ebe0c0fbed97160ce94be3 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Fri, 1 May 2026 18:43:59 +0200 Subject: [PATCH 2/8] Expose synthesized per-topic events from event instances --- axis/interfaces/event_instances.py | 15 ++++++++++++++- axis/interfaces/vapix.py | 3 +++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/axis/interfaces/event_instances.py b/axis/interfaces/event_instances.py index 36d0447b..6b292a77 100644 --- a/axis/interfaces/event_instances.py +++ b/axis/interfaces/event_instances.py @@ -1,13 +1,17 @@ """Event service and action service APIs available in Axis network device.""" -from typing import Any +from typing import TYPE_CHECKING, Any from ..models.event_instance import ( + EventInstance, ListEventInstancesRequest, ListEventInstancesResponse, ) from .api_handler import ApiHandler +if TYPE_CHECKING: + from ..models.event import Event + class EventInstanceHandler(ApiHandler[Any]): """Event instances for Axis devices.""" @@ -21,3 +25,12 @@ async def get_event_instances(self) -> dict[str, Any]: bytes_data = await self.vapix.api_request(ListEventInstancesRequest()) response = ListEventInstancesResponse.decode(bytes_data) return response.data + + def get_events_per_topic(self) -> dict[str, list[Event]]: + """Return synthesized initialized events grouped by event-instance topic.""" + grouped: dict[str, list[Event]] = {} + for item in self.values(): + if not isinstance(item, EventInstance): + continue + grouped[item.topic] = item.to_events() + return grouped diff --git a/axis/interfaces/vapix.py b/axis/interfaces/vapix.py index c30368df..16d77046 100644 --- a/axis/interfaces/vapix.py +++ b/axis/interfaces/vapix.py @@ -39,6 +39,7 @@ from ..device import AxisDevice from ..models.api import ApiRequest + from ..models.event import Event from ..models.stream_profile import StreamProfile LOGGER = logging.getLogger(__name__) @@ -94,6 +95,7 @@ def __init__(self, device: AxisDevice) -> None: self.vmd4 = Vmd4Handler(self) self.event_instances = EventInstanceHandler(self) + self.event_instance_events: dict[str, list[Event]] = {} @property def firmware_version(self) -> str: @@ -240,6 +242,7 @@ async def initialize_applications(self) -> None: async def initialize_event_instances(self) -> None: """Initialize event instances of what events are supported by the device.""" await self.event_instances.update() + self.event_instance_events = self.event_instances.get_events_per_topic() async def initialize_users(self) -> None: """Load device user data and initialize user management.""" From fb5b35568446b013a4383393bc530179ccafc393 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Fri, 1 May 2026 18:44:18 +0200 Subject: [PATCH 3/8] Add parity tests for synthesized event-instance events --- tests/test_event_instances.py | 54 +++++++++++++++++++++++++++++++++++ tests/test_vapix.py | 2 ++ 2 files changed, 56 insertions(+) diff --git a/tests/test_event_instances.py b/tests/test_event_instances.py index 9379fcd1..f1fba11d 100644 --- a/tests/test_event_instances.py +++ b/tests/test_event_instances.py @@ -7,6 +7,7 @@ import pytest +from axis.models.event import Event from axis.models.event_instance import get_events from .event_fixtures import ( @@ -14,6 +15,9 @@ EVENT_INSTANCE_STORAGE_ALERT, EVENT_INSTANCE_VMD4_PROFILE1, EVENT_INSTANCES, + LIGHT_STATUS_INIT, + PIR_INIT, + VMD4_C1P1_INIT, ) if TYPE_CHECKING: @@ -269,3 +273,53 @@ async def test_single_event_instance( def test_get_events(input: dict, output: list): """Verify expected output of get_events.""" assert get_events(input) == output + + +@pytest.mark.parametrize( + "event_stream_bytes", + [PIR_INIT, LIGHT_STATUS_INIT, VMD4_C1P1_INIT], +) +async def test_event_instance_synthesized_event_matches_stream_content( + http_route_mock, + event_instances: EventInstanceHandler, + event_stream_bytes: bytes, +) -> None: + """Synthesize events from instances and verify stream-content parity fields.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + expected = Event.decode(event_stream_bytes) + per_topic = event_instances.get_events_per_topic() + actual = next( + event + for event in per_topic[expected.topic] + if event.source == expected.source and event.id == expected.id + ) + + assert actual.topic == expected.topic + assert actual.source == expected.source + assert actual.id == expected.id + assert actual.state == expected.state + assert actual.group == expected.group + + +async def test_event_instance_synthesizes_unknown_topics( + http_route_mock, event_instances +): + """Synthesis should include topics not represented in EventTopic enum.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + per_topic = event_instances.get_events_per_topic() + assert "tns1:Media/ProfileChanged" in per_topic + assert ( + per_topic["tns1:Media/ProfileChanged"][0].topic == "tns1:Media/ProfileChanged" + ) diff --git a/tests/test_vapix.py b/tests/test_vapix.py index aa920617..d1b9a840 100644 --- a/tests/test_vapix.py +++ b/tests/test_vapix.py @@ -528,6 +528,8 @@ async def test_initialize_event_instances(http_route_mock, vapix: Vapix): assert vapix.event_instances assert len(vapix.event_instances) == 44 + assert vapix.event_instance_events + assert "tns1:Device/tnsaxis:Sensor/PIR" in vapix.event_instance_events async def test_applications_dont_load_without_params(http_route_mock, vapix: Vapix): From 3c001e04cdbf0dd4a8477c0a719ba2d5a186d409 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Fri, 1 May 2026 19:51:13 +0200 Subject: [PATCH 4/8] Refocus expected-event discovery on event instances --- axis/interfaces/event_instances.py | 24 +++++++++- axis/interfaces/vapix.py | 3 -- axis/models/event.py | 29 ------------ axis/models/event_instance.py | 53 ++++++++++++++++++--- tests/test_event_instances.py | 75 ++++++++++++++++++++++++++++-- tests/test_vapix.py | 3 +- 6 files changed, 141 insertions(+), 46 deletions(-) diff --git a/axis/interfaces/event_instances.py b/axis/interfaces/event_instances.py index 6b292a77..84de9712 100644 --- a/axis/interfaces/event_instances.py +++ b/axis/interfaces/event_instances.py @@ -4,10 +4,12 @@ from ..models.event_instance import ( EventInstance, + EventProtocol, ListEventInstancesRequest, ListEventInstancesResponse, ) from .api_handler import ApiHandler +from .event_manager import BLACK_LISTED_TOPICS if TYPE_CHECKING: from ..models.event import Event @@ -26,11 +28,29 @@ async def get_event_instances(self) -> dict[str, Any]: response = ListEventInstancesResponse.decode(bytes_data) return response.data - def get_events_per_topic(self) -> dict[str, list[Event]]: - """Return synthesized initialized events grouped by event-instance topic.""" + def get_expected_events_per_topic( + self, + protocol: EventProtocol | str | None = None, + include_internal_topics: bool = False, + ) -> dict[str, list[Event]]: + """Return normalized expected events grouped by topic. + + Event instances are the protocol-agnostic bootstrap source for expected events. + MQTT does not advertise topics during startup, so this API provides a unified + expectation surface across metadata stream, websocket, and MQTT consumers. + """ + if protocol is not None: + EventProtocol(protocol) + grouped: dict[str, list[Event]] = {} for item in self.values(): if not isinstance(item, EventInstance): continue + if not include_internal_topics and item.topic in BLACK_LISTED_TOPICS: + continue grouped[item.topic] = item.to_events() return grouped + + def get_events_per_topic(self) -> dict[str, list[Event]]: + """Backward-compatible alias for expected events grouped by topic.""" + return self.get_expected_events_per_topic() diff --git a/axis/interfaces/vapix.py b/axis/interfaces/vapix.py index 16d77046..c30368df 100644 --- a/axis/interfaces/vapix.py +++ b/axis/interfaces/vapix.py @@ -39,7 +39,6 @@ from ..device import AxisDevice from ..models.api import ApiRequest - from ..models.event import Event from ..models.stream_profile import StreamProfile LOGGER = logging.getLogger(__name__) @@ -95,7 +94,6 @@ def __init__(self, device: AxisDevice) -> None: self.vmd4 = Vmd4Handler(self) self.event_instances = EventInstanceHandler(self) - self.event_instance_events: dict[str, list[Event]] = {} @property def firmware_version(self) -> str: @@ -242,7 +240,6 @@ async def initialize_applications(self) -> None: async def initialize_event_instances(self) -> None: """Initialize event instances of what events are supported by the device.""" await self.event_instances.update() - self.event_instance_events = self.event_instances.get_events_per_topic() async def initialize_users(self) -> None: """Load device user data and initialize user management.""" diff --git a/axis/models/event.py b/axis/models/event.py index cd011e60..e15bc829 100644 --- a/axis/models/event.py +++ b/axis/models/event.py @@ -91,11 +91,6 @@ def _missing_(cls, value: object) -> EventTopic: EventTopic.RELAY: "active", } -TOPIC_TO_INACTIVE_STATE = { - EventTopic.LIGHT_STATUS: "OFF", - EventTopic.RELAY: "inactive", -} - EVENT_OPERATION = "operation" EVENT_SOURCE = "source" EVENT_SOURCE_IDX = "source_idx" @@ -157,30 +152,6 @@ class Event: topic: str topic_base: EventTopic - @classmethod - def synthesize_initialized( - cls, - topic: str, - source: str = "", - source_idx: str = "", - value: str | None = None, - ) -> Self: - """Create an initialized event from normalized event-instance fields.""" - topic_base = EventTopic(topic) - state = value - if state in (None, ""): - state = TOPIC_TO_INACTIVE_STATE.get(topic_base, "0") - - return cls._decode_from_dict( - { - EVENT_OPERATION: EventOperation.INITIALIZED, - EVENT_TOPIC: topic, - EVENT_SOURCE: source, - EVENT_SOURCE_IDX: source_idx, - EVENT_VALUE: state, - } - ) - @classmethod def decode(cls, data: bytes | dict[str, Any]) -> Self: """Decode data to an event object.""" diff --git a/axis/models/event_instance.py b/axis/models/event_instance.py index cba6dded..19341706 100644 --- a/axis/models/event_instance.py +++ b/axis/models/event_instance.py @@ -1,12 +1,23 @@ """Event service and action service APIs available in Axis network device.""" from dataclasses import dataclass +import enum from typing import Any, Self import xmltodict from .api import ApiItem, ApiRequest, ApiResponse -from .event import Event, traverse +from .event import ( + EVENT_OPERATION, + EVENT_SOURCE, + EVENT_SOURCE_IDX, + EVENT_TOPIC, + EVENT_VALUE, + Event, + EventOperation, + EventTopic, + traverse, +) EVENT_INSTANCE = ( "http://www.w3.org/2003/05/soap-envelope:Envelope", @@ -103,6 +114,25 @@ def _extract_data_value(data: dict[str, Any] | list[dict[str, Any]]) -> str: return "" if value is None else str(value) +TOPIC_TO_INACTIVE_STATE = { + EventTopic.LIGHT_STATUS.value: "OFF", + EventTopic.RELAY.value: "inactive", +} + + +class EventProtocol(enum.StrEnum): + """Protocols that consume normalized expected events.""" + + METADATA_STREAM = "metadata_stream" + WEBSOCKET = "websocket" + MQTT = "mqtt" + + +def _default_inactive_state(topic: str) -> str: + """Return a default inactive state for expected event synthesis.""" + return TOPIC_TO_INACTIVE_STATE.get(topic, "0") + + @dataclass(frozen=True) class EventInstance(ApiItem): """Events are emitted when the Axis product detects an occurrence of some kind. @@ -178,16 +208,25 @@ def decode(cls, data: dict[str, Any]) -> Self: ) def to_events(self) -> list[Event]: - """Synthesize initialized Event objects from event-instance schema data.""" + """Synthesize normalized expected events from event-instance schema data. + + Topics are preserved exactly as they are declared by event instances so topic + representation stays identical to emitted event topics. + """ source_name, source_values = _extract_source_values(self.source) state_value = _extract_data_value(self.data) + if state_value == "": + state_value = _default_inactive_state(self.topic) return [ - Event.synthesize_initialized( - topic=self.topic, - source=source_name, - source_idx=source_value, - value=state_value, + Event.decode( + { + EVENT_OPERATION: EventOperation.INITIALIZED, + EVENT_TOPIC: self.topic, + EVENT_SOURCE: source_name, + EVENT_SOURCE_IDX: source_value, + EVENT_VALUE: state_value, + } ) for source_value in source_values ] diff --git a/tests/test_event_instances.py b/tests/test_event_instances.py index f1fba11d..f82ef28a 100644 --- a/tests/test_event_instances.py +++ b/tests/test_event_instances.py @@ -8,7 +8,7 @@ import pytest from axis.models.event import Event -from axis.models.event_instance import get_events +from axis.models.event_instance import EventInstance, EventProtocol, get_events from .event_fixtures import ( EVENT_INSTANCE_PIR_SENSOR, @@ -293,7 +293,7 @@ async def test_event_instance_synthesized_event_matches_stream_content( await event_instances.update() expected = Event.decode(event_stream_bytes) - per_topic = event_instances.get_events_per_topic() + per_topic = event_instances.get_expected_events_per_topic() actual = next( event for event in per_topic[expected.topic] @@ -318,8 +318,77 @@ async def test_event_instance_synthesizes_unknown_topics( await event_instances.update() - per_topic = event_instances.get_events_per_topic() + per_topic = event_instances.get_expected_events_per_topic() assert "tns1:Media/ProfileChanged" in per_topic assert ( per_topic["tns1:Media/ProfileChanged"][0].topic == "tns1:Media/ProfileChanged" ) + + +async def test_expected_events_protocol_normalization(http_route_mock, event_instances): + """All protocol calls should expose the same expected-event topic set.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + metadata_topics = set( + event_instances.get_expected_events_per_topic(EventProtocol.METADATA_STREAM) + ) + websocket_topics = set( + event_instances.get_expected_events_per_topic(EventProtocol.WEBSOCKET) + ) + mqtt_topics = set(event_instances.get_expected_events_per_topic(EventProtocol.MQTT)) + + assert metadata_topics == websocket_topics + assert websocket_topics == mqtt_topics + + +def test_expected_events_invalid_protocol(event_instances): + """Invalid protocol value should fail fast.""" + with pytest.raises(ValueError, match="EventProtocol"): + event_instances.get_expected_events_per_topic("invalid") + + +def test_expected_events_internal_topic_filtering(event_instances): + """Internal-only topics are excluded by default and available on request.""" + internal_topic = "tnsaxis:CameraApplicationPlatform/VMD/xinternal_data" + normal_topic = "tns1:Device/tnsaxis:Sensor/PIR" + + event_instances._items = { + internal_topic: EventInstance( + id=internal_topic, + topic=internal_topic, + topic_filter="axis:CameraApplicationPlatform/VMD/xinternal_data", + is_available=True, + is_application_data=False, + name="internal", + stateful=True, + stateless=False, + source={}, + data={}, + ), + normal_topic: EventInstance( + id=normal_topic, + topic=normal_topic, + topic_filter="onvif:Device/axis:Sensor/PIR", + is_available=True, + is_application_data=False, + name="pir", + stateful=True, + stateless=False, + source={}, + data={}, + ), + } + + filtered = event_instances.get_expected_events_per_topic() + unfiltered = event_instances.get_expected_events_per_topic( + include_internal_topics=True + ) + + assert internal_topic not in filtered + assert internal_topic in unfiltered + assert normal_topic in filtered diff --git a/tests/test_vapix.py b/tests/test_vapix.py index d1b9a840..887ffffa 100644 --- a/tests/test_vapix.py +++ b/tests/test_vapix.py @@ -528,8 +528,7 @@ async def test_initialize_event_instances(http_route_mock, vapix: Vapix): assert vapix.event_instances assert len(vapix.event_instances) == 44 - assert vapix.event_instance_events - assert "tns1:Device/tnsaxis:Sensor/PIR" in vapix.event_instance_events + assert "tns1:Device/tnsaxis:Sensor/PIR" in vapix.event_instances async def test_applications_dont_load_without_params(http_route_mock, vapix: Vapix): From b05ae4f355d4868304514da704ee3cfd7fd00604 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Fri, 1 May 2026 19:59:03 +0200 Subject: [PATCH 5/8] Add dedicated websocket and MQTT parity test suite --- tests/test_event_instances_protocol_parity.py | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 tests/test_event_instances_protocol_parity.py diff --git a/tests/test_event_instances_protocol_parity.py b/tests/test_event_instances_protocol_parity.py new file mode 100644 index 00000000..f7055dc5 --- /dev/null +++ b/tests/test_event_instances_protocol_parity.py @@ -0,0 +1,120 @@ +"""Protocol parity tests for event-instance expected-event discovery. + +pytest --cov-report term-missing --cov=axis.event_instances tests/test_event_instances_protocol_parity.py +""" + +from typing import TYPE_CHECKING + +import orjson +import pytest + +from axis.interfaces.mqtt import mqtt_json_to_event +from axis.models.event import Event +from axis.models.event_instance import EventProtocol +from axis.websocket import _parse_ws_notification + +from .event_fixtures import EVENT_INSTANCES, LIGHT_STATUS_INIT, PIR_INIT, VMD4_C1P1_INIT + +if TYPE_CHECKING: + from axis.interfaces.event_instances import EventInstanceHandler + + +@pytest.fixture +def event_instances(axis_device) -> EventInstanceHandler: + """Return event_instances handler from mocked device.""" + return axis_device.vapix.event_instances + + +def _mqtt_topic(topic: str) -> str: + """Convert internal topic format to MQTT topic namespace format.""" + return topic.replace("tns1", "onvif").replace("tnsaxis", "axis") + + +def _event_identity(event: Event) -> tuple[str, str, str, str, str]: + """Return identity fields used for cross-protocol parity assertions.""" + return (event.topic, event.source, event.id, event.state, event.group.value) + + +@pytest.mark.parametrize( + "event_stream_bytes", + [PIR_INIT, LIGHT_STATUS_INIT, VMD4_C1P1_INIT], +) +async def test_expected_events_match_websocket_shape( + http_route_mock, + event_instances: EventInstanceHandler, + event_stream_bytes: bytes, +) -> None: + """Websocket notify payloads should align with expected-event identities.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + expected = Event.decode(event_stream_bytes) + notification = { + "topic": expected.topic, + "message": { + "source": ( + {expected.source: expected.id} + if expected.source and expected.id != "" + else ({expected.source: expected.id} if expected.source else {}) + ), + "key": {}, + "data": {expected.data.get("type", "state"): expected.state}, + }, + } + + parsed = Event.decode(_parse_ws_notification(notification)) + expected_events = event_instances.get_expected_events_per_topic( + protocol=EventProtocol.WEBSOCKET + ) + + expected_identities = { + _event_identity(event) for event in expected_events[expected.topic] + } + assert _event_identity(parsed) in expected_identities + + +@pytest.mark.parametrize( + "event_stream_bytes", + [PIR_INIT, LIGHT_STATUS_INIT, VMD4_C1P1_INIT], +) +async def test_expected_events_match_mqtt_shape( + http_route_mock, + event_instances: EventInstanceHandler, + event_stream_bytes: bytes, +) -> None: + """MQTT notify payloads should align with expected-event identities.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + expected = Event.decode(event_stream_bytes) + mqtt_msg = { + "timestamp": 0, + "topic": _mqtt_topic(expected.topic), + "message": { + "source": ( + {expected.source: expected.id} + if expected.source and expected.id != "" + else ({expected.source: expected.id} if expected.source else {}) + ), + "key": {}, + "data": {expected.data.get("type", "state"): expected.state}, + }, + } + + parsed = Event.decode(mqtt_json_to_event(orjson.dumps(mqtt_msg))) + expected_events = event_instances.get_expected_events_per_topic( + protocol=EventProtocol.MQTT + ) + + expected_identities = { + _event_identity(event) for event in expected_events[expected.topic] + } + assert _event_identity(parsed) in expected_identities From 26e05a4480f8acf06cf5e9fbdc6b52949b3d58f9 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Sun, 3 May 2026 20:17:59 +0200 Subject: [PATCH 6/8] Handle empty event instance message nodes safely --- axis/models/event_instance.py | 23 ++++++--- tests/test_event_instances.py | 88 +++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 6 deletions(-) diff --git a/axis/models/event_instance.py b/axis/models/event_instance.py index 19341706..f389ba4d 100644 --- a/axis/models/event_instance.py +++ b/axis/models/event_instance.py @@ -72,6 +72,13 @@ def _as_simple_item_list( return [] +def _as_dict(data: object) -> dict[str, Any]: + """Return dict for mapping-like payloads and normalize other values to empty.""" + if isinstance(data, dict): + return data + return {} + + def _extract_source_values( source: dict[str, Any] | list[dict[str, Any]], ) -> tuple[str, list[str]]: @@ -191,20 +198,24 @@ class EventInstance(ApiItem): @classmethod def decode(cls, data: dict[str, Any]) -> Self: """Decode dict to class object.""" - message = data["data"].get("MessageInstance", {}) + event_data = _as_dict(data.get("data")) + message = _as_dict(event_data.get("MessageInstance")) + source_instance = _as_dict(message.get("SourceInstance")) + data_instance = _as_dict(message.get("DataInstance")) + return cls( id=data["topic"], topic=data["topic"], topic_filter=data["topic"] .replace("tns1", "onvif") .replace("tnsaxis", "axis"), - is_available=data["data"]["@topic"] == "true", - is_application_data=data["data"].get("@isApplicationData") == "true", - name=data["data"].get("@NiceName", ""), + is_available=event_data.get("@topic") == "true", + is_application_data=event_data.get("@isApplicationData") == "true", + name=event_data.get("@NiceName", ""), stateful=message.get("@isProperty") == "true", stateless=message.get("@isProperty") != "true", - source=message.get("SourceInstance", {}).get("SimpleItemInstance", {}), - data=message.get("DataInstance", {}).get("SimpleItemInstance", {}), + source=source_instance.get("SimpleItemInstance", {}), + data=data_instance.get("SimpleItemInstance", {}), ) def to_events(self) -> list[Event]: diff --git a/tests/test_event_instances.py b/tests/test_event_instances.py index f82ef28a..cfd255bf 100644 --- a/tests/test_event_instances.py +++ b/tests/test_event_instances.py @@ -392,3 +392,91 @@ def test_expected_events_internal_topic_filtering(event_instances): assert internal_topic not in filtered assert internal_topic in unfiltered assert normal_topic in filtered + + +@pytest.mark.parametrize( + "raw_event", + [ + { + "topic": "tns1:Configuration/tnsaxis:Intercom/Changed", + "data": { + "@topic": "true", + "@NiceName": "Intercom Configuration changed", + "MessageInstance": None, + }, + }, + { + "topic": "tns1:Device/Trigger/Relay", + "data": { + "@topic": "true", + "MessageInstance": { + "@isProperty": "true", + "SourceInstance": None, + "DataInstance": None, + }, + }, + }, + { + "topic": "tns1:Device/Trigger/Relay", + "data": { + "@topic": "true", + "MessageInstance": { + "@isProperty": "true", + "SourceInstance": { + "SimpleItemInstance": { + "@Name": "RelayToken", + "Value": "3", + } + }, + "DataInstance": None, + }, + }, + }, + ], +) +def test_event_instance_decode_handles_none_shapes(raw_event: dict) -> None: + """EventInstance.decode should normalize None-shaped nested objects safely.""" + event = EventInstance.decode(raw_event) + + assert event.topic == raw_event["topic"] + assert isinstance(event.source, (dict, list)) + assert isinstance(event.data, (dict, list)) + + +async def test_event_instances_empty_message_instance_xml( + http_route_mock, event_instances +): + """Empty MessageInstance XML nodes should not crash event instance parsing.""" + response = """ + + + + + + + + + + + + + + + +""" + http_route_mock.post("/vapix/services").respond( + text=response, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + topic = "tns1:Configuration/tnsaxis:Intercom/Changed" + assert topic in event_instances + event = event_instances[topic] + assert event.source == {} + assert event.data == {} From deffe2a4cb15d87bd52978111d50db8345553e83 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Sun, 3 May 2026 21:17:18 +0200 Subject: [PATCH 7/8] Consolidate event instance expected-events API --- axis/interfaces/event_instances.py | 17 ++++---------- tests/test_event_instances.py | 22 +++++-------------- tests/test_event_instances_protocol_parity.py | 9 ++------ 3 files changed, 11 insertions(+), 37 deletions(-) diff --git a/axis/interfaces/event_instances.py b/axis/interfaces/event_instances.py index 84de9712..5ab7cba5 100644 --- a/axis/interfaces/event_instances.py +++ b/axis/interfaces/event_instances.py @@ -4,7 +4,6 @@ from ..models.event_instance import ( EventInstance, - EventProtocol, ListEventInstancesRequest, ListEventInstancesResponse, ) @@ -30,18 +29,14 @@ async def get_event_instances(self) -> dict[str, Any]: def get_expected_events_per_topic( self, - protocol: EventProtocol | str | None = None, include_internal_topics: bool = False, ) -> dict[str, list[Event]]: - """Return normalized expected events grouped by topic. + """Return expected startup events grouped by topic. - Event instances are the protocol-agnostic bootstrap source for expected events. - MQTT does not advertise topics during startup, so this API provides a unified - expectation surface across metadata stream, websocket, and MQTT consumers. + Event instances are the protocol-agnostic bootstrap source for startup + predeclaration. Returned events are synthesized from schema data and represent + expected event identity/state (operation=Initialized), not live stream updates. """ - if protocol is not None: - EventProtocol(protocol) - grouped: dict[str, list[Event]] = {} for item in self.values(): if not isinstance(item, EventInstance): @@ -50,7 +45,3 @@ def get_expected_events_per_topic( continue grouped[item.topic] = item.to_events() return grouped - - def get_events_per_topic(self) -> dict[str, list[Event]]: - """Backward-compatible alias for expected events grouped by topic.""" - return self.get_expected_events_per_topic() diff --git a/tests/test_event_instances.py b/tests/test_event_instances.py index cfd255bf..16490d6a 100644 --- a/tests/test_event_instances.py +++ b/tests/test_event_instances.py @@ -8,7 +8,7 @@ import pytest from axis.models.event import Event -from axis.models.event_instance import EventInstance, EventProtocol, get_events +from axis.models.event_instance import EventInstance, get_events from .event_fixtures import ( EVENT_INSTANCE_PIR_SENSOR, @@ -326,7 +326,7 @@ async def test_event_instance_synthesizes_unknown_topics( async def test_expected_events_protocol_normalization(http_route_mock, event_instances): - """All protocol calls should expose the same expected-event topic set.""" + """Expected-event discovery is protocol-agnostic and deterministic.""" http_route_mock.post("/vapix/services").respond( text=EVENT_INSTANCES, headers={"Content-Type": "application/soap+xml; charset=utf-8"}, @@ -334,22 +334,10 @@ async def test_expected_events_protocol_normalization(http_route_mock, event_ins await event_instances.update() - metadata_topics = set( - event_instances.get_expected_events_per_topic(EventProtocol.METADATA_STREAM) - ) - websocket_topics = set( - event_instances.get_expected_events_per_topic(EventProtocol.WEBSOCKET) - ) - mqtt_topics = set(event_instances.get_expected_events_per_topic(EventProtocol.MQTT)) - - assert metadata_topics == websocket_topics - assert websocket_topics == mqtt_topics - + topics_first = set(event_instances.get_expected_events_per_topic()) + topics_second = set(event_instances.get_expected_events_per_topic()) -def test_expected_events_invalid_protocol(event_instances): - """Invalid protocol value should fail fast.""" - with pytest.raises(ValueError, match="EventProtocol"): - event_instances.get_expected_events_per_topic("invalid") + assert topics_first == topics_second def test_expected_events_internal_topic_filtering(event_instances): diff --git a/tests/test_event_instances_protocol_parity.py b/tests/test_event_instances_protocol_parity.py index f7055dc5..3fa1ad87 100644 --- a/tests/test_event_instances_protocol_parity.py +++ b/tests/test_event_instances_protocol_parity.py @@ -10,7 +10,6 @@ from axis.interfaces.mqtt import mqtt_json_to_event from axis.models.event import Event -from axis.models.event_instance import EventProtocol from axis.websocket import _parse_ws_notification from .event_fixtures import EVENT_INSTANCES, LIGHT_STATUS_INIT, PIR_INIT, VMD4_C1P1_INIT @@ -67,9 +66,7 @@ async def test_expected_events_match_websocket_shape( } parsed = Event.decode(_parse_ws_notification(notification)) - expected_events = event_instances.get_expected_events_per_topic( - protocol=EventProtocol.WEBSOCKET - ) + expected_events = event_instances.get_expected_events_per_topic() expected_identities = { _event_identity(event) for event in expected_events[expected.topic] @@ -110,9 +107,7 @@ async def test_expected_events_match_mqtt_shape( } parsed = Event.decode(mqtt_json_to_event(orjson.dumps(mqtt_msg))) - expected_events = event_instances.get_expected_events_per_topic( - protocol=EventProtocol.MQTT - ) + expected_events = event_instances.get_expected_events_per_topic() expected_identities = { _event_identity(event) for event in expected_events[expected.topic] From 5f9bc8b8bb705c6e1908a45595dd8c159578a0e3 Mon Sep 17 00:00:00 2001 From: Kane610 Date: Sun, 3 May 2026 21:28:58 +0200 Subject: [PATCH 8/8] Type EventInstanceHandler with EventInstance items --- axis/interfaces/event_instances.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/axis/interfaces/event_instances.py b/axis/interfaces/event_instances.py index 5ab7cba5..142e88a6 100644 --- a/axis/interfaces/event_instances.py +++ b/axis/interfaces/event_instances.py @@ -1,6 +1,6 @@ """Event service and action service APIs available in Axis network device.""" -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from ..models.event_instance import ( EventInstance, @@ -14,14 +14,14 @@ from ..models.event import Event -class EventInstanceHandler(ApiHandler[Any]): +class EventInstanceHandler(ApiHandler[EventInstance]): """Event instances for Axis devices.""" - async def _api_request(self) -> dict[str, Any]: + async def _api_request(self) -> dict[str, EventInstance]: """Get default data of API discovery.""" return await self.get_event_instances() - async def get_event_instances(self) -> dict[str, Any]: + async def get_event_instances(self) -> dict[str, EventInstance]: """List all event instances.""" bytes_data = await self.vapix.api_request(ListEventInstancesRequest()) response = ListEventInstancesResponse.decode(bytes_data) @@ -39,8 +39,6 @@ def get_expected_events_per_topic( """ grouped: dict[str, list[Event]] = {} for item in self.values(): - if not isinstance(item, EventInstance): - continue if not include_internal_topics and item.topic in BLACK_LISTED_TOPICS: continue grouped[item.topic] = item.to_events()