Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions switchbot/adv_parsers/relay_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ def process_relay_switch_2pm(
1: {
**process_relay_switch_common_data(data, mfr_data),
"power": parse_power_data(mfr_data, 10),
"mode": mfr_data[9] & 0b00001111,
"position": mfr_data[14],
"calibration": bool(mfr_data[8] & 0b01000000),
},
2: {
"switchMode": True, # for compatibility, useless
"sequence_number": mfr_data[6],
"isOn": bool(mfr_data[7] & 0b01000000),
"power": parse_power_data(mfr_data, 12),
"mode": mfr_data[9] & 0b11110000 >> 4,
"position": mfr_data[14],
"calibration": bool(mfr_data[8] & 0b01000000),
},
"sequence_number": mfr_data[6],
}
3 changes: 2 additions & 1 deletion switchbot/devices/base_cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
class SwitchbotBaseCover(SwitchbotDevice):
"""Representation of a Switchbot Cover devices for both curtains and tilt blinds."""

def __init__(self, reverse: bool, *args: Any, **kwargs: Any) -> None:
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Switchbot Cover device constructor."""
reverse: bool = kwargs.pop("reverse", False)
super().__init__(*args, **kwargs)
self._reverse = reverse
self._settings: dict[str, Any] = {}
Expand Down
2 changes: 1 addition & 1 deletion switchbot/devices/blind_tilt.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SwitchbotBlindTilt(SwitchbotBaseCover, SwitchbotSequenceDevice):
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Switchbot Blind Tilt/woBlindTilt constructor."""
self._reverse: bool = kwargs.pop("reverse_mode", False)
super().__init__(self._reverse, *args, **kwargs)
super().__init__(*args, reverse=self._reverse, **kwargs)

def _set_parsed_data(
self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
Expand Down
3 changes: 1 addition & 2 deletions switchbot/devices/curtain.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
# the definition of position is the same as in Home Assistant.

self._reverse: bool = kwargs.pop("reverse_mode", True)
super().__init__(self._reverse, *args, **kwargs)
super().__init__(*args, reverse=self._reverse, **kwargs)
self._settings: dict[str, Any] = {}
self.ext_info_sum: dict[str, Any] = {}
self.ext_info_adv: dict[str, Any] = {}

def _set_parsed_data(
self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
Expand Down
123 changes: 111 additions & 12 deletions switchbot/devices/relay_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from bleak.backends.device import BLEDevice

from switchbot.devices.base_cover import SwitchbotBaseCover

from ..const import SwitchbotModel
from ..helpers import parse_power_data, parse_uint24_be
from ..models import SwitchBotAdvertisement
Expand Down Expand Up @@ -55,6 +57,12 @@
}
}

# roller mode command
COMMAND_OPEN = f"{COMMAND_CONTROL}0D040001"
COMMAND_CLOSE = f"{COMMAND_CONTROL}0D046401"
COMMAND_POSITION = f"{COMMAND_CONTROL}0D04{{}}01"
COMMAND_STOP = f"{COMMAND_CONTROL}0D00"


