Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/pyvlx/node_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ async def process_frame_status_request_notification(
if position_lower_curtain.position <= Parameter.MAX:
node_changed |= _set_node_property(node, "position_lower_curtain", position_lower_curtain)

elif isinstance(node, OpeningDevice):
if NodeParameter(0) in frame.parameter_data: # MP
position = Position(frame.parameter_data[NodeParameter(0)])
if self._is_concrete_position(position):
node_changed |= _set_node_property(node, "position", position)

if node_changed:
await node.after_update()

Expand Down
313 changes: 309 additions & 4 deletions test/node_updater_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@

from pyvlx import Node, OpeningDevice, PyVLX
from pyvlx.api.frames import (
FrameCommandRunStatusNotification, FrameGetAllNodesInformationNotification,
FrameBase, FrameCommandRunStatusNotification,
FrameGetAllNodesInformationNotification,
FrameNodeStatePositionChangedNotification, FrameStatusRequestNotification)
from pyvlx.connection import Connection
from pyvlx.const import NodeParameter, OperatingState, RunStatus, StatusReply
from pyvlx.const import (
Command, NodeParameter, OperatingState, RunStatus, StatusReply)
from pyvlx.dimmable_device import DimmableDevice
from pyvlx.node_updater import NodeUpdater
from pyvlx.node_updater import NodeUpdater, _set_node_property
from pyvlx.on_off_switch import OnOffSwitch
from pyvlx.opening_device import Blind, DualRollerShutter
from pyvlx.opening_device import (
Blind, DualRollerShutter, GarageDoor, Gate, RollerShutter)
from pyvlx.parameter import Intensity, Parameter, Position, SwitchParameter


Expand All @@ -27,6 +30,18 @@ def setUp(self) -> None:
self.node_updater = NodeUpdater(self.pyvlx)
self.pyvlx.nodes = {}

async def test_set_node_property_logs_unchanged_value(self) -> None:
"""Test that unchanged property logging still returns False."""
node = OpeningDevice(
pyvlx=self.pyvlx, node_id=1, name="Test device"
)

changed = _set_node_property(
node, "position", node.position, log_unchanged=True
)

self.assertFalse(changed)

async def test_last_frame_state_set_on_node_state_position_changed(self) -> None:
"""Test that last_frame_state is set when FrameNodeStatePositionChangedNotification is received."""
# Create a test node
Expand Down Expand Up @@ -290,6 +305,60 @@ async def test_blind_missing_params_but_status_reply_changed(self) -> None:
self.assertEqual(blind.last_frame_status_reply, StatusReply.BATTERY_LEVEL)
blind.after_update.assert_awaited_once()

async def test_blind_unavailable_position_does_not_replace_existing_position(self) -> None:
"""Test that unavailable Blind MP data does not replace an existing position."""
blind = Blind(
pyvlx=self.pyvlx, node_id=1, name="Test blind", serial_number=None
)
blind.position = Position(position_percent=20)
blind.orientation = Position(position_percent=0)
blind.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
blind.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
blind.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[1] = blind

frame = FrameStatusRequestNotification()
frame.node_id = 1
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position=Parameter.UNKNOWN_VALUE).raw),
NodeParameter(3): Parameter(Position(position_percent=80).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(blind.position, Position(position_percent=20))
self.assertEqual(blind.orientation, Position(position_percent=80))
blind.after_update.assert_awaited_once()

async def test_blind_unavailable_orientation_does_not_replace_existing_orientation(self) -> None:
"""Test that unavailable Blind FP3 data does not replace an existing orientation."""
blind = Blind(
pyvlx=self.pyvlx, node_id=1, name="Test blind", serial_number=None
)
blind.position = Position(position_percent=20)
blind.orientation = Position(position_percent=30)
blind.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
blind.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
blind.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[1] = blind

frame = FrameStatusRequestNotification()
frame.node_id = 1
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position_percent=40).raw),
NodeParameter(3): Parameter(Position(position=Parameter.UNKNOWN_VALUE).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(blind.position, Position(position_percent=40))
self.assertEqual(blind.orientation, Position(position_percent=30))
blind.after_update.assert_awaited_once()

async def test_dual_roller_shutter_position_changed_triggers_after_update(self) -> None:
"""Test that a DualRollerShutter frame with changed positions triggers after_update() once."""
shutter = DualRollerShutter(
Expand Down Expand Up @@ -374,6 +443,161 @@ async def test_dual_roller_shutter_missing_params_but_status_reply_changed(self)
self.assertEqual(shutter.last_frame_status_reply, StatusReply.BATTERY_LEVEL)
shutter.after_update.assert_awaited_once()

async def test_dual_roller_shutter_ignores_unavailable_positions(self) -> None:
"""Test that unavailable DualRollerShutter status values are ignored."""
shutter = DualRollerShutter(
pyvlx=self.pyvlx, node_id=2, name="Test shutter", serial_number=None
)
shutter.position = Position(position_percent=30)
shutter.position_upper_curtain = Position(position_percent=40)
shutter.position_lower_curtain = Position(position_percent=60)
shutter.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
shutter.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
shutter.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[2] = shutter

frame = FrameStatusRequestNotification()
frame.node_id = 2
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position=Parameter.UNKNOWN_VALUE).raw),
NodeParameter(1): Parameter(Position(position=Parameter.UNKNOWN_VALUE).raw),
NodeParameter(2): Parameter(Position(position=Parameter.UNKNOWN_VALUE).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(shutter.position, Position(position_percent=30))
self.assertEqual(shutter.position_upper_curtain, Position(position_percent=40))
self.assertEqual(shutter.position_lower_curtain, Position(position_percent=60))
shutter.after_update.assert_not_awaited()

async def test_gate_position_recovers_from_status_request_notification(self) -> None:
"""Test that a Gate updates its position from status request MP data."""
gate = Gate(
pyvlx=self.pyvlx, node_id=3, name="Test gate", serial_number=None
)
gate.position = Position(position=Parameter.UNKNOWN_VALUE)
gate.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
gate.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
gate.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[3] = gate

frame = FrameStatusRequestNotification()
frame.node_id = 3
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position_percent=100).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(gate.position, Position(position_percent=100))
gate.after_update.assert_awaited_once()

