Skip to content
167 changes: 164 additions & 3 deletions switchbot/devices/meter_pro.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,39 @@
from ..helpers import parse_uint24_be
from .device import SwitchbotDevice, SwitchbotOperationError

COMMAND_SET_TIME_OFFSET = "570f680506"
SETTINGS_HEADER = "570f68"
COMMAND_SHOW_BATTERY_LEVEL = f"{SETTINGS_HEADER}070108"
COMMAND_DATE_FORMAT = f"{SETTINGS_HEADER}070107"

COMMAND_TEMPERATURE_UPDATE_INTERVAL = f"{SETTINGS_HEADER}070105"
COMMAND_CO2_UPDATE_INTERVAL = f"{SETTINGS_HEADER}0b06"
COMMAND_FORCE_NEW_CO2_MEASUREMENT = f"{SETTINGS_HEADER}0b04"
COMMAND_CO2_THRESHOLDS = f"{SETTINGS_HEADER}020302"
COMMAND_COMFORTLEVEL = f"{SETTINGS_HEADER}020188"

COMMAND_BUTTON_FUNCTION = f"{SETTINGS_HEADER}070106"
COMMAND_CALIBRATE_CO2_SENSOR = f"{SETTINGS_HEADER}0b02"

COMMAND_ALERT_SOUND = f"{SETTINGS_HEADER}0204"
COMMAND_ALERT_TEMPERATURE_HUMIDITY = "570f44"
COMMAND_ALERT_CO2 = f"{SETTINGS_HEADER}020301"

COMMAND_SET_TIME_OFFSET = f"{SETTINGS_HEADER}0506"
COMMAND_GET_TIME_OFFSET = "570f690506"
MAX_TIME_OFFSET = (1 << 24) - 1

COMMAND_GET_DEVICE_DATETIME = "570f6901"
COMMAND_SET_DEVICE_DATETIME = "57000503"
COMMAND_SET_DISPLAY_FORMAT = "570f680505"
COMMAND_SET_DISPLAY_FORMAT = f"{SETTINGS_HEADER}0505"


class SwitchbotMeterProCO2(SwitchbotDevice):
"""API to control Switchbot Meter Pro CO2."""
"""
API to control Switchbot Meter Pro CO2.

The assumptions that the original app has for each value are noted at the respective method.
Which of them are actually required by the device is unknown.
"""

async def get_time_offset(self) -> int:
"""
Expand Down Expand Up @@ -157,6 +179,127 @@ async def set_time_display_format(self, is_12h_mode: bool = False) -> None:
result = await self._send_command(payload)
self._validate_result("set_time_display_format", result)

async def show_battery_level(self, show_battery: bool):
"""Show or hide battery level on the display."""
show_battery_byte = "01" if show_battery else "00"
await self._send_command(COMMAND_SHOW_BATTERY_LEVEL + show_battery_byte)

async def set_co2_thresholds(self, lower: int, upper: int):
"""
Sets the thresholds to define Air Quality for depiction on display as follows:
co2 < lower => Good (Green)
lower < co2 < upper => Moderate (Orange)
upper < co2 => Poor (Red)

