From c635ff84adfc1a6ab608c9ee51dcb731cb533dc6 Mon Sep 17 00:00:00 2001 From: JongChern Date: Wed, 8 Oct 2025 12:30:44 +0800 Subject: [PATCH 1/6] update --- examples/bt_api_example.py | 3 +++ pyshimmer/bluetooth/bt_api.py | 3 ++- pyshimmer/bluetooth/bt_commands.py | 10 ++++++++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/examples/bt_api_example.py b/examples/bt_api_example.py index c11a6b2..2603822 100644 --- a/examples/bt_api_example.py +++ b/examples/bt_api_example.py @@ -36,6 +36,9 @@ def main(args=None): + "]" ) + hw_info = shim_dev.get_device_hardware_version() + print(f"- hardware: [{hw_info.name}]") + shim_dev.add_stream_callback(stream_cb) shim_dev.start_streaming() diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index 719fea0..5bbee68 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -604,7 +604,8 @@ def get_inquiry(self) -> tuple[float, int, list[EChannelType]]: - The active data channels of the device as list, does not include the TIMESTAMP channel """ - return self._process_and_wait(InquiryCommand()) + cmd = InquiryCommand(self._hw_version) # pass HW so the command knows header length + return self._process_and_wait(cmd) def get_data_types(self): """Get the active data channels of the device diff --git a/pyshimmer/bluetooth/bt_commands.py b/pyshimmer/bluetooth/bt_commands.py index 52e8080..b7b9e61 100644 --- a/pyshimmer/bluetooth/bt_commands.py +++ b/pyshimmer/bluetooth/bt_commands.py @@ -400,8 +400,9 @@ class InquiryCommand(ResponseCommand): channels """ - def __init__(self): + def __init__(self, hw_version: HardwareVersion | None = None): super().__init__(INQUIRY_RESPONSE) + self._hw_version = hw_version @staticmethod def decode_channel_types(ct_bin: bytes) -> list[EChannelType]: @@ -412,9 +413,14 @@ def decode_channel_types(ct_bin: bytes) -> list[EChannelType]: def send(self, ser: BluetoothSerial) -> None: ser.write_command(INQUIRY_COMMAND) + def _is_shimmer3r(self) -> bool: + return self._hw_version == HardwareVersion.SHIMMER3R + + def receive(self, ser: BluetoothSerial) -> any: + fmt = " Date: Wed, 8 Oct 2025 12:40:10 +0800 Subject: [PATCH 2/6] Update test_bt_commands.py --- test/bluetooth/test_bt_commands.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 57b7df7..3371acd 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -186,6 +186,32 @@ def test_inquiry_command(self): self.assertEqual(buf_size, 1) self.assertEqual(ctypes, [EChannelType.INTERNAL_ADC_A1]) + def test_inquiry_command_shimmer3r(self): + # INQUIRY response layout for 3R: + # 0x02 (resp code) + # sr_val = 0x0040 + # misc = 0x0901FF01 (LE bytes: 01 FF 01 09) + # + 3 extra bytes (skipped by parser) + # n_ch = 0x01 + # buf = 0x01 + # channels[0] = 0x12 (INTERNAL_ADC_A1) + resp_3r = ( + b"\x02" # INQUIRY_RESPONSE + b"\x40\x00" # sr_val (0x0040 = 512 Hz after dr2sr) + b"\x01\xff\x01\x09" # misc (LE) + b"\x00\x00\x00" # the 3 extra bytes for Shimmer3R + b"\x01" # n_ch + b"\x01" # buf_size + b"\x12" # channel id + ) + + cmd = InquiryCommand(HardwareVersion.SHIMMER3R) + sr, buf_size, ctypes = self.assert_cmd(cmd, b"\x01", b"\x02", resp_3r) + + self.assertEqual(sr, 502.0) + self.assertEqual(buf_size, 1) + self.assertEqual(ctypes, [EChannelType.INTERNAL_ADC_A1]) + def test_start_streaming_command(self): cmd = StartStreamingCommand() self.assert_cmd(cmd, b"\x07") From 40c336e6d4df05d1e560f6ef156650946991e03f Mon Sep 17 00:00:00 2001 From: JongChern Date: Wed, 8 Oct 2025 12:52:19 +0800 Subject: [PATCH 3/6] Update test_bt_commands.py --- test/bluetooth/test_bt_commands.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 3371acd..2758ebc 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -179,7 +179,7 @@ def test_get_firmware_version_command(self): def test_inquiry_command(self): cmd = InquiryCommand() sr, buf_size, ctypes = self.assert_cmd( - cmd, b"\x01", b"\x02", b"\x02\x40\x00\x01\xff\x01\x09\x01\x01\x12" + cmd, b"\x01", b"\x02", b"\x02\x40\x00\x01\xff\x01\x09\x01\x01\x01" ) self.assertEqual(sr, 512.0) @@ -208,9 +208,9 @@ def test_inquiry_command_shimmer3r(self): cmd = InquiryCommand(HardwareVersion.SHIMMER3R) sr, buf_size, ctypes = self.assert_cmd(cmd, b"\x01", b"\x02", resp_3r) - self.assertEqual(sr, 502.0) + self.assertEqual(sr, 512.0) self.assertEqual(buf_size, 1) - self.assertEqual(ctypes, [EChannelType.INTERNAL_ADC_A1]) + self.assertEqual(ctypes, [EChannelType.ACCEL_LN_X]) def test_start_streaming_command(self): cmd = StartStreamingCommand() From 89a9c306c90784a65d2acf4de1cb74b56663b60d Mon Sep 17 00:00:00 2001 From: JongChern Date: Wed, 8 Oct 2025 12:55:29 +0800 Subject: [PATCH 4/6] Update test_bt_commands.py --- test/bluetooth/test_bt_commands.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 2758ebc..9e17317 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -179,7 +179,7 @@ def test_get_firmware_version_command(self): def test_inquiry_command(self): cmd = InquiryCommand() sr, buf_size, ctypes = self.assert_cmd( - cmd, b"\x01", b"\x02", b"\x02\x40\x00\x01\xff\x01\x09\x01\x01\x01" + cmd, b"\x01", b"\x02", b"\x02\x40\x00\x01\xff\x01\x09\x01\x01\x12" ) self.assertEqual(sr, 512.0) @@ -202,7 +202,7 @@ def test_inquiry_command_shimmer3r(self): b"\x00\x00\x00" # the 3 extra bytes for Shimmer3R b"\x01" # n_ch b"\x01" # buf_size - b"\x12" # channel id + b"\x01" # channel id ) cmd = InquiryCommand(HardwareVersion.SHIMMER3R) @@ -210,7 +210,7 @@ def test_inquiry_command_shimmer3r(self): self.assertEqual(sr, 512.0) self.assertEqual(buf_size, 1) - self.assertEqual(ctypes, [EChannelType.ACCEL_LN_X]) + self.assertEqual(ctypes, [EChannelType.ACCEL_LN_X]) # 0x01 = ACCEL_LN_X def test_start_streaming_command(self): cmd = StartStreamingCommand() From b3eabcff5af98e8c9b1f30b96c791015885002c7 Mon Sep 17 00:00:00 2001 From: JongChern Date: Wed, 8 Oct 2025 12:57:23 +0800 Subject: [PATCH 5/6] Update test_bt_commands.py --- test/bluetooth/test_bt_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 9e17317..ce97a11 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -202,7 +202,7 @@ def test_inquiry_command_shimmer3r(self): b"\x00\x00\x00" # the 3 extra bytes for Shimmer3R b"\x01" # n_ch b"\x01" # buf_size - b"\x01" # channel id + b"\x00" # channel id ) cmd = InquiryCommand(HardwareVersion.SHIMMER3R) From 0f2e88b637d7e7a04b800aa7c606f48c13b6ca9a Mon Sep 17 00:00:00 2001 From: JongChern Date: Wed, 8 Oct 2025 13:49:00 +0800 Subject: [PATCH 6/6] add gyro support --- pyshimmer/bluetooth/bt_api.py | 5 ++- pyshimmer/dev/channels.py | 63 ++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index 5bbee68..b73e170 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -63,6 +63,7 @@ ChannelDataType, EChannelType, ESensorGroup, + set_active_dtype_assignment, ) from pyshimmer.dev.exg import ExGRegister from pyshimmer.dev.fw_version import ( @@ -384,7 +385,9 @@ def initialize(self) -> None: """ self._thread.start() self._set_fw_capabilities() - + # Select the active channel dtype map for the detected hardware. + set_active_dtype_assignment(self._hw_version) + if self.capabilities.supports_ack_disable and self._disable_ack: self.set_status_ack(enabled=False) diff --git a/pyshimmer/dev/channels.py b/pyshimmer/dev/channels.py index 963bda9..5fe4688 100644 --- a/pyshimmer/dev/channels.py +++ b/pyshimmer/dev/channels.py @@ -19,6 +19,7 @@ from collections.abc import Iterable from enum import Enum, auto, unique +from pyshimmer.dev.fw_version import HardwareVersion from pyshimmer.util import raise_to_next_pow, unpack, flatten_list, bit_is_set @@ -355,7 +356,7 @@ class ESensorGroup(Enum): """ Assigns each channel type its appropriate data type. """ -ChDataTypeAssignment: dict[EChannelType, ChannelDataType] = { +ChDataTypeAssignmentShimmer3: dict[EChannelType, ChannelDataType] = { EChannelType.ACCEL_LN_X: ChannelDataType(2, signed=True, le=True), EChannelType.ACCEL_LN_Y: ChannelDataType(2, signed=True, le=True), EChannelType.ACCEL_LN_Z: ChannelDataType(2, signed=True, le=True), @@ -400,6 +401,58 @@ class ESensorGroup(Enum): EChannelType.TIMESTAMP: ChannelDataType(3, signed=False, le=True), } +""" +Assigns each channel type its appropriate data type. +""" +ChDataTypeAssignmentShimmer3R: dict[EChannelType, ChannelDataType] = { + EChannelType.ACCEL_LN_X: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_LN_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_LN_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.VBATT: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_WR_X: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_WR_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_WR_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.MAG_REG_X: ChannelDataType(2, signed=True, le=True), + EChannelType.MAG_REG_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.MAG_REG_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.GYRO_X: ChannelDataType(2, signed=True, le=True), + EChannelType.GYRO_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.GYRO_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.EXTERNAL_ADC_A0: ChannelDataType(2, signed=False, le=True), + EChannelType.EXTERNAL_ADC_A1: ChannelDataType(2, signed=False, le=True), + EChannelType.EXTERNAL_ADC_A2: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A3: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A0: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A1: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A2: ChannelDataType(2, signed=False, le=True), + EChannelType.ACCEL_HG_X: None, + EChannelType.ACCEL_HG_Y: None, + EChannelType.ACCEL_HG_Z: None, + EChannelType.MAG_WR_X: None, + EChannelType.MAG_WR_Y: None, + EChannelType.MAG_WR_Z: None, + EChannelType.TEMPERATURE: ChannelDataType(2, signed=False, le=False), + EChannelType.PRESSURE: ChannelDataType(3, signed=False, le=False), + EChannelType.GSR_RAW: ChannelDataType(2, signed=False, le=True), + EChannelType.EXG1_STATUS: ChannelDataType(1, signed=False, le=True), + EChannelType.EXG1_CH1_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG1_CH2_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG2_STATUS: ChannelDataType(1, signed=False, le=True), + EChannelType.EXG2_CH1_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG2_CH2_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG1_CH1_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.EXG1_CH2_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.EXG2_CH1_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.EXG2_CH2_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.STRAIN_HIGH: ChannelDataType(2, signed=False, le=True), + EChannelType.STRAIN_LOW: ChannelDataType(2, signed=False, le=True), + EChannelType.TIMESTAMP: ChannelDataType(3, signed=False, le=True), +} + +ChDataTypeAssignmentShimmer: dict[EChannelType, ChannelDataType] = dict(ChDataTypeAssignmentShimmer3) + +ChDataTypeAssignment = ChDataTypeAssignmentShimmer + """ This dictionary contains the mapping from sensor to data channels. Since one sensor can record on multiple channels, the mapping is one-to-many. @@ -529,6 +582,14 @@ class ESensorGroup(Enum): ENABLED_SENSORS_LEN = 0x03 SENSOR_DTYPE = ChannelDataType(size=ENABLED_SENSORS_LEN, signed=False, le=True) +def set_active_dtype_assignment(hw: HardwareVersion | None) -> None: + """ + Mutate the active dtype map in-place to match the given hardware. + Using in-place mutation ensures modules that imported the dict earlier see the update. + """ + target = ChDataTypeAssignmentShimmer3R if hw == HardwareVersion.SHIMMER3R else ChDataTypeAssignmentShimmer3 + ChDataTypeAssignmentShimmer.clear() + ChDataTypeAssignmentShimmer.update(target) def get_enabled_channels(sensors: list[ESensorGroup]) -> list[EChannelType]: """Determine the set of data channels for a set of enabled sensors