async def test_garage_door_position_recovers_from_status_request_notification(self) -> None:
"""Test that a GarageDoor updates its position from status request MP data."""
garage_door = GarageDoor(
pyvlx=self.pyvlx, node_id=4, name="Test garage door", serial_number=None
)
garage_door.position = Position(position=Parameter.UNKNOWN_VALUE)
garage_door.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
garage_door.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
garage_door.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[4] = garage_door

frame = FrameStatusRequestNotification()
frame.node_id = 4
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position_percent=0).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(garage_door.position, Position(position_percent=0))
garage_door.after_update.assert_awaited_once()

async def test_roller_shutter_position_recovers_from_status_request_notification(self) -> None:
"""Test that a RollerShutter updates its position from status request MP data."""
roller_shutter = RollerShutter(
pyvlx=self.pyvlx, node_id=6, name="Test roller shutter", serial_number=None
)
roller_shutter.position = Position(position=Parameter.UNKNOWN_VALUE)
roller_shutter.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
roller_shutter.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
roller_shutter.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[6] = roller_shutter

frame = FrameStatusRequestNotification()
frame.node_id = 6
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position_percent=75).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(roller_shutter.position, Position(position_percent=75))
roller_shutter.after_update.assert_awaited_once()

async def test_opening_device_ignores_unavailable_status_request_position(self) -> None:
"""Test that unavailable status request MP data does not replace a concrete position."""
gate = Gate(
pyvlx=self.pyvlx, node_id=5, name="Test gate", serial_number=None
)
gate.position = Position(position_percent=50)
gate.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
gate.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
gate.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[5] = gate

frame = FrameStatusRequestNotification()
frame.node_id = 5
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position=Parameter.UNKNOWN_VALUE).raw),
}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(gate.position, Position(position_percent=50))
gate.after_update.assert_not_awaited()

async def test_opening_device_missing_status_request_position_does_not_update(self) -> None:
"""Test that missing status request MP data leaves an OpeningDevice unchanged."""
gate = Gate(
pyvlx=self.pyvlx, node_id=7, name="Test gate", serial_number=None
)
gate.position = Position(position_percent=50)
gate.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
gate.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
gate.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[7] = gate

frame = FrameStatusRequestNotification()
frame.node_id = 7
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {}

await self.node_updater.process_frame_status_request_notification(frame)

self.assertEqual(gate.position, Position(position_percent=50))
gate.after_update.assert_not_awaited()

async def test_status_request_notification_for_unknown_node_returns_early(self) -> None:
"""Test that status request notifications for unknown nodes are ignored."""
frame = FrameStatusRequestNotification()
frame.node_id = 99

await self.node_updater.process_frame_status_request_notification(frame)

# ── process_frame: after_update called when individual properties change ──