Original App assumes:
500 <= lower < upper <= 1900
lower and upper are multiples of 100
"""
if lower >= upper:
raise ValueError("Lower should be smaller than upper")
await self._send_command(
COMMAND_CO2_THRESHOLDS + f"{lower:04x}" + f"{upper:04x}"
)

async def set_comfortlevel(self, cold: float, hot: float, dry: int, wet: int):
"""
Sets the Thresholds for comfortable temperature (in C) and humidity to display comfort-level.
The supported values in the original App are as following:
Temperature is -20C to 80C in 0.5C steps
Humidity is 1% to 99% in 1% steps
"""
if cold >= hot:
raise ValueError("Cold should be smaller than Hot")
if dry >= wet:
raise ValueError("Dry should be smaller than Wet")

point_five = self._get_point_five_byte(cold, hot)
cold_byte = self._encode_temperature(int(cold))
hot_byte = self._encode_temperature(int(hot))

await self._send_command(
COMMAND_COMFORTLEVEL
+ hot_byte
+ f"{wet:02x}"
+ point_five
+ cold_byte
+ f"{dry:02x}"
)

async def set_alert_co2(self, on: bool, co2_low: int, co2_high: int, reverse: bool):
"""
Sets the CO2-Alert.
on: Turn CO2-Alert on or off
lower and upper: The provided range (between 400ppm and 2000ppm in 100ppm steps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

between 400ppm and 2000ppm in 100ppm steps: worth validating this values in the code?

reverse: If False: Alert if measured value is outside of provided range.
If True: Alert if measured value is inside of provided range.
"""
if co2_high < co2_low:
raise ValueError(
"Upper value should bigger than the lower value. Do you want to use reverse instead?"
)

mode = 0x00 if not on else (0x04 if reverse else 0x03)
await self._send_command(
COMMAND_ALERT_CO2 + f"{mode:02x}" + f"{co2_high:04x}" + f"{co2_low:04x}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to send these limits if on is False? I haven't tried this with the device, but I'd assume that it expects all zeroes. If so, then we can probably make set default values for co2_high and co2_low args to 0 and drop their validation when on is False.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this would be a reasonable design, but is not what the original app does.
The test I pushed for this method contains only payloads from the original App.
When a mode gets turned off, all values remain set as they were before. And all of them get transmitted.
I tried to stick as close to this as possible. I did not test how the device would react to your proposal.

)

async def set_temperature_update_interval(self, minutes: int):
"""
Sets the interval in which temperature and humidity are measured in battery powered mode.
Original App assumes minutes in {5, 10, 30}
"""
seconds = minutes * 60
Copy link
Contributor

@elgris elgris Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to @zerzhang and @bdraco, but I'd just pass seconds as an arg and let the caller do the minutes->seconds conversion. The same for other _update_interval methods.

await self._send_command(COMMAND_TEMPERATURE_UPDATE_INTERVAL + f"{seconds:04x}")

async def set_co2_update_interval(self, minutes: int):
"""
Sets the interval in which co2 levels are measured in battery powered mode.
Original App assumes minutes in {5, 10, 30}
"""
seconds = minutes * 60
await self._send_command(COMMAND_CO2_UPDATE_INTERVAL + f"{seconds:04x}")

async def set_button_function(self, change_unit: bool, change_data_source: bool):
"""
Sets the function of the top button:
Default (both options false): Only update data
changeUnit: switch between ℃ and ℉
changeDataSource: switch between display of indoor and outdoor temperature
"""
change_unit_byte = (
"00" if change_unit else "01"
) # yes, it has to be reversed like this!
change_data_source_byte = (
"01" if change_data_source else "00"
) # yes, it has to be reversed like this!
await self._send_command(
COMMAND_BUTTON_FUNCTION + change_unit_byte + change_data_source_byte
)

async def force_new_co2_measurement(self):
"""Requests a new CO2 measurement, regardless of update interval"""
await self._send_command(COMMAND_FORCE_NEW_CO2_MEASUREMENT)

async def calibrate_co2_sensor(self):
"""
Calibrate CO2-Sensor.
Place your device in a well-ventilated area for 1 minute before calling this.
After calling this the calibration runs for about 5 minutes.
Keep the device still during this process.
"""
await self._send_command(COMMAND_CALIBRATE_CO2_SENSOR)

async def set_alert_sound(self, sound_on: bool, volume: int):
"""
Sets the Alert-Mode.
If soundOn is False the display flashes.
If soundOn is True the device additionally beeps.
The volume is expected to be in {2,3,4} (2: low, 3: medium, 4: high)
"""
sound_on_byte = "02" if sound_on else "01"
await self._send_command(COMMAND_ALERT_SOUND + f"{volume:02x}" + sound_on_byte)

def _validate_result(
self, op_name: str, result: bytes | None, min_length: int | None = None
) -> bytes:
Expand All @@ -170,3 +313,21 @@ def _validate_result(
f"{self.name}: Unexpected response len for {op_name}, wanted at least {min_length} (result={result.hex() if result else 'None'} rssi={self.rssi})"
)
return result

def _get_point_five_byte(self, cold: float, hot: float):
"""Represents if either of the temperatures has a .5 decimalplace"""
point_five = 0x00
if int(cold * 10) % 10 == 5:
point_five += 0x05
if int(hot * 10) % 10 == 5:
point_five += 0x50
return f"{point_five:02x}"

def _encode_temperature(self, temp: int):
# The encoding for a negative temperature is the value as hex
# The encoding for a positive temperature is the value + 128 as hex
if temp > 0:
temp += 128
else:
temp *= -1
return f"{temp:02x}"
201 changes: 201 additions & 0 deletions tests/test_meter_pro.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,204 @@ async def test_set_time_display_format_failure():

with pytest.raises(SwitchbotOperationError):
await device.set_time_display_format(is_12h_mode=True)


@pytest.mark.asyncio
@pytest.mark.parametrize(
("show_battery", "expected_payload"),
[
(True, "01"),
(False, "00"),
],
)
async def test_show_battery_level(show_battery: bool, expected_payload: str):
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.show_battery_level(show_battery=show_battery)
device._send_command.assert_called_with("570f68070108" + expected_payload)


@pytest.mark.asyncio
async def test_set_co2_thresholds():
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_co2_thresholds(lower=500, upper=1000)
device._send_command.assert_called_with("570f6802030201f403e8")


@pytest.mark.asyncio
async def test_set_co2_thresholds_throws_on_invalid_input():
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

# Error if lower >= upper
with pytest.raises(ValueError, match="Lower should be smaller than upper"):
await device.set_co2_thresholds(lower=500, upper=400)


@pytest.mark.asyncio
@pytest.mark.parametrize(
("cold", "hot", "dry", "wet", "expected_payload"),
[
(10.0, 20.0, 40, 80, "9450008a28"),
(-20.0, -10.0, 40, 80, "0a50001428"),
(-20.0, 70, 40, 80, "c650001428"),
(0.5, 22, 40, 82, "9652050028"),
(0, 22, 40, 82, "9652000028"),
(14, 37.5, 30, 70, "a546508e1e"),
],
)
async def test_set_comfortlevel(
cold: float, hot: float, dry: int, wet: int, expected_payload: str
):
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_comfortlevel(cold, hot, dry, wet)
device._send_command.assert_called_with("570f68020188" + expected_payload)


@pytest.mark.asyncio
async def test_set_comfortlevel_throws_on_invalid_input():
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

# Error if cold >= hot
with pytest.raises(ValueError, match="Cold should be smaller than Hot"):
await device.set_comfortlevel(16, 15, 10, 20)

# Error if dry >= wet
with pytest.raises(ValueError, match="Dry should be smaller than Wet"):
await device.set_comfortlevel(15, 16, 20, 10)


@pytest.mark.asyncio
@pytest.mark.parametrize(
("on", "co2_low", "co2_high", "reverse", "expected_payload"),
[
(False, 1000, 2000, False, "0007d003e8"),
(True, 1000, 2000, False, "0307d003e8"),
(True, 700, 2000, False, "0307d002bc"),
(True, 700, 1500, False, "0305dc02bc"),
(True, 700, 1500, True, "0405dc02bc"),
],
)
async def test_set_alert_co2(
on: bool, co2_low: int, co2_high: int, reverse: bool, expected_payload: str
):
# Values based on actual measurements from the app

device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_alert_co2(on, co2_low, co2_high, reverse)
device._send_command.assert_called_with("570f68020301" + expected_payload)


@pytest.mark.asyncio
async def test_set_alert_co2_throws_on_invalid_input():
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

# Error if lower >= upper
with pytest.raises(
ValueError,
match=r"Upper value should bigger than the lower value. Do you want to use reverse instead\?",
):
await device.set_alert_co2(True, 500, 400, True)


@pytest.mark.asyncio
@pytest.mark.parametrize(
("minutes", "expected_payload"),
[
(5, "012c"),
(10, "0258"),
(30, "0708"),
],
)
async def test_set_temperature_update_interval(minutes: int, expected_payload: str):
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_temperature_update_interval(minutes)
device._send_command.assert_called_with("570f68070105" + expected_payload)


@pytest.mark.asyncio
@pytest.mark.parametrize(
("minutes", "expected_payload"),
[
(5, "012c"),
(10, "0258"),
(30, "0708"),
],
)
async def test_set_co2_update_interval(minutes: int, expected_payload: str):
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_co2_update_interval(minutes)
device._send_command.assert_called_with("570f680b06" + expected_payload)


@pytest.mark.asyncio
@pytest.mark.parametrize(
("change_unit", "change_data_source", "expected_payload"),
[
(True, True, "0001"),
(True, False, "0000"),
(False, True, "0101"),
(False, False, "0100"),
],
)
async def test_set_button_function(
change_unit: bool, change_data_source: bool, expected_payload: str
):
# Values based on actual measurements from the app

device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_button_function(change_unit, change_data_source)
device._send_command.assert_called_with("570f68070106" + expected_payload)


@pytest.mark.asyncio
async def test_force_new_co2_measurement():
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.force_new_co2_measurement()
device._send_command.assert_called_with("570f680b04")


@pytest.mark.asyncio
async def test_calibrate_co2_sensor():
device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.calibrate_co2_sensor()
device._send_command.assert_called_with("570f680b02")


@pytest.mark.asyncio
@pytest.mark.parametrize(
("sound_on", "volume", "expected_payload"),
[
(False, 4, "0401"),
(True, 2, "0202"),
(True, 3, "0302"),
(True, 4, "0402"),
],
)
async def test_set_alert_sound(sound_on: bool, volume: int, expected_payload: str):
# Values based on actual measurements from the app

device = create_device()
device._send_command.return_value = bytes.fromhex("01")

await device.set_alert_sound(sound_on, volume)
device._send_command.assert_called_with("570f680204" + expected_payload)
Loading