diff --git a/switchbot/adv_parsers/relay_switch.py b/switchbot/adv_parsers/relay_switch.py index bbac74ef..a44c253e 100644 --- a/switchbot/adv_parsers/relay_switch.py +++ b/switchbot/adv_parsers/relay_switch.py @@ -54,12 +54,18 @@ def process_relay_switch_2pm( 1: { **process_relay_switch_common_data(data, mfr_data), "power": parse_power_data(mfr_data, 10), + "mode": mfr_data[9] & 0b00001111, + "position": mfr_data[14], + "calibration": bool(mfr_data[8] & 0b01000000), }, 2: { "switchMode": True, # for compatibility, useless "sequence_number": mfr_data[6], "isOn": bool(mfr_data[7] & 0b01000000), "power": parse_power_data(mfr_data, 12), + "mode": mfr_data[9] & 0b11110000 >> 4, + "position": mfr_data[14], + "calibration": bool(mfr_data[8] & 0b01000000), }, "sequence_number": mfr_data[6], } diff --git a/switchbot/devices/base_cover.py b/switchbot/devices/base_cover.py index a721c8a4..d127a618 100644 --- a/switchbot/devices/base_cover.py +++ b/switchbot/devices/base_cover.py @@ -33,8 +33,9 @@ class SwitchbotBaseCover(SwitchbotDevice): """Representation of a Switchbot Cover devices for both curtains and tilt blinds.""" - def __init__(self, reverse: bool, *args: Any, **kwargs: Any) -> None: + def __init__(self, *args: Any, **kwargs: Any) -> None: """Switchbot Cover device constructor.""" + reverse: bool = kwargs.pop("reverse", False) super().__init__(*args, **kwargs) self._reverse = reverse self._settings: dict[str, Any] = {} diff --git a/switchbot/devices/blind_tilt.py b/switchbot/devices/blind_tilt.py index 3fbfea86..d708e592 100644 --- a/switchbot/devices/blind_tilt.py +++ b/switchbot/devices/blind_tilt.py @@ -45,7 +45,7 @@ class SwitchbotBlindTilt(SwitchbotBaseCover, SwitchbotSequenceDevice): def __init__(self, *args: Any, **kwargs: Any) -> None: """Switchbot Blind Tilt/woBlindTilt constructor.""" self._reverse: bool = kwargs.pop("reverse_mode", False) - super().__init__(self._reverse, *args, **kwargs) + super().__init__(*args, reverse=self._reverse, **kwargs) def _set_parsed_data( self, advertisement: SwitchBotAdvertisement, data: dict[str, Any] diff --git a/switchbot/devices/curtain.py b/switchbot/devices/curtain.py index 877aa994..a8c6d7e4 100644 --- a/switchbot/devices/curtain.py +++ b/switchbot/devices/curtain.py @@ -47,10 +47,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # the definition of position is the same as in Home Assistant. self._reverse: bool = kwargs.pop("reverse_mode", True) - super().__init__(self._reverse, *args, **kwargs) + super().__init__(*args, reverse=self._reverse, **kwargs) self._settings: dict[str, Any] = {} self.ext_info_sum: dict[str, Any] = {} - self.ext_info_adv: dict[str, Any] = {} def _set_parsed_data( self, advertisement: SwitchBotAdvertisement, data: dict[str, Any] diff --git a/switchbot/devices/relay_switch.py b/switchbot/devices/relay_switch.py index 0e9593b5..97d0bb43 100644 --- a/switchbot/devices/relay_switch.py +++ b/switchbot/devices/relay_switch.py @@ -4,6 +4,8 @@ from bleak.backends.device import BLEDevice +from switchbot.devices.base_cover import SwitchbotBaseCover + from ..const import SwitchbotModel from ..helpers import parse_power_data, parse_uint24_be from ..models import SwitchBotAdvertisement @@ -55,6 +57,12 @@ } } +# roller mode command +COMMAND_OPEN = f"{COMMAND_CONTROL}0D040001" +COMMAND_CLOSE = f"{COMMAND_CONTROL}0D046401" +COMMAND_POSITION = f"{COMMAND_CONTROL}0D04{{}}01" +COMMAND_STOP = f"{COMMAND_CONTROL}0D00" + class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice): """Representation of a Switchbot relay switch 1pm.""" @@ -94,10 +102,12 @@ def _reset_power_data(self, data: dict[str, Any]) -> None: def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]: """Parse common data from raw bytes.""" return { - "sequence_number": raw_data[1], "isOn": bool(raw_data[2] & SWITCH1_ON_MASK), "firmware": raw_data[16] / 10.0, "channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK), + "calibration": bool(raw_data[3] & 0b01000000), + "mode": raw_data[4] & 0b00001111, + "position": raw_data[9] >> 1, } def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]: @@ -171,7 +181,9 @@ async def get_basic_info(self) -> dict[str, Any] | None: return None _LOGGER.debug( - "on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex() + "get_basic_info raw: %s, channel1 raw: %s", + _data.hex(), + _channel1_data.hex(), ) common_data = self._parse_common_data(_data) @@ -229,25 +241,73 @@ def __init__( super().__init__(device, key_id, encryption_key, interface, model, **kwargs) -class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch): +class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch, SwitchbotBaseCover): """Representation of a Switchbot relay switch 2pm.""" - def __init__( + def __init__( # noqa: PLR0913 self, device: BLEDevice, key_id: str, encryption_key: str, interface: int = 0, model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM, + reverse: bool = False, **kwargs: Any, ) -> None: - super().__init__(device, key_id, encryption_key, interface, model, **kwargs) + super().__init__( + device, key_id, encryption_key, interface, model, reverse=reverse, **kwargs + ) self._channel = 2 @property def channel(self) -> int: return self._channel + @property + def position(self) -> int | None: + """Return position.""" + return self._get_adv_value("position", channel=1) + + @property + def mode(self) -> int | None: + """Return mode.""" + return self._get_adv_value("mode", channel=1) + + @update_after_operation + async def open(self) -> bool: + """Send open command. 0 - performance mode, 1 - unfelt mode.""" + self._is_opening = True + self._is_closing = False + result = await self._send_command(COMMAND_OPEN) + return self._check_command_result(result, 0, {1}) + + @update_after_operation + async def close(self) -> bool: + """Send close command. 0 - performance mode, 1 - unfelt mode.""" + self._is_closing = True + self._is_opening = False + result = await self._send_command(COMMAND_CLOSE) + return self._check_command_result(result, 0, {1}) + + @update_after_operation + async def stop(self) -> bool: + """Send stop command to device.""" + self._is_opening = self._is_closing = False + result = await self._send_command(COMMAND_STOP) + return self._check_command_result(result, 0, {1}) + + @update_after_operation + async def set_position(self, position: int, mode: int = 0) -> bool: + """Send position command (0-100) to device. 0 - performance mode, 1 - unfelt mode.""" + prev = self._get_adv_value("position", channel=1) + self._update_motion_direction( + True, + (100 - prev) if prev is not None else None, + 100 - position, + ) + result = await self._send_command(COMMAND_POSITION.format(f"{position:02X}")) + return self._check_command_result(result, 0, {1}) + def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]: """Return parsed device data, optionally for a specific channel.""" data = self.data.get("data") or {} @@ -257,10 +317,18 @@ async def get_basic_info(self): current_time_hex, current_day_start_time_hex = ( self.get_current_time_and_start_time() ) - if not (common_data := await super().get_basic_info()): + if not (_basic_raw := await self._get_basic_info(COMMAND_GET_BASIC_INFO)): + return None + if not ( + _channel1_raw := await self._get_basic_info( + COMMAND_GET_CHANNEL1_INFO.format( + current_time_hex, current_day_start_time_hex + ) + ) + ): return None if not ( - _channel2_data := await self._get_basic_info( + _channel2_raw := await self._get_basic_info( COMMAND_GET_CHANNEL2_INFO.format( current_time_hex, current_day_start_time_hex ) @@ -268,18 +336,35 @@ async def get_basic_info(self): ): return None - _LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex()) + _LOGGER.debug( + "get_basic_info 2PM basic_raw: %s, channel1_raw: %s, channel2_raw: %s", + _basic_raw.hex(), + _channel1_raw.hex(), + _channel2_raw.hex(), + ) + + parsed = self._parse_common_data(_basic_raw) - channel2_data = self._parse_user_data(_channel2_data) - channel2_data["isOn"] = common_data["channel2_isOn"] + channel1_data = self._parse_user_data(_channel1_raw) + channel1_data["isOn"] = parsed["isOn"] + channel1_data["firmware"] = parsed["firmware"] + channel1_data["calibration"] = parsed["calibration"] + channel1_data["mode"] = parsed["mode"] + channel1_data["position"] = parsed["position"] + + if not channel1_data["isOn"]: + self._reset_power_data(channel1_data) + + channel2_data = self._parse_user_data(_channel2_raw) + channel2_data["isOn"] = parsed["channel2_isOn"] if not channel2_data["isOn"]: self._reset_power_data(channel2_data) _LOGGER.debug( - "channel1_data: %s, channel2_data: %s", common_data, channel2_data + "channel1_data: %s, channel2_data: %s", channel1_data, channel2_data ) - return {1: common_data, 2: channel2_data} + return {1: channel1_data, 2: channel2_data} @update_after_operation async def turn_on(self, channel: int) -> bool: @@ -312,3 +397,17 @@ def is_on(self, channel: int) -> bool | None: def switch_mode(self, channel: int) -> bool | None: """Return true or false from cache.""" return self._get_adv_value("switchMode", channel) + + def _update_motion_direction( + self, in_motion: bool, previous_position: int | None, new_position: int + ) -> None: + """Update opening/closing status based on movement.""" + if previous_position is None: + return + if in_motion is False: + self._is_closing = self._is_opening = False + return + + if new_position != previous_position: + self._is_opening = new_position > previous_position + self._is_closing = new_position < previous_position diff --git a/switchbot/devices/roller_shade.py b/switchbot/devices/roller_shade.py index 879213cf..9444dcc6 100644 --- a/switchbot/devices/roller_shade.py +++ b/switchbot/devices/roller_shade.py @@ -36,7 +36,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # the definition of position is the same as in Home Assistant. self._reverse: bool = kwargs.pop("reverse_mode", True) - super().__init__(self._reverse, *args, **kwargs) + super().__init__(*args, reverse=self._reverse, **kwargs) def _set_parsed_data( self, advertisement: SwitchBotAdvertisement, data: dict[str, Any] diff --git a/tests/test_adv_parser.py b/tests/test_adv_parser.py index c71c3b30..77fd5b25 100644 --- a/tests/test_adv_parser.py +++ b/tests/test_adv_parser.py @@ -3401,12 +3401,18 @@ def test_humidifer_with_empty_data() -> None: "sequence_number": 138, "switchMode": True, "power": 0.0, + "mode": 0, + "position": 0, + "calibration": False, }, 2: { "isOn": True, "sequence_number": 138, "switchMode": True, "power": 70.0, + "mode": 0, + "position": 0, + "calibration": False, }, "sequence_number": 138, }, @@ -3787,12 +3793,18 @@ def test_adv_active(test_case: AdvTestCase) -> None: "sequence_number": 138, "switchMode": True, "power": 0.0, + "mode": 0, + "position": 0, + "calibration": False, }, 2: { "isOn": True, "sequence_number": 138, "switchMode": True, "power": 70.0, + "mode": 0, + "position": 0, + "calibration": False, }, "sequence_number": 138, }, diff --git a/tests/test_base_cover.py b/tests/test_base_cover.py index 071ac79f..6653d293 100644 --- a/tests/test_base_cover.py +++ b/tests/test_base_cover.py @@ -11,7 +11,7 @@ def create_device_for_command_testing(position=50, calibration=True): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") - base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device) + base_cover_device = base_cover.SwitchbotBaseCover(ble_device, reverse=False) base_cover_device.update_from_advertisement( make_advertisement_data(ble_device, True, position, calibration) ) @@ -50,7 +50,7 @@ def make_advertisement_data( @pytest.mark.asyncio async def test_send_multiple_commands(): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") - base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device) + base_cover_device = base_cover.SwitchbotBaseCover(ble_device, reverse=False) base_cover_device.update_from_advertisement( make_advertisement_data(ble_device, True, 50, True) ) diff --git a/tests/test_relay_switch.py b/tests/test_relay_switch.py index 6dd38926..0971ad5d 100644 --- a/tests/test_relay_switch.py +++ b/tests/test_relay_switch.py @@ -501,3 +501,172 @@ async def test_press(): ) await device.press() device._send_command.assert_awaited_once_with(device._press_command) + + +def create_2pm_device_with_position(position: int = 50, calibration: bool = True): + """Create a 2PM device with position/calibration data for cover testing.""" + return create_device_for_command_testing( + b"\x00\x00\x00\x00\x00\x00", + SwitchbotModel.RELAY_SWITCH_2PM, + { + 1: { + "switchMode": True, + "sequence_number": 99, + "isOn": True, + "position": position, + "calibration": calibration, + "mode": 0, + }, + 2: { + "switchMode": True, + "sequence_number": 99, + "isOn": False, + "position": position, + "calibration": calibration, + "mode": 0, + }, + }, + ) + + +@pytest.mark.asyncio +async def test_2pm_open(): + """Test open command for 2PM roller mode.""" + device = create_2pm_device_with_position() + await device.open() + device._send_command.assert_called_with(relay_switch.COMMAND_OPEN) + assert device.is_opening() is True + assert device.is_closing() is False + + +@pytest.mark.asyncio +async def test_2pm_close(): + """Test close command for 2PM roller mode.""" + device = create_2pm_device_with_position() + await device.close() + device._send_command.assert_called_with(relay_switch.COMMAND_CLOSE) + assert device.is_opening() is False + assert device.is_closing() is True + + +@pytest.mark.asyncio +async def test_2pm_stop(): + """Test stop command for 2PM roller mode.""" + device = create_2pm_device_with_position() + await device.stop() + device._send_command.assert_called_with(relay_switch.COMMAND_STOP) + assert device.is_opening() is False + assert device.is_closing() is False + + +@pytest.mark.asyncio +async def test_2pm_set_position_closing(): + """Test set_position to a higher device position (closing in HA terms).""" + device = create_2pm_device_with_position(position=30) + await device.set_position(80) + device._send_command.assert_called_with( + relay_switch.COMMAND_POSITION.format(f"{80:02X}") + ) + assert device.is_opening() is False + assert device.is_closing() is True + + +@pytest.mark.asyncio +async def test_2pm_set_position_opening(): + """Test set_position to a lower device position (opening in HA terms).""" + device = create_2pm_device_with_position(position=80) + await device.set_position(20) + device._send_command.assert_called_with( + relay_switch.COMMAND_POSITION.format(f"{20:02X}") + ) + assert device.is_opening() is True + assert device.is_closing() is False + + +@pytest.mark.asyncio +async def test_2pm_set_position_passthrough(): + """Test set_position sends position directly without transformation.""" + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + device = relay_switch.SwitchbotRelaySwitch2PM( + ble_device, "ff", "ffffffffffffffffffffffffffffffff" + ) + device.update_from_advertisement( + make_advertisement_data( + ble_device, + b"\x00\x00\x00\x00\x00\x00", + SwitchbotModel.RELAY_SWITCH_2PM, + { + 1: { + "switchMode": True, + "sequence_number": 99, + "isOn": True, + "position": 30, + "calibration": True, + "mode": 0, + }, + 2: { + "switchMode": True, + "sequence_number": 99, + "isOn": False, + "position": 30, + "calibration": True, + "mode": 0, + }, + }, + ) + ) + device._send_command = AsyncMock() + device._check_command_result = MagicMock() + device.update = AsyncMock() + + await device.set_position(40) + # position sent directly as-is + device._send_command.assert_called_with( + relay_switch.COMMAND_POSITION.format(f"{40:02X}") + ) + + +def test_2pm_position_property(): + """Test position property returns value from channel 1.""" + device = create_2pm_device_with_position(position=42) + assert device.position == 42 + + +def test_2pm_mode_property(): + """Test mode property returns value from channel 1.""" + device = create_2pm_device_with_position() + assert device.mode == 0 + + +def test_2pm_update_motion_direction_no_previous(): + """Test _update_motion_direction with no previous position does nothing.""" + device = create_2pm_device_with_position() + device._update_motion_direction(True, None, 80) + assert device.is_opening() is False + assert device.is_closing() is False + + +def test_2pm_update_motion_direction_stop(): + """Test _update_motion_direction with in_motion=False clears both flags.""" + device = create_2pm_device_with_position() + device._is_opening = True + device._is_closing = True + device._update_motion_direction(False, 50, 80) + assert device.is_opening() is False + assert device.is_closing() is False + + +def test_2pm_update_motion_direction_opening(): + """Test _update_motion_direction detects opening.""" + device = create_2pm_device_with_position() + device._update_motion_direction(True, 30, 70) + assert device.is_opening() is True + assert device.is_closing() is False + + +def test_2pm_update_motion_direction_closing(): + """Test _update_motion_direction detects closing.""" + device = create_2pm_device_with_position() + device._update_motion_direction(True, 70, 30) + assert device.is_opening() is False + assert device.is_closing() is True