class SwitchbotRelaySwitch(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
"""Representation of a Switchbot relay switch 1pm."""
Expand Down Expand Up @@ -94,10 +102,12 @@ def _reset_power_data(self, data: dict[str, Any]) -> None:
def _parse_common_data(self, raw_data: bytes) -> dict[str, Any]:
"""Parse common data from raw bytes."""
return {
"sequence_number": raw_data[1],
"isOn": bool(raw_data[2] & SWITCH1_ON_MASK),
"firmware": raw_data[16] / 10.0,
"channel2_isOn": bool(raw_data[2] & SWITCH2_ON_MASK),
"calibration": bool(raw_data[3] & 0b01000000),
"mode": raw_data[4] & 0b00001111,
"position": raw_data[9] >> 1,
}

def _parse_user_data(self, raw_data: bytes) -> dict[str, Any]:
Expand Down Expand Up @@ -171,7 +181,9 @@ async def get_basic_info(self) -> dict[str, Any] | None:
return None

_LOGGER.debug(
"on-off hex: %s, channel1_hex_data: %s", _data.hex(), _channel1_data.hex()
"get_basic_info raw: %s, channel1 raw: %s",
_data.hex(),
_channel1_data.hex(),
)

common_data = self._parse_common_data(_data)
Expand Down Expand Up @@ -229,25 +241,73 @@ def __init__(
super().__init__(device, key_id, encryption_key, interface, model, **kwargs)


class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch):
class SwitchbotRelaySwitch2PM(SwitchbotRelaySwitch, SwitchbotBaseCover):
"""Representation of a Switchbot relay switch 2pm."""

def __init__(
def __init__( # noqa: PLR0913
self,
device: BLEDevice,
key_id: str,
encryption_key: str,
interface: int = 0,
model: SwitchbotModel = SwitchbotModel.RELAY_SWITCH_2PM,
reverse: bool = False,
**kwargs: Any,
) -> None:
super().__init__(device, key_id, encryption_key, interface, model, **kwargs)
super().__init__(
device, key_id, encryption_key, interface, model, reverse=reverse, **kwargs
)
self._channel = 2

@property
def channel(self) -> int:
return self._channel

@property
def position(self) -> int | None:
"""Return position."""
return self._get_adv_value("position", channel=1)

@property
def mode(self) -> int | None:
"""Return mode."""
return self._get_adv_value("mode", channel=1)

@update_after_operation
async def open(self) -> bool:
"""Send open command. 0 - performance mode, 1 - unfelt mode."""
self._is_opening = True
self._is_closing = False
result = await self._send_command(COMMAND_OPEN)
return self._check_command_result(result, 0, {1})

@update_after_operation
async def close(self) -> bool:
"""Send close command. 0 - performance mode, 1 - unfelt mode."""
self._is_closing = True
self._is_opening = False
result = await self._send_command(COMMAND_CLOSE)
return self._check_command_result(result, 0, {1})

@update_after_operation
async def stop(self) -> bool:
"""Send stop command to device."""
self._is_opening = self._is_closing = False
result = await self._send_command(COMMAND_STOP)
return self._check_command_result(result, 0, {1})

@update_after_operation
async def set_position(self, position: int, mode: int = 0) -> bool:
"""Send position command (0-100) to device. 0 - performance mode, 1 - unfelt mode."""
prev = self._get_adv_value("position", channel=1)
self._update_motion_direction(
True,
(100 - prev) if prev is not None else None,
100 - position,
)
result = await self._send_command(COMMAND_POSITION.format(f"{position:02X}"))
return self._check_command_result(result, 0, {1})

def get_parsed_data(self, channel: int | None = None) -> dict[str, Any]:
"""Return parsed device data, optionally for a specific channel."""
data = self.data.get("data") or {}
Expand All @@ -257,29 +317,54 @@ async def get_basic_info(self):
current_time_hex, current_day_start_time_hex = (
self.get_current_time_and_start_time()
)
if not (common_data := await super().get_basic_info()):
if not (_basic_raw := await self._get_basic_info(COMMAND_GET_BASIC_INFO)):
return None
if not (
_channel1_raw := await self._get_basic_info(
COMMAND_GET_CHANNEL1_INFO.format(
current_time_hex, current_day_start_time_hex
)
)
):
return None
if not (
_channel2_data := await self._get_basic_info(
_channel2_raw := await self._get_basic_info(
COMMAND_GET_CHANNEL2_INFO.format(
current_time_hex, current_day_start_time_hex
)
)
):
return None

_LOGGER.debug("channel2_hex_data: %s", _channel2_data.hex())
_LOGGER.debug(
"get_basic_info 2PM basic_raw: %s, channel1_raw: %s, channel2_raw: %s",
_basic_raw.hex(),
_channel1_raw.hex(),
_channel2_raw.hex(),
)

parsed = self._parse_common_data(_basic_raw)

channel2_data = self._parse_user_data(_channel2_data)
channel2_data["isOn"] = common_data["channel2_isOn"]
channel1_data = self._parse_user_data(_channel1_raw)
channel1_data["isOn"] = parsed["isOn"]
channel1_data["firmware"] = parsed["firmware"]
channel1_data["calibration"] = parsed["calibration"]
channel1_data["mode"] = parsed["mode"]
channel1_data["position"] = parsed["position"]

if not channel1_data["isOn"]:
self._reset_power_data(channel1_data)

channel2_data = self._parse_user_data(_channel2_raw)
channel2_data["isOn"] = parsed["channel2_isOn"]

if not channel2_data["isOn"]:
self._reset_power_data(channel2_data)

_LOGGER.debug(
"channel1_data: %s, channel2_data: %s", common_data, channel2_data
"channel1_data: %s, channel2_data: %s", channel1_data, channel2_data
)
return {1: common_data, 2: channel2_data}
return {1: channel1_data, 2: channel2_data}

@update_after_operation
async def turn_on(self, channel: int) -> bool:
Expand Down Expand Up @@ -312,3 +397,17 @@ def is_on(self, channel: int) -> bool | None:
def switch_mode(self, channel: int) -> bool | None:
"""Return true or false from cache."""
return self._get_adv_value("switchMode", channel)

def _update_motion_direction(
self, in_motion: bool, previous_position: int | None, new_position: int
) -> None:
"""Update opening/closing status based on movement."""
if previous_position is None:
return
if in_motion is False:
self._is_closing = self._is_opening = False
return

if new_position != previous_position:
self._is_opening = new_position > previous_position
self._is_closing = new_position < previous_position
2 changes: 1 addition & 1 deletion switchbot/devices/roller_shade.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
# the definition of position is the same as in Home Assistant.

self._reverse: bool = kwargs.pop("reverse_mode", True)
super().__init__(self._reverse, *args, **kwargs)
super().__init__(*args, reverse=self._reverse, **kwargs)

def _set_parsed_data(
self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
Expand Down
12 changes: 12 additions & 0 deletions tests/test_adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3401,12 +3401,18 @@ def test_humidifer_with_empty_data() -> None:
"sequence_number": 138,
"switchMode": True,
"power": 0.0,
"mode": 0,
"position": 0,
"calibration": False,
},
2: {
"isOn": True,
"sequence_number": 138,
"switchMode": True,
"power": 70.0,
"mode": 0,
"position": 0,
"calibration": False,
},
"sequence_number": 138,
},
Expand Down Expand Up @@ -3787,12 +3793,18 @@ def test_adv_active(test_case: AdvTestCase) -> None:
"sequence_number": 138,
"switchMode": True,
"power": 0.0,
"mode": 0,
"position": 0,
"calibration": False,
},
2: {
"isOn": True,
"sequence_number": 138,
"switchMode": True,
"power": 70.0,
"mode": 0,
"position": 0,
"calibration": False,
},
"sequence_number": 138,
},
Expand Down
4 changes: 2 additions & 2 deletions tests/test_base_cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

def create_device_for_command_testing(position=50, calibration=True):
ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device)
base_cover_device = base_cover.SwitchbotBaseCover(ble_device, reverse=False)
base_cover_device.update_from_advertisement(
make_advertisement_data(ble_device, True, position, calibration)
)
Expand Down Expand Up @@ -50,7 +50,7 @@ def make_advertisement_data(
@pytest.mark.asyncio
async def test_send_multiple_commands():
ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device)
base_cover_device = base_cover.SwitchbotBaseCover(ble_device, reverse=False)
base_cover_device.update_from_advertisement(
make_advertisement_data(ble_device, True, 50, True)
)
Expand Down
Loading
Loading