diff --git a/switchbot/devices/meter_pro.py b/switchbot/devices/meter_pro.py index 4f4fbbda..6b34ed71 100644 --- a/switchbot/devices/meter_pro.py +++ b/switchbot/devices/meter_pro.py @@ -3,17 +3,39 @@ from ..helpers import parse_uint24_be from .device import SwitchbotDevice, SwitchbotOperationError -COMMAND_SET_TIME_OFFSET = "570f680506" +SETTINGS_HEADER = "570f68" +COMMAND_SHOW_BATTERY_LEVEL = f"{SETTINGS_HEADER}070108" +COMMAND_DATE_FORMAT = f"{SETTINGS_HEADER}070107" + +COMMAND_TEMPERATURE_UPDATE_INTERVAL = f"{SETTINGS_HEADER}070105" +COMMAND_CO2_UPDATE_INTERVAL = f"{SETTINGS_HEADER}0b06" +COMMAND_FORCE_NEW_CO2_MEASUREMENT = f"{SETTINGS_HEADER}0b04" +COMMAND_CO2_THRESHOLDS = f"{SETTINGS_HEADER}020302" +COMMAND_COMFORTLEVEL = f"{SETTINGS_HEADER}020188" + +COMMAND_BUTTON_FUNCTION = f"{SETTINGS_HEADER}070106" +COMMAND_CALIBRATE_CO2_SENSOR = f"{SETTINGS_HEADER}0b02" + +COMMAND_ALERT_SOUND = f"{SETTINGS_HEADER}0204" +COMMAND_ALERT_TEMPERATURE_HUMIDITY = "570f44" +COMMAND_ALERT_CO2 = f"{SETTINGS_HEADER}020301" + +COMMAND_SET_TIME_OFFSET = f"{SETTINGS_HEADER}0506" COMMAND_GET_TIME_OFFSET = "570f690506" MAX_TIME_OFFSET = (1 << 24) - 1 COMMAND_GET_DEVICE_DATETIME = "570f6901" COMMAND_SET_DEVICE_DATETIME = "57000503" -COMMAND_SET_DISPLAY_FORMAT = "570f680505" +COMMAND_SET_DISPLAY_FORMAT = f"{SETTINGS_HEADER}0505" class SwitchbotMeterProCO2(SwitchbotDevice): - """API to control Switchbot Meter Pro CO2.""" + """ + API to control Switchbot Meter Pro CO2. + + The assumptions that the original app has for each value are noted at the respective method. + Which of them are actually required by the device is unknown. + """ async def get_time_offset(self) -> int: """ @@ -157,6 +179,127 @@ async def set_time_display_format(self, is_12h_mode: bool = False) -> None: result = await self._send_command(payload) self._validate_result("set_time_display_format", result) + async def show_battery_level(self, show_battery: bool): + """Show or hide battery level on the display.""" + show_battery_byte = "01" if show_battery else "00" + await self._send_command(COMMAND_SHOW_BATTERY_LEVEL + show_battery_byte) + + async def set_co2_thresholds(self, lower: int, upper: int): + """ + Sets the thresholds to define Air Quality for depiction on display as follows: + co2 < lower => Good (Green) + lower < co2 < upper => Moderate (Orange) + upper < co2 => Poor (Red) + + Original App assumes: + 500 <= lower < upper <= 1900 + lower and upper are multiples of 100 + """ + if lower >= upper: + raise ValueError("Lower should be smaller than upper") + await self._send_command( + COMMAND_CO2_THRESHOLDS + f"{lower:04x}" + f"{upper:04x}" + ) + + async def set_comfortlevel(self, cold: float, hot: float, dry: int, wet: int): + """ + Sets the Thresholds for comfortable temperature (in C) and humidity to display comfort-level. + The supported values in the original App are as following: + Temperature is -20C to 80C in 0.5C steps + Humidity is 1% to 99% in 1% steps + """ + if cold >= hot: + raise ValueError("Cold should be smaller than Hot") + if dry >= wet: + raise ValueError("Dry should be smaller than Wet") + + point_five = self._get_point_five_byte(cold, hot) + cold_byte = self._encode_temperature(int(cold)) + hot_byte = self._encode_temperature(int(hot)) + + await self._send_command( + COMMAND_COMFORTLEVEL + + hot_byte + + f"{wet:02x}" + + point_five + + cold_byte + + f"{dry:02x}" + ) + + async def set_alert_co2(self, on: bool, co2_low: int, co2_high: int, reverse: bool): + """ + Sets the CO2-Alert. + on: Turn CO2-Alert on or off + lower and upper: The provided range (between 400ppm and 2000ppm in 100ppm steps) + reverse: If False: Alert if measured value is outside of provided range. + If True: Alert if measured value is inside of provided range. + """ + if co2_high < co2_low: + raise ValueError( + "Upper value should bigger than the lower value. Do you want to use reverse instead?" + ) + + mode = 0x00 if not on else (0x04 if reverse else 0x03) + await self._send_command( + COMMAND_ALERT_CO2 + f"{mode:02x}" + f"{co2_high:04x}" + f"{co2_low:04x}" + ) + + async def set_temperature_update_interval(self, minutes: int): + """ + Sets the interval in which temperature and humidity are measured in battery powered mode. + Original App assumes minutes in {5, 10, 30} + """ + seconds = minutes * 60 + await self._send_command(COMMAND_TEMPERATURE_UPDATE_INTERVAL + f"{seconds:04x}") + + async def set_co2_update_interval(self, minutes: int): + """ + Sets the interval in which co2 levels are measured in battery powered mode. + Original App assumes minutes in {5, 10, 30} + """ + seconds = minutes * 60 + await self._send_command(COMMAND_CO2_UPDATE_INTERVAL + f"{seconds:04x}") + + async def set_button_function(self, change_unit: bool, change_data_source: bool): + """ + Sets the function of the top button: + Default (both options false): Only update data + changeUnit: switch between ℃ and ℉ + changeDataSource: switch between display of indoor and outdoor temperature + """ + change_unit_byte = ( + "00" if change_unit else "01" + ) # yes, it has to be reversed like this! + change_data_source_byte = ( + "01" if change_data_source else "00" + ) # yes, it has to be reversed like this! + await self._send_command( + COMMAND_BUTTON_FUNCTION + change_unit_byte + change_data_source_byte + ) + + async def force_new_co2_measurement(self): + """Requests a new CO2 measurement, regardless of update interval""" + await self._send_command(COMMAND_FORCE_NEW_CO2_MEASUREMENT) + + async def calibrate_co2_sensor(self): + """ + Calibrate CO2-Sensor. + Place your device in a well-ventilated area for 1 minute before calling this. + After calling this the calibration runs for about 5 minutes. + Keep the device still during this process. + """ + await self._send_command(COMMAND_CALIBRATE_CO2_SENSOR) + + async def set_alert_sound(self, sound_on: bool, volume: int): + """ + Sets the Alert-Mode. + If soundOn is False the display flashes. + If soundOn is True the device additionally beeps. + The volume is expected to be in {2,3,4} (2: low, 3: medium, 4: high) + """ + sound_on_byte = "02" if sound_on else "01" + await self._send_command(COMMAND_ALERT_SOUND + f"{volume:02x}" + sound_on_byte) + def _validate_result( self, op_name: str, result: bytes | None, min_length: int | None = None ) -> bytes: @@ -170,3 +313,21 @@ def _validate_result( f"{self.name}: Unexpected response len for {op_name}, wanted at least {min_length} (result={result.hex() if result else 'None'} rssi={self.rssi})" ) return result + + def _get_point_five_byte(self, cold: float, hot: float): + """Represents if either of the temperatures has a .5 decimalplace""" + point_five = 0x00 + if int(cold * 10) % 10 == 5: + point_five += 0x05 + if int(hot * 10) % 10 == 5: + point_five += 0x50 + return f"{point_five:02x}" + + def _encode_temperature(self, temp: int): + # The encoding for a negative temperature is the value as hex + # The encoding for a positive temperature is the value + 128 as hex + if temp > 0: + temp += 128 + else: + temp *= -1 + return f"{temp:02x}" diff --git a/tests/test_meter_pro.py b/tests/test_meter_pro.py index 449a8636..2f537814 100644 --- a/tests/test_meter_pro.py +++ b/tests/test_meter_pro.py @@ -247,3 +247,204 @@ async def test_set_time_display_format_failure(): with pytest.raises(SwitchbotOperationError): await device.set_time_display_format(is_12h_mode=True) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("show_battery", "expected_payload"), + [ + (True, "01"), + (False, "00"), + ], +) +async def test_show_battery_level(show_battery: bool, expected_payload: str): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.show_battery_level(show_battery=show_battery) + device._send_command.assert_called_with("570f68070108" + expected_payload) + + +@pytest.mark.asyncio +async def test_set_co2_thresholds(): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_co2_thresholds(lower=500, upper=1000) + device._send_command.assert_called_with("570f6802030201f403e8") + + +@pytest.mark.asyncio +async def test_set_co2_thresholds_throws_on_invalid_input(): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + # Error if lower >= upper + with pytest.raises(ValueError, match="Lower should be smaller than upper"): + await device.set_co2_thresholds(lower=500, upper=400) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("cold", "hot", "dry", "wet", "expected_payload"), + [ + (10.0, 20.0, 40, 80, "9450008a28"), + (-20.0, -10.0, 40, 80, "0a50001428"), + (-20.0, 70, 40, 80, "c650001428"), + (0.5, 22, 40, 82, "9652050028"), + (0, 22, 40, 82, "9652000028"), + (14, 37.5, 30, 70, "a546508e1e"), + ], +) +async def test_set_comfortlevel( + cold: float, hot: float, dry: int, wet: int, expected_payload: str +): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_comfortlevel(cold, hot, dry, wet) + device._send_command.assert_called_with("570f68020188" + expected_payload) + + +@pytest.mark.asyncio +async def test_set_comfortlevel_throws_on_invalid_input(): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + # Error if cold >= hot + with pytest.raises(ValueError, match="Cold should be smaller than Hot"): + await device.set_comfortlevel(16, 15, 10, 20) + + # Error if dry >= wet + with pytest.raises(ValueError, match="Dry should be smaller than Wet"): + await device.set_comfortlevel(15, 16, 20, 10) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("on", "co2_low", "co2_high", "reverse", "expected_payload"), + [ + (False, 1000, 2000, False, "0007d003e8"), + (True, 1000, 2000, False, "0307d003e8"), + (True, 700, 2000, False, "0307d002bc"), + (True, 700, 1500, False, "0305dc02bc"), + (True, 700, 1500, True, "0405dc02bc"), + ], +) +async def test_set_alert_co2( + on: bool, co2_low: int, co2_high: int, reverse: bool, expected_payload: str +): + # Values based on actual measurements from the app + + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_alert_co2(on, co2_low, co2_high, reverse) + device._send_command.assert_called_with("570f68020301" + expected_payload) + + +@pytest.mark.asyncio +async def test_set_alert_co2_throws_on_invalid_input(): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + # Error if lower >= upper + with pytest.raises( + ValueError, + match=r"Upper value should bigger than the lower value. Do you want to use reverse instead\?", + ): + await device.set_alert_co2(True, 500, 400, True) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("minutes", "expected_payload"), + [ + (5, "012c"), + (10, "0258"), + (30, "0708"), + ], +) +async def test_set_temperature_update_interval(minutes: int, expected_payload: str): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_temperature_update_interval(minutes) + device._send_command.assert_called_with("570f68070105" + expected_payload) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("minutes", "expected_payload"), + [ + (5, "012c"), + (10, "0258"), + (30, "0708"), + ], +) +async def test_set_co2_update_interval(minutes: int, expected_payload: str): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_co2_update_interval(minutes) + device._send_command.assert_called_with("570f680b06" + expected_payload) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("change_unit", "change_data_source", "expected_payload"), + [ + (True, True, "0001"), + (True, False, "0000"), + (False, True, "0101"), + (False, False, "0100"), + ], +) +async def test_set_button_function( + change_unit: bool, change_data_source: bool, expected_payload: str +): + # Values based on actual measurements from the app + + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_button_function(change_unit, change_data_source) + device._send_command.assert_called_with("570f68070106" + expected_payload) + + +@pytest.mark.asyncio +async def test_force_new_co2_measurement(): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.force_new_co2_measurement() + device._send_command.assert_called_with("570f680b04") + + +@pytest.mark.asyncio +async def test_calibrate_co2_sensor(): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.calibrate_co2_sensor() + device._send_command.assert_called_with("570f680b02") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("sound_on", "volume", "expected_payload"), + [ + (False, 4, "0401"), + (True, 2, "0202"), + (True, 3, "0302"), + (True, 4, "0402"), + ], +) +async def test_set_alert_sound(sound_on: bool, volume: int, expected_payload: str): + # Values based on actual measurements from the app + + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_alert_sound(sound_on, volume) + device._send_command.assert_called_with("570f680204" + expected_payload)