async def test_after_update_called_when_position_changes(self) -> None:
Expand Down Expand Up @@ -919,6 +1143,28 @@ async def test_after_update_called_when_parameter_changes_from_on_to_off_switch(
self.assertTrue(switch.parameter.is_off())
switch.after_update.assert_awaited_once()

async def test_on_off_switch_ignores_mismatched_target(self) -> None:
"""Test that OnOffSwitch ignores state frames where current and target differ."""
switch = OnOffSwitch(
pyvlx=self.pyvlx, node_id=13, name="Test switch", serial_number=None
)
switch.parameter = SwitchParameter(state=Parameter.OFF)
switch.last_frame_state = OperatingState.DONE
switch.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[13] = switch

frame = FrameNodeStatePositionChangedNotification()
frame.node_id = 13
frame.state = OperatingState.DONE
frame.current_position = Parameter(Parameter.from_int(Parameter.ON))
frame.target = Parameter(Parameter.from_int(Parameter.OFF))
frame.remaining_time = 0

await self.node_updater.process_frame(frame)

self.assertTrue(switch.parameter.is_off())
switch.after_update.assert_not_awaited()

async def test_after_update_called_when_intensity_changes_dimmable(self) -> None:
"""Test that after_update() is called when intensity changes on a DimmableDevice."""
device = DimmableDevice(
Expand All @@ -941,6 +1187,28 @@ async def test_after_update_called_when_intensity_changes_dimmable(self) -> None
self.assertEqual(device.intensity, Intensity(intensity_percent=75))
device.after_update.assert_awaited_once()

async def test_dimmable_device_ignores_unavailable_intensity(self) -> None:
"""Test that unavailable intensity values do not replace concrete intensity."""
device = DimmableDevice(
pyvlx=self.pyvlx, node_id=14, name="Test light", serial_number=None
)
device.intensity = Intensity(intensity_percent=25)
device.last_frame_state = OperatingState.DONE
device.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[14] = device

frame = FrameNodeStatePositionChangedNotification()
frame.node_id = 14
frame.state = OperatingState.DONE
frame.current_position = Intensity(intensity=Parameter.UNKNOWN_VALUE)
frame.target = Intensity(intensity=Parameter.UNKNOWN_VALUE)
frame.remaining_time = 0

await self.node_updater.process_frame(frame)

self.assertEqual(device.intensity, Intensity(intensity_percent=25))
device.after_update.assert_not_awaited()

async def test_after_update_called_when_is_opening_stops(self) -> None:
"""Test that after_update() is called when is_opening transitions from True to False."""
device = OpeningDevice(
Expand Down Expand Up @@ -1012,6 +1280,43 @@ async def test_process_command_run_status_notification(self) -> None:
self.assertEqual(mocked_node.last_frame_status_reply, StatusReply.COMMAND_COMPLETED_OK)
mocked_node.after_update.assert_awaited_once()

async def test_process_frame_dispatches_status_request_notification(self) -> None:
"""Test that process_frame dispatches FrameStatusRequestNotification."""
gate = Gate(
pyvlx=self.pyvlx, node_id=15, name="Test gate", serial_number=None
)
gate.position = Position(position=Parameter.UNKNOWN_VALUE)
gate.last_frame_status_reply = StatusReply.UNKNOWN_STATUS_REPLY
gate.last_frame_run_status = RunStatus.EXECUTION_COMPLETED
gate.after_update = AsyncMock() # type: ignore[method-assign]
self.pyvlx.nodes[15] = gate

frame = FrameStatusRequestNotification()
frame.node_id = 15
frame.status_reply = StatusReply.UNKNOWN_STATUS_REPLY
frame.run_status = RunStatus.EXECUTION_COMPLETED
frame.parameter_data = {
NodeParameter(0): Parameter(Position(position_percent=25).raw),
}

await self.node_updater.process_frame(frame)

self.assertEqual(gate.position, Position(position_percent=25))
gate.after_update.assert_awaited_once()

async def test_process_frame_ignores_unknown_frame_type(self) -> None:
"""Test that process_frame ignores frame types without node update handling."""
frame = FrameBase(Command.GW_GET_STATE_REQ)

await self.node_updater.process_frame(frame)

async def test_node_state_frame_for_unknown_node_returns_early(self) -> None:
"""Test that node state frames for unknown nodes are ignored."""
frame = FrameNodeStatePositionChangedNotification()
frame.node_id = 99

await self.node_updater.process_frame(frame)

async def test_process_command_run_status_notification_run_status_changed(self) -> None:
"""Test that FrameCommandRunStatusNotification with only run_status change triggers after_update."""
mocked_pyvlx = MagicMock(spec=PyVLX)
Expand Down
Loading