diff --git a/10-harp.rules b/10-harp.rules new file mode 100644 index 0000000..c5114ef --- /dev/null +++ b/10-harp.rules @@ -0,0 +1,3 @@ +# UDEV rules for a Harp Device (actually an ftdi RS232 Serial [Uart] IC) +SUBSYSTEMS=="usb", ENV{.LOCAL_ifNum}="$attr{bInterfaceNumber}" +SUBSYSTEMS=="usb", KERNEL=="ttyUSB*", ATTRS{idVendor}=="0403", ATTRS{idProduct}=="6001", MODE="0666", SYMLINK+="harp_device_%E{.LOCAL_ifNum}" diff --git a/README.md b/README.md index 4b18b2d..b506d9b 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,16 @@ - - # pyharp Harp implementation of the Harp protocol. -## Edit the code +## Install with Pip +From this directory, install in editable mode with +```` +pip install -e . +```` + +Note that for the above to work, a fairly recent version of pip (>= 21.3) is required. + +## Install with Poetry Each Python user has is own very dear IDE for editing. Here, we are leaving instructions on how to edit this code using pyCharm, Anaconda and Poetry. @@ -69,3 +75,17 @@ Device info: * Firmware version: 1.0 * Device user name: IBL_rig_0 ``` + +## for Linux + +### Install UDEV Rules + +Install by either copying `10-harp.rules` over to your `/etc/udev/rules.d` folder or by symlinking it with: +```` +sudo ln -s /absolute/path/to/vibratome-controller/10-harp.rules /etc/udev/rules.d/10-harp.rules +```` + +Then reload udev rules with +```` +sudo udevadm control --reload-rules +```` diff --git a/examples/behavior_device_driver_test.py b/examples/behavior_device_driver_test.py new file mode 100755 index 0000000..d9cfab7 --- /dev/null +++ b/examples/behavior_device_driver_test.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +from pyharp.drivers.behavior import Behavior +from pyharp.messages import HarpMessage +from pyharp.messages import MessageType +from struct import * +import os + + +# Open the device and print the info on screen +# Open serial connection and save communication to a file +device = None +if os.name == 'posix': # check for Linux. + device = Behavior("/dev/harp_device_00", "ibl.bin") +else: # assume Windows. + device = Behavior("COM95", "ibl.bin") + +print(f"digital inputs: {device.all_input_states:03b}") +print(f"digital outputs: {device.all_output_states:016b}") +print(f"setting digital outputs") +#device.all_output_states = 0x0000 # Set the whole port directly. +#device.set_outputs(0xFFFF) # Set the values set to logic 1 only. +#device.clear_outputs(0xFFFF)# Clear values set to logic 1 only. +print(f"digital outputs: {device.all_output_states:016b}") +device.set_io_configuration(0b111) + +# TODO: FIXME. IOs are not working +#device.set_io_configuration(0b111) # This is getting ignored? +#device.set_io_outputs(0b000) +#device.all_io_states = 0b000 +#print(f"digital ios: {device.all_io_states:03b}") + +#device.D0 = 0 +#print(f"D0: {device.D0}") +#device.D0 = 1 +#print(f"D0: {device.D0}") +# +#device.D1 = 0 +#print(f"D1: {device.D1}") +#device.D1 = 1 +#print(f"D1: {device.D1}") +# +#print(f"DI2: {device.DI2}") + + +#import time +#while True: +# print(f"PORT0 IN State: {device.port0_i0}") +# print(f"PORT0 IO State: {device.port0_io0}") +# print(f"PORT0 OUT State: {device.port0_o0}") +# print(f"all port io states: {device.all_port_io_states}") +# print(f"all port output states: {device.all_port_output_states}") +# print() +# time.sleep(0.1) diff --git a/examples/check_device_id.py b/examples/check_device_id.py old mode 100644 new mode 100755 index b118b0e..9568921 --- a/examples/check_device_id.py +++ b/examples/check_device_id.py @@ -1,7 +1,10 @@ +#!/usr/bin/env python3 from pyharp.device import Device from pyharp.messages import HarpMessage from pyharp.messages import MessageType +from pyharp.device_names import device_names from struct import * +import os # ON THIS EXAMPLE @@ -11,7 +14,11 @@ # Open the device -device = Device("COM95") # Open serial connection +# Open serial connection +if os.name == "posix": # check for Linux. + device = Device("/dev/harp_device_00") +else: # assume Windows. + device = Device("COM95") # Get some of the device's parameters device_id = device.WHO_AM_I # Get device's ID @@ -19,7 +26,7 @@ device_user_name = device.DEVICE_NAME # Get device's user name # Check if we are dealing with the correct device -if device_id == 2080: +if device_id in device_names: print("Correct device was found!") print(f"Device's ID: {device_id}") print(f"Device's name: {device_id_description}") diff --git a/examples/get_info.py b/examples/get_info.py old mode 100644 new mode 100755 index fca840c..eec8cef --- a/examples/get_info.py +++ b/examples/get_info.py @@ -1,7 +1,9 @@ -from pyharp.device import Device +#!/usr/bin/env python3 +from pyharp.device import Device, DeviceMode from pyharp.messages import HarpMessage from pyharp.messages import MessageType from struct import * +import os # ON THIS EXAMPLE @@ -11,7 +13,13 @@ # Open the device and print the info on screen -device = Device("COM95", "ibl.bin") # Open serial connection and save communication to a file +# Open serial connection and save communication to a file +if os.name == 'posix': # check for Linux. + #device = Device("/dev/harp_device_00", "ibl.bin") + #device = Device("/dev/ttyACM0") + device = Device("/dev/ttyUSB0") +else: # assume Windows. + device = Device("COM95", "ibl.bin") device.info() # Display device's info on screen # Get some of the device's parameters @@ -28,5 +36,10 @@ device_harp_l = device.HARP_VERSION_L # Get device's harp core version device_assembly = device.ASSEMBLY_VERSION # Get device's assembly version +reg_dump = device.dump_registers() +for reg_reply in reg_dump: + print(reg_reply) + print() + # Close connection -device.disconnect() \ No newline at end of file +device.disconnect() diff --git a/examples/wait_for_events.py b/examples/wait_for_events.py new file mode 100755 index 0000000..584836e --- /dev/null +++ b/examples/wait_for_events.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +from pyharp.drivers.behavior import Behavior, Events +from pyharp.messages import HarpMessage +from pyharp.messages import MessageType +from struct import * +import os + +from pyharp.device import Device, DeviceMode + + +# Open the device and print the info on screen +# Open serial connection and save communication to a file +device = None +if os.name == 'posix': # check for Linux. + #device = Behavior("/dev/harp_device_00", "ibl.bin") + #device = Device("/dev/ttyACM0",) + device = Device("/dev/ttyUSB0",) +else: # assume Windows. + device = Behavior("COM95", "ibl.bin") + +print("Setting mode to active.") +# Mode will remain active for up to 3 seconds after CTS pin is brought low. +device.set_mode(DeviceMode.Active) +#device.disable_all_events() +#device.enable_events(Events.port_digital_inputs) +while True: + for msg in device.get_events(): + print(msg) + print() diff --git a/examples/write_and_read_from_registers.py b/examples/write_and_read_from_registers.py old mode 100644 new mode 100755 index ccde127..4430b00 --- a/examples/write_and_read_from_registers.py +++ b/examples/write_and_read_from_registers.py @@ -1,7 +1,9 @@ +#!/usr/bin/env python3 from pyharp.device import Device from pyharp.messages import HarpMessage from pyharp.messages import MessageType from struct import * +import os # ON THIS EXAMPLE @@ -12,26 +14,50 @@ # Open the device and print the info on screen -device = Device("COM95", "ibl.bin") # Open serial connection and save communication to a file +# Open serial connection and save communication to a file +if os.name == 'posix': # check for Linux. + device = Device("/dev/harp_device_00", "ibl.bin") +else: # assume Windows. + device = Device("COM95", "ibl.bin") # Read current analog sensor's higher threshold (ANA_SENSOR_TH0_HIGH) at address 42 -analog_threshold_h = device.send(HarpMessage.ReadU16(42).frame).payload_as_int() -print(f"Analog sensor's higher threshold: {analog_threshold_h}") - -# Increase current analog sensor's higher threshold by one unit -device.send(HarpMessage.WriteU16(42, analog_threshold_h+1).frame) - -# Check if the register was well written -analog_threshold_h = device.send(HarpMessage.ReadU16(42).frame).payload_as_int() -print(f"Analog sensor's higher threshold: {analog_threshold_h}") - -# Read 10 samples of the analog sensor and display the values -# The value is at register STREAM[0], address 33 -analog_sensor = [] -for x in range(10): - value = device.send(HarpMessage.ReadS16(33).frame).payload_as_int() - analog_sensor.append(value & 0xffff) -print(f"Analog sensor's values: {analog_sensor}") +#analog_threshold_h = device.send(HarpMessage.ReadU16(42).frame).payload_as_int() +#print(f"Analog sensor's higher threshold: {analog_threshold_h}") + + +import time + +print(f"System time: {time.perf_counter():.6f}") +data_stream = device.send(HarpMessage.ReadU8(33).frame) # returns a ReplyHarpMessage +#data_stream = device.send(HarpMessage.ReadS16(33).frame).payload_as_int_array() +print(f"Data Stream payload type: {data_stream.payload_type.name}") +print(f"Data Stream message type: {data_stream.message_type.name}") +print(f"Data Stream timestamp: {data_stream.timestamp}") +print(f"Data Stream num bytes: {data_stream.length}") +print(f"Data Stream payload: {data_stream.payload}") + +print(f"System time: {time.perf_counter():.6f}") +event_reg_response = device.send(HarpMessage.ReadU8(77).frame) # returns a ReplyHarpMessage +print(f"EVNT_ENABLE payload type: {event_reg_response.payload_type.name}") +print(f"EVNT_ENABLE message type: {event_reg_response.message_type.name}") +print(f"EVNT_ENABLE timestamp: {event_reg_response.timestamp}") +print(f"EVNT_ENABLE num bytes: {event_reg_response.length}") +print(f"EVNT_ENABLE payload: {event_reg_response.payload[0]:08b}") + +## Increase current analog sensor's higher threshold by one unit +#device.send(HarpMessage.WriteU16(42, analog_threshold_h+1).frame) +# +## Check if the register was well written +#analog_threshold_h = device.send(HarpMessage.ReadU16(42).frame).payload_as_int() +#print(f"Analog sensor's higher threshold: {analog_threshold_h}") +# +## Read 10 samples of the analog sensor and display the values +## The value is at register STREAM[0], address 33 +#analog_sensor = [] +#for x in range(10): +# value = device.send(HarpMessage.ReadS16(33).frame).payload_as_int() +# analog_sensor.append(value & 0xffff) +#print(f"Analog sensor's values: {analog_sensor}") # Close connection -device.disconnect() \ No newline at end of file +device.disconnect() diff --git a/pyharp/device.py b/pyharp/device.py index 5e0e64d..b4c6f6d 100644 --- a/pyharp/device.py +++ b/pyharp/device.py @@ -1,18 +1,31 @@ +from __future__ import annotations # enable subscriptable type hints for lists. import serial -from typing import Optional +import logging +import queue +from typing import Optional, Union from pathlib import Path +from pyharp.harp_serial import HarpSerial from pyharp.messages import HarpMessage, ReplyHarpMessage -from pyharp.messages import CommonRegisters -from pyharp import device_names +from pyharp.messages import CommonRegisters, MessageType +from pyharp.device_names import device_names +from enum import Enum +from time import perf_counter + + +class DeviceMode(Enum): + Standby = 0 + Active = 1 + Reserved = 2 + Speed = 3 class Device: """ - https://github.com/harp-tech/protocol/blob/master/Device%201.0%201.3%2020190207.pdf + https://github.com/harp-tech/protocol/blob/master/Device%201.1%201.0%2020220402.pdf """ - _ser: serial.Serial + _ser: HarpSerial _dump_file_path: Path WHO_AM_I: int @@ -24,18 +37,23 @@ class Device: HARP_VERSION_L: int FIRMWARE_VERSION_H: int FIRMWARE_VERSION_L: int - # TIMESTAMP_SECOND = 0x08 - # TIMESTAMP_MICRO = 0x09 - # OPERATION_CTRL = 0x0A - # RESET_DEV = 0x0B DEVICE_NAME: str - def __init__(self, serial_port: str, dump_file_path: Optional[str] = None): + TIMEOUT_S = 1.0 + + def __init__( + self, + serial_port: str, + dump_file_path: Optional[str] = None, + read_timeout_s=1, + ): + self.log = logging.getLogger(f"{__name__}.{self.__class__.__name__}") self._serial_port = serial_port if dump_file_path is None: self._dump_file_path = None else: self._dump_file_path = Path() / dump_file_path + self.read_timeout_s = read_timeout_s self.connect() self.load() @@ -53,25 +71,22 @@ def load(self) -> None: def info(self) -> None: print("Device info:") - #print(f"* Who am I (ID): {self.WHO_AM_I}") - #print(f"* Who am I (Device): {self.WHO_AM_I_DEVICE}") print(f"* Who am I: ({self.WHO_AM_I}) {self.WHO_AM_I_DEVICE}") print(f"* HW version: {self.HW_VERSION_H}.{self.HW_VERSION_L}") print(f"* Assembly version: {self.ASSEMBLY_VERSION}") print(f"* HARP version: {self.HARP_VERSION_H}.{self.HARP_VERSION_L}") - print( - f"* Firmware version: {self.FIRMWARE_VERSION_H}.{self.FIRMWARE_VERSION_L}" - ) + print(f"* Firmware version: {self.FIRMWARE_VERSION_H}.{self.FIRMWARE_VERSION_L}") print(f"* Device user name: {self.DEVICE_NAME}") + print(f"* Mode: {self.read_device_mode().name}") def read(self): pass def connect(self) -> None: - self._ser = serial.Serial( + self._ser = HarpSerial( self._serial_port, # "/dev/tty.usbserial-A106C8O9" baudrate=1000000, - timeout=1, + timeout=self.__class__.TIMEOUT_S, parity=serial.PARITY_NONE, stopbits=1, bytesize=8, @@ -87,7 +102,7 @@ def read_who_am_i(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU16(address).frame, dump=False ) - + #print(str(reply)) return reply.payload_as_int() def read_who_am_i_device(self) -> str: @@ -96,6 +111,7 @@ def read_who_am_i_device(self) -> str: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU16(address).frame, dump=False ) + #print(str(reply)) return device_names.get(reply.payload_as_int()) @@ -105,6 +121,7 @@ def read_hw_version_h(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_int() @@ -114,6 +131,7 @@ def read_hw_version_l(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_int() @@ -123,6 +141,7 @@ def read_assembly_version(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_int() @@ -132,6 +151,7 @@ def read_harp_h_version(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_int() @@ -141,6 +161,7 @@ def read_harp_l_version(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_int() @@ -150,6 +171,7 @@ def read_fw_h_version(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_int() @@ -158,7 +180,8 @@ def read_fw_l_version(self) -> int: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False - ) +) + #print(str(reply)) return reply.payload_as_int() @@ -169,32 +192,117 @@ def read_device_name(self) -> str: reply: ReplyHarpMessage = self.send( HarpMessage.ReadU8(address).frame, dump=False ) + #print(str(reply)) return reply.payload_as_string() - def send(self, message_bytes: bytearray, dump: bool = True) -> ReplyHarpMessage: + def read_device_mode(self) -> DeviceMode: + address = CommonRegisters.OPERATION_CTRL + reply = self.send(HarpMessage.ReadU8(address).frame) + return DeviceMode(reply.payload_as_int() & 0x03) + + def dump_registers(self) -> list: + """Assert the DUMP bit to dump the values of all core and app registers + as Harp Read Reply Messages. + """ + address = CommonRegisters.OPERATION_CTRL + reg_value = self.send(HarpMessage.ReadU8(address).frame).payload_as_int() + reg_value |= 0x08 # Assert DUMP bit + self._ser.write(HarpMessage.WriteU8(address, reg_value).frame) + replies = [] + while True: + msg = self._read() + if msg is not None: + replies.append(msg) + else: + break + return replies + +# TODO: Not sure if we want to implement these. Delete if no. + def set_mode(self, mode: DeviceMode) -> ReplyHarpMessage: + """Change the device's OPMODE. Reply can be ignored.""" + address = CommonRegisters.OPERATION_CTRL + # Read register first. + reg_value = self.send(HarpMessage.ReadU8(address).frame).payload_as_int() + reg_value &= ~0x03 # mask off old mode. + reg_value |= mode.value + reply = self.send(HarpMessage.WriteU8(address, reg_value).frame) + return reply + + def enable_status_led(self): + """enable the device's status led if one exists.""" + address = CommonRegisters.OPERATION_CTRL + # Read register first. + reg_value = self.send(HarpMessage.ReadU8(address).frame).payload_as_int() + reg_value |= (1 << 5) + reply = self.send(HarpMessage.WriteU8(address, reg_value).frame) + + def disable_status_led(self): + """disable the device's status led if one exists.""" + address = CommonRegisters.OPERATION_CTRL + # Read register first. + reg_value = self.send(HarpMessage.ReadU8(address).frame).payload_as_int() + reg_value &= ~(1 << 5) + reply = self.send(HarpMessage.WriteU8(address, reg_value).frame) + + def enable_alive_en(self): + """Enable ALIVE_EN such that the device sends an event each second.""" + address = CommonRegisters.OPERATION_CTRL + # Read register first. + reg_value = self.send(HarpMessage.ReadU8(address).frame).payload_as_int() + reg_value |= (1 << 7) + reply = self.send(HarpMessage.WriteU8(address, reg_value).frame) + + def disable_alive_en(self): + """disable ALIVE_EN such that the device does not send an event each second.""" + address = CommonRegisters.OPERATION_CTRL + # Read register first. + reg_value = self.send(HarpMessage.ReadU8(address).frame).payload[0] + reg_value &= ((1<< 7) ^ 0xFF) # bitwise ~ operator substitute for Python ints. + reply = self.send(HarpMessage.WriteU8(address, reg_value).frame) + + def reset_device(self): + address = CommonRegisters.RESET_DEV + # reset_value = 0xFF & (1< ReplyHarpMessage: + """Send a harp message; return the device's reply.""" + #print(f"Sending: {repr(message_bytes)}") self._ser.write(message_bytes) # TODO: handle case where read is None + reply: ReplyHarpMessage = self._read() - message_type = self._ser.read(1)[0] # byte array with only one byte - message_length = self._ser.read(1)[0] - message_content = self._ser.read(message_length) - - frame = bytearray() - frame.append(message_type) - frame.append(message_length) - frame += message_content - - reply: ReplyHarpMessage = HarpMessage.parse(frame) - - if dump: + if dump and self._dump_file_path is not None: self._dump_reply(reply.frame) return reply + def _read(self) -> Union[ReplyHarpMessage, None]: + """(Blocking) Read an incoming serial message.""" + try: + return self._ser.msg_q.get(block=True, timeout=self.read_timeout_s) + except queue.Empty: + return None + def _dump_reply(self, reply: bytes): assert self._dump_file_path is not None with self._dump_file_path.open(mode="ab") as f: f.write(reply) + + def get_events(self) -> list[ReplyHarpMessage]: + """Get all events from the event queue.""" + msgs = [] + while True: + try: + msgs.append(self._ser.event_q.get(timeout=False)) + except queue.Empty: + break + return msgs + + def event_count(self) -> int: + """Get the number of events in the event queue.""" + return self._ser.event_q.qsize() \ No newline at end of file diff --git a/pyharp/device_names.py b/pyharp/device_names.py index e5d8cbd..d7b7557 100644 --- a/pyharp/device_names.py +++ b/pyharp/device_names.py @@ -1,41 +1,26 @@ -def get(value: int) -> str: - if value == 1024: - return "Poke" - elif value == 1040: - return "MultiPwm" - elif value == 1056: - return "Wear" - elif value == 1072: - return "VoltsDrive" - elif value == 1088: - return "LedController" - elif value == 1104: - return "Synchronizer" - elif value == 1121: - return "SimpleAnalogGenerator" - elif value == 1136: - return "Archimedes" - elif value == 1152: - return "ClockSynchronizer" - elif value == 1168: - return "Camera" - elif value == 1184: - return "PyControl" - elif value == 1200: - return "FlyPad" - elif value == 1216: - return "Behavior" - elif value == 1232: - return "LoadCells" - elif value == 1248: - return "AudioSwitch" - elif value == 1264: - return "Rgb" - elif value == 1200: - return "FlyPad" - elif value == 2064: - return "FP3002" - elif value == 2080: - return "IblBehavior" - else: - return "NotSpecified" +from collections import defaultdict + + +current_device_names = \ + {1024: 'Poke', + 1040: 'MultiPwm', + 1056: 'Wear', + 1072: 'VoltsDrive', + 1088: 'LedController', + 1104: 'Synchronizer', + 1121: 'SimpleAnalogGenerator', + 1136: 'Archimedes', + 1152: 'ClockSynchronizer', + 1168: 'Camera', + 1184: 'PyControl', + 1200: 'FlyPad', + 1216: 'Behavior', + 1232: 'LoadCells', + 1248: 'AudioSwitch', + 1264: 'Rgb', + 1200: 'FlyPad', + 2064: 'FP3002', + 2080: 'IblBehavior'} +device_names = defaultdict(lambda: 'NotSpecified') +device_names.update(current_device_names) + diff --git a/pyharp/drivers/__init__.py b/pyharp/drivers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyharp/drivers/behavior.py b/pyharp/drivers/behavior.py new file mode 100644 index 0000000..989f89a --- /dev/null +++ b/pyharp/drivers/behavior.py @@ -0,0 +1,408 @@ +"""Behavior Device Driver.""" + +from pyharp.messages import HarpMessage, ReplyHarpMessage +from pyharp.device_names import device_names +from pyharp.device import Device +import serial +from serial.serialutil import SerialException +from enum import Enum + + +# These definitions are from app_regs.h in the firmware. +# Type, Base Address, "Description." +REGISTERS = \ +{ # RJ45 "PORT" (0, 1, 2) Digital Inputs + "PORT_DIS" : ("U8", 32, "Reflects the state of DI digital lines of each Port."), + + # Manipulate any of the board's digital outputs. + "OUTPUTS_SET": ("U16", 34, "Set the corresponding output."), + "OUTPUTS_CLR": ("U16", 35, "Clear the corresponding output."), + "OUTPUTS_TOGGLE": ("U16", 36, "Toggle the corresponding output."), + "OUTPUTS_OUT": ("U16", 37, "Control corresponding output."), + + # RJ45 "PORT" (0, 1, 2) Digital IOs + "PORT_DIOS_SET": ("U8", 38, "Set the corresponding DIO."), + "PORT_DIOS_CLEAR": ("U8", 39, "Clear the corresponding DIO."), + "PORT_DIOS_TOGGLE": ("U8", 40, "Toggle the corresponding DIO."), + "PORT_DIOS_OUT": ("U8", 41, "Control the corresponding DIO."), + "PORT_DIOS_CONF": ("U8", 42, "Set the DIOs direction (1 is output)."), + "PORT_DIOS_IN": ("U8", 43, "State of the DIOs."), + + "ADD_REG_DATA": ("S16", 44, "Voltage at ADC input and decoder (poke2) value."), + + "EVNT_ENABLE": ("U8", 77, "Enable events within the bitfields."), +} + + +# Register Bitfields +class PORT_DIS(Enum): + DI0 = 0 + DI1 = 1 + DI2 = 2 + +class OUTPUTS_OUT(Enum): + PORT0_DO = 0 + PORT0_D1 = 1 + PORT0_D2 = 2 + + PORT0_12V = 3 + PORT1_12V = 4 + PORT2_12V = 5 + + B_LED0 = 6 + B_LED1 = 7 + B_RGB0 = 8 + B_RGB1 = 9 + + DO0 = 10 + DO1 = 11 + DO2 = 12 + DO3 = 13 + +class PORT_DIOS_IN(Enum): + DIO0 = 0 + DIO1 = 0 + DIO2 = 0 + + +# reader-friendly events for enabling/disabling. +class Events(Enum): + port_digital_inputs = 0 # PORT_DIS + port_digital_ios = 1 # PORT_DIOS_IN + analog_input = 2 # DATA + cam0 = 3 # CAM0 + cam1 = 3 # CAM1 + + +class Behavior: + """Driver for Behavior Device.""" + + # On Linux, the symlink to the first detected harp device. + # Name set in udev rules and will increment with subsequent devices. + DEVICE_NAME = "Behavior" + DEFAULT_PORT_NAME = "/dev/harp_device_00" + ID = 1216 + + # TODO: put this in a base class? + READ_MSG_LOOKUP = \ + { + "U8": HarpMessage.ReadU8, + "U16": HarpMessage.ReadU16, + "S16": HarpMessage.ReadS16, + } + + WRITE_MSG_LOOKUP = \ + { + "U8": HarpMessage.WriteU8, + "U16": HarpMessage.WriteU16, + "S16": HarpMessage.WriteS16, + } + + + def __init__(self, port_name=None, output_filename=None): + """Class constructor. Connect to a device.""" + + self.device = None + + try: + if port_name is None: + self.device = Device(self.__class__.DEFAULT_PORT_NAME, output_filename) + else: + self.device = Device(port_name, output_filename) + except (FileNotFoundError, SerialException): + print("Error: Failed to connect to Behavior Device. Is it plugged in?") + raise + + if self.device.WHO_AM_I != self.__class__.ID: + raise IOError("Error: Did not connect to Harp Behavior Device.") + + + def get_reg_info(self, reg_name: str) -> str: + """get info for this device's particular reg.""" + try: + return REGISTERS[reg_name][2] + except KeyError: + raise KeyError(f"reg: {reg_name} is not a register in " + "{self.__class__.name} Device's register map.") + + + def disable_all_events(self) -> ReplyHarpMessage: + """Disable the publishing of all events from Behavior device.""" + event_reg_bitmask = (((1 << Events.port_digital_inputs.value) | \ + (1 << Events.port_digital_ios.value) | \ + (1 << Events.analog_input.value) | \ + (1 << Events.cam0.value) | \ + (1 << Events.cam1.value) ) ^ 0xFF) + reg_type, reg_index, _ = REGISTERS["EVNT_ENABLE"] + write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] + return self.device.send(write_message_type(reg_index, event_reg_bitmask).frame) + + + def enable_events(self, *events: Events) -> ReplyHarpMessage: + """enable any events passed in as arguments.""" + event_reg_bitmask = 0x00 + for event in events: + event_reg_bitmask |= (1 << event.value) + reg_type, reg_index, _ = REGISTERS["EVNT_ENABLE"] + write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] + return self.device.send(write_message_type(reg_index, event_reg_bitmask).frame) + + +# Board inputs, outputs, and some settings configured as @properties. + # INPUTS + @property + def all_input_states(self): + """return the state of all PORT digital inputs.""" + reg_type, reg_index, _ = REGISTERS["PORT_DIS"] + read_message_type = self.__class__.READ_MSG_LOOKUP[reg_type] + return self.device.send(read_message_type(reg_index).frame).payload_as_int() + + @property + def DI0(self): + """return the state of port0 digital input 0.""" + return self.all_port_input_states & 0x01 + + @property + def DI1(self): + """return the state of port1 digital input 0.""" + offset = PORT_DIS.DI1.value + return (self.all_port_input_states >> offset) & 0x01 + + @property + def DI2(self): + """return the state of port2 digital input 0.""" + offset = PORT_DIS.DI2.value + return (self.all_input_states >> offset) & 0x01 + +# These do not work currently. Perhaps something needs to be cleared (MIMIC?) +# before they will configure properly. +# # IOs +# def set_io_configuration(self, bitmask : int): +# """set the state of all PORT digital ios. (1 is output.)""" +# reg_type, reg_index, _ = REGISTERS["PORT_DIOS_CONF"] +# write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] +# self.device.send(write_message_type(reg_index, bitmask).frame) +# +# @property +# def all_io_states(self): +# """return the state of all PORT digital ios.""" +# reg_type, reg_index, _ = REGISTERS["PORT_DIOS_IN"] +# read_message_type = self.__class__.READ_MSG_LOOKUP[reg_type] +# return self.device.send(read_message_type(reg_index).frame).payload_as_int() +# +# @all_io_states.setter +# def all_io_states(self, bitmask : int): +# """set the state of all PORT digital input/outputs.""" +# # Setting the state of the "DIO" pins, requires writing to the +# # _IN register, which is different from the OUTPUT +# reg_type, reg_index, _ = REGISTERS["PORT_DIOS_IN"] +# write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] +# return self.device.send(write_message_type(reg_index, bitmask).frame) +# +# def set_io_outputs(self, bitmask : int): +# """set digital input/outputs to logic 1 according to bitmask.""" +# reg_type, reg_index, _ = REGISTERS["PORT_DIOS_SET"] +# write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] +# return self.device.send(write_message_type(reg_index, bitmask).frame) +# +# def clear_io_outputs(self, bitmask : int): +# """clear digital input/outputs (specified with logic 1) according to bitmask.""" +# reg_type, reg_index, _ = REGISTERS["PORT_DIOS_CLEAR"] +# write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] +# return self.device.send(write_message_type(reg_index, bitmask).frame) +# +# @property +# def port0_io0(self): +# """read the digital io state.""" +# return self.all_port_io_states & 0x01 +# +# @port0_io0.setter +# def port0_io0(self, value: int): +# """write port0 digital io state.""" +# pass +# +# @property +# def port1_io0(self): +# """read the digital io state.""" +# return (self.all_port_io_states >> 1) & 0x01 +# +# @port0_io0.setter +# def port1_io0(self, value: int): +# """write port0 digital io state.""" +# self.set_outputs(value&0x01) +# +# @property +# def port2_io0(self): +# """read the digital io state.""" +# return (self.all_port_io_states >> 2) & 0x01 +# +# @port0_io0.setter +# def port2_io0(self, value: int): +# """write port0 digital io state.""" +# pass + + + # OUTPUTS + @property + def all_output_states(self): + """return the state of all PORT digital inputs.""" + reg_type, reg_index, _ = REGISTERS["OUTPUTS_OUT"] + read_message_type = self.__class__.READ_MSG_LOOKUP[reg_type] + return self.device.send(read_message_type(reg_index).frame).payload_as_int() + + @all_output_states.setter + def all_output_states(self, bitmask : int): + """set the state of all PORT digital inputs.""" + reg_type, reg_index, _ = REGISTERS["OUTPUTS_OUT"] + write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] + return self.device.send(write_message_type(reg_index, bitmask).frame) + + def set_outputs(self, bitmask : int): + """set digital outputs to logic 1 according to bitmask.""" + reg_type, reg_index, _ = REGISTERS["OUTPUTS_SET"] + write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] + return self.device.send(write_message_type(reg_index, bitmask).frame) + + def clear_outputs(self, bitmask : int): + """clear digital outputs (specified with logic 1) according to bitmask.""" + reg_type, reg_index, _ = REGISTERS["OUTPUTS_CLR"] + write_message_type = self.__class__.WRITE_MSG_LOOKUP[reg_type] + return self.device.send(write_message_type(reg_index, bitmask).frame) + + @property + def D0(self): + """read the digital output D0 state.""" + return (self.all_output_states >> 10) & 0x01 + + @D0.setter + def D0(self, value): + """set the digital output D0 state.""" + if value: + self.set_outputs(1 << 10) + else: + self.clear_outputs(1 << 10) + + @property + def D1(self): + """read the digital output D1 state.""" + return (self.all_output_states >> 11) & 0x01 + + @D1.setter + def D1(self, value): + """set the digital output D1 state.""" + if value: + self.set_outputs(1 << 11) + else: + self.clear_outputs(1 << 11) + + @property + def D2(self): + """read the digital output D2 state.""" + return (self.all_output_states >> 12) & 0x01 + + @D2.setter + def D2(self, value): + """set the digital output D2 state.""" + if value: + self.set_outputs(1 << 12) + else: + self.clear_outputs(1 << 12) + + @property + def D3(self): + """read the digital output D3 state.""" + return (self.all_output_states >> 10) & 0x01 + + @D3.setter + def D3(self, value): + """set the digital output D3 state.""" + if value: + self.set_outputs(1 << 13) + else: + self.clear_outputs(1 << 13) + + @property + def port0_D0(self): + return self.all_output_states & 0x01 + + @port0_D0.setter + def port0_D0(self, value): + if value: + self.set_outputs(1) + else: + self.clear_outputs(1) + + @property + def port1_D0(self): + return (self.all_output_states >> 1) & 0x01 + + @port1_D0.setter + def port1_D0(self, value): + if value: + self.set_outputs(1 << 1) + else: + self.clear_outputs(1 << 1) + + @property + def port2_D0(self): + return (self.all_output_states >> 2) & 0x01 + + @port2_D0.setter + def port2_D0(self, value): + if value: + self.set_outputs(1 << 2) + else: + self.clear_outputs(1 << 2) + + + @property + def port0_12V(self): + return (self.all_output_states >> 3) & 0x01 + + @port0_12V.setter + def port0_12V(self, value): + if value: + self.set_outputs(1 << 3) + else: + self.clear_outputs(1 << 3) + + @property + def port1_12V(self): + return (self.all_output_states >> 4) & 0x01 + + @port1_12V.setter + def port1_12V(self, value): + if value: + self.set_outputs(1 << 4) + else: + self.clear_outputs(1 << 4) + + @property + def port2_12V(self): + return (self.all_output_states >> 5) & 0x01 + + @port2_12V.setter + def port2_12V(self, value): + if value: + self.set_outputs(1 << 5) + else: + self.clear_outputs(1 << 5) + + + + + def __enter__(self): + """Setup for the 'with' statement""" + return self + + + def __exit__(self, *args): + """Cleanup for the 'with' statement""" + if self.device is not None: + self.device.disconnect() + + + def __del__(self): + """Cleanup when Device gets garbage collected.""" + if self.device is not None: + self.device.disconnect() diff --git a/pyharp/harp_serial.py b/pyharp/harp_serial.py new file mode 100644 index 0000000..3b9d4a2 --- /dev/null +++ b/pyharp/harp_serial.py @@ -0,0 +1,84 @@ +from typing import Union +from functools import partial +import logging +import queue +import threading +import serial +import serial.threaded + +from pyharp.messages import HarpMessage, MessageType + + +class HarpSerialProtocol(serial.threaded.Protocol): + _read_q: queue.Queue + + def __init__(self, _read_q: queue.Queue, *args, **kwargs): + self._read_q = _read_q + super().__init__(*args, **kwargs) + + def connection_made(self, transport: serial.threaded.ReaderThread) -> None: + # print(f"Connected to {transport.serial.port}") + return super().connection_made(transport) + + def data_received(self, data: bytes) -> None: + for byte in data: + self._read_q.put(byte) + return super().data_received(data) + + def connection_lost(self, exc: Union[BaseException, None]) -> None: + # print(f"Lost connection!") + return super().connection_lost(exc) + + +class HarpSerial: + + msg_q: queue.Queue + event_q: queue.Queue + + def __init__(self, serial_port: str, **kwargs): + self._ser = serial.Serial(serial_port, **kwargs) + + self.log = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + + self._read_q = queue.Queue() + self.msg_q = queue.Queue() + self.event_q = queue.Queue() + + self._reader = serial.threaded.ReaderThread( + self._ser, + partial(HarpSerialProtocol, self._read_q), + ) + self._reader.start() + transport, protocol = self._reader.connect() + + self._parse_thread = threading.Thread( + target=self.parse_harp_msgs_threaded, + daemon=True, + ) + self._parse_thread.start() + + def close(self): + self._reader.close() + + def write(self, data): + self._reader.write(data) + + def parse_harp_msgs_threaded(self): + while True: + message_type = self._read_q.get(1) # byte array with only one byte + message_length = self._read_q.get(1) + message_content = bytes([self._read_q.get() for _ in range(message_length)]) + self.log.debug(f"reply (type): {message_type}") + self.log.debug(f"reply (length): {message_length}") + self.log.debug(f"reply (payload): {message_content}") + + frame = bytearray() + frame.append(message_type) + frame.append(message_length) + frame += message_content + msg = HarpMessage.parse(frame) + + if msg.message_type == MessageType.EVENT: + self.event_q.put(msg) + else: + self.msg_q.put(msg) diff --git a/pyharp/messages.py b/pyharp/messages.py index 383ae2e..e86747a 100644 --- a/pyharp/messages.py +++ b/pyharp/messages.py @@ -1,8 +1,11 @@ +from __future__ import annotations # for type hints (PEP 563) +from enum import Enum # from abc import ABC, abstractmethod -from typing import Union, Tuple, Optional +from typing import Union, Tuple, Optional, List +import struct -class MessageType: +class MessageType(Enum): READ: int = 1 WRITE: int = 2 EVENT: int = 3 @@ -10,7 +13,7 @@ class MessageType: WRITE_ERROR: int = 10 -class PayloadType: +class PayloadType(Enum): isUnsigned: int = 0x00 isSigned: int = 0x80 isFloat: int = 0x40 @@ -36,9 +39,6 @@ class PayloadType: TimestampedS64 = hasTimestamp | S64 TimestampedFloat = hasTimestamp | Float - ALL_UNSIGNED = [U8, U16, U32, TimestampedU8, TimestampedU16] - ALL_SIGNED = [S8, S16, S32, TimestampedS8, TimestampedS16] - class CommonRegisters: WHO_AM_I = 0x00 @@ -56,10 +56,11 @@ class CommonRegisters: DEVICE_NAME = 0x0C -T = Union[int, bytearray] - - class HarpMessage: + """ + https://github.com/harp-tech/protocol/blob/master/Binary%20Protocol%201.0%201.1%2020180223.pdf + """ + DEFAULT_PORT: int = 255 _frame: bytearray @@ -77,189 +78,195 @@ def frame(self) -> bytearray: return self._frame @property - def message_type(self) -> int: - return self._frame[0] + def message_type(self) -> MessageType: + return MessageType(self._frame[0]) + + @property + def length(self) -> int: + return self._frame[1] + + @property + def address(self) -> int: + return self._frame[2] + + @property + def port(self) -> int: + return self._frame[3] + + @property + def payload_type(self) -> PayloadType: + return PayloadType(self._frame[4]) + + @property + def checksum(self) -> int: + return self._frame[-1] @staticmethod - def ReadU8(address: int) -> "ReadU8HarpMessage": + def ReadU8(address: int) -> ReadU8HarpMessage: return ReadU8HarpMessage(address) @staticmethod - def ReadS8(address: int) -> "ReadS8HarpMessage": + def ReadS8(address: int) -> ReadS8HarpMessage: return ReadS8HarpMessage(address) @staticmethod - def ReadS16(address: int) -> "ReadS16HarpMessage": + def ReadS16(address: int) -> ReadS16HarpMessage: return ReadS16HarpMessage(address) @staticmethod - def ReadU16(address: int) -> "ReadU16HarpMessage": + def ReadU16(address: int) -> ReadU16HarpMessage: return ReadU16HarpMessage(address) + # TODO: ReadS16 + + @staticmethod + def ReadU32(address: int) -> ReadU32HarpMessage: + return ReadU32HarpMessage(address) + @staticmethod - def WriteU8(address: int, value: int) -> "WriteU8HarpMessage": + def ReadS32(address: int) -> ReadS32HarpMessage: + return ReadS32HarpMessage(address) + + @staticmethod + def ReadFloat(address: int) -> ReadFloatHarpMessage: + return ReadFloatHarpMessage(address) + + @staticmethod + def WriteU8(address: int, value: int) -> WriteU8HarpMessage: return WriteU8HarpMessage(address, value) @staticmethod - def WriteS8(address: int, value: int) -> "WriteS8HarpMessage": + def WriteS8(address: int, value: int) -> WriteS8HarpMessage: return WriteS8HarpMessage(address, value) @staticmethod - def WriteS16(address: int, value: int) -> "WriteS16HarpMessage": + def WriteS16(address: int, value: int) -> WriteS16HarpMessage: return WriteS16HarpMessage(address, value) @staticmethod - def WriteU16(address: int, value: int) -> "WriteU16HarpMessage": + def WriteU16(address: int, value: int) -> WriteU16HarpMessage: return WriteU16HarpMessage(address, value) @staticmethod - def parse(frame: bytearray) -> "ReplyHarpMessage": + def WriteFloat(address: int, value: int) -> WriteFloatHarpMessage: + return WriteFloatHarpMessage(address, value) + + @staticmethod + def WriteU32(address: int, value: int) -> WriteU32HarpMessage: + return WriteU32HarpMessage(address, value) + + @staticmethod + def WriteS32(address: int, value: int) -> WriteS32HarpMessage: + return WriteS32HarpMessage(address, value) + + @staticmethod + def parse(frame: bytearray) -> ReplyHarpMessage: return ReplyHarpMessage(frame) +# A Response Message from a harp device. class ReplyHarpMessage(HarpMessage): - PAYLOAD_START_ADDRESS: int - PAYLOAD_LAST_ADDRESS: int - _message_type: int - _length: int - _address: int - _payload_type: int - _payload: bytes - _checksum: int + def __init__( self, frame: bytearray, ): """ - :param payload_type: - :param payload: - :param address: - :param offset: how many bytes more besides the length corresponding to U8 (for example, for U16 it would be offset=1) + :param frame: the serialized message frame. """ self._frame = frame - - self._message_type = frame[0] - self._length = frame[1] - self._address = frame[2] - self._port = frame[3] - self._payload_type = frame[4] - # TOOO: add timestamp here - self._payload = frame[ - 11:-1 - ] # retrieve all content from 11 (where payload starts) until the checksum (not inclusive) - self._checksum = frame[-1] # last index is the checksum - - # print(f"Type: {self.message_type}") - # print(f"Length: {self.length}") - # print(f"Address: {self.address}") - # print(f"Port: {self.port}") - # print(f"Payload Type: {self.payload_type}") - # print(f"Payload: {self.payload}") - # print(f"Checksum: {self.checksum}") - # print(f"Frame: {self.frame}") - - @property - def frame(self) -> bytearray: - return self._frame - - @property - def message_type(self) -> int: - return self._message_type - - @property - def length(self) -> int: - return self._length + # retrieve all content from 11 (where payload starts) until the checksum (not inclusive) + self._raw_payload = frame[11:-1] + self._payload = self._parse_payload(self._raw_payload) # payload formatted as list[payload type] + + # Assign timestamp after _payload since @properties all rely on self._payload. + self._timestamp = int.from_bytes(frame[5:9], byteorder="little", signed=False) + \ + int.from_bytes(frame[9:11], byteorder="little", signed=False)*32e-6 + # Timestamp is junk if it's not present. + if not (self.payload_type.value & PayloadType.hasTimestamp.value): + self._timestamp = None + + + def _parse_payload(self, raw_payload) -> list[int]: + """return the payload as a list of ints after parsing it from the raw payload.""" + is_signed = True if (self.payload_type.value & 0x80) else False + is_float = True if (self.payload_type.value & 0x40) else False + bytes_per_word = self.payload_type.value & 0x07 + payload_len = len(raw_payload) # payload length in bytes. + + word_chunks = [raw_payload[i:i+bytes_per_word] for i in range(0, payload_len, bytes_per_word)] + if not is_float: + return [int.from_bytes(chunk, byteorder="little", signed=is_signed) for chunk in word_chunks] + else: # handle float case. + return [struct.unpack(' int: - return self._address - - @property - def port(self) -> int: - return self._port - - @property - def payload_type(self) -> int: - return self._payload_type + def payload(self) -> Union[int, list[int]]: + """return the payload formatted as the appropriate type.""" + return self._payload @property - def payload(self) -> bytes: - return self._payload + def timestamp(self) -> float: + return self._timestamp def payload_as_int(self) -> int: - value: int = 0 - if self.payload_type in PayloadType.ALL_UNSIGNED: - value = int.from_bytes(self.payload, byteorder="little", signed=False) - elif self.payload_type in PayloadType.ALL_SIGNED: - value = int.from_bytes(self.payload, byteorder="little", signed=True) - return value - - def payload_as_int_array(self): - pass # TODO: implement this + return self.payload[0] def payload_as_string(self) -> str: - return self.payload.decode("utf-8") + return self._raw_payload.decode("utf-8") - @property - def checksum(self) -> int: - return self._checksum + def payload_as_float(self) -> float: + return self.payload[0] # already parsed. +# A Read Request Message sent to a harp device. class ReadHarpMessage(HarpMessage): MESSAGE_TYPE: int = MessageType.READ - _length: int - _address: int - _payload_type: int - _checksum: int - def __init__(self, payload_type: int, address: int): + + def __init__(self, payload_type: PayloadType, address: int): self._frame = bytearray() - self._frame.append(self.MESSAGE_TYPE) + self._frame.append(self.MESSAGE_TYPE.value) length: int = 4 self._frame.append(length) - self._frame.append(address) self._frame.append(self.DEFAULT_PORT) - self._frame.append(payload_type) + self._frame.append(payload_type.value) self._frame.append(self.calculate_checksum()) - # def calculate_checksum(self) -> int: - # return ( - # self.message_type - # + self.length - # + self.address - # + self.port - # + self.payload_type - # ) & 255 - - @property - def message_type(self) -> int: - return self._frame[0] - - @property - def length(self) -> int: - return self._frame[1] - - @property - def address(self) -> int: - return self._frame[2] - - @property - def port(self) -> int: - return self._frame[3] - - @property - def payload_type(self) -> int: - return self._frame[4] - - @property - def checksum(self) -> int: - return self._frame[5] - class ReadU8HarpMessage(ReadHarpMessage): def __init__(self, address: int): @@ -280,18 +287,27 @@ class ReadS16HarpMessage(ReadHarpMessage): def __init__(self, address: int): super().__init__(PayloadType.S16, address) +class ReadU32HarpMessage(ReadHarpMessage): + def __init__(self, address: int): + super().__init__(PayloadType.U32, address) + + +class ReadS32HarpMessage(ReadHarpMessage): + def __init__(self, address: int): + super().__init__(PayloadType.S32, address) + + +class ReadFloatHarpMessage(ReadHarpMessage): + def __init__(self, address: int): + super().__init__(PayloadType.Float, address) + class WriteHarpMessage(HarpMessage): BASE_LENGTH: int = 5 MESSAGE_TYPE: int = MessageType.WRITE - _length: int - _address: int - _payload_type: int - _payload: int - _checksum: int def __init__( - self, payload_type: int, payload: bytes, address: int, offset: int = 0 + self, payload_type: PayloadType, payload: bytes, address: int, offset: int = 0 ): """ @@ -302,53 +318,23 @@ def __init__( """ self._frame = bytearray() - self._frame.append(self.MESSAGE_TYPE) + self._frame.append(self.MESSAGE_TYPE.value) self._frame.append(self.BASE_LENGTH + offset) self._frame.append(address) self._frame.append(HarpMessage.DEFAULT_PORT) - self._frame.append(payload_type) + self._frame.append(payload_type.value) - for i in payload: - self._frame.append(i) + # Handle payloads that are bytes or bytearray (bytearray = multi-motor instructions) + if isinstance(payload, bytearray): + self._frame += payload + else: + for i in payload: + self._frame.append(i) self._frame.append(self.calculate_checksum()) - # def calculate_checksum(self) -> int: - # return ( - # self.message_type - # + self.length - # + self.address - # + self.port - # + self.payload_type - # + self.payload - # ) & 255 - - @property - def message_type(self) -> int: - return self._frame[0] - - @property - def length(self) -> int: - return self._frame[1] - - @property - def address(self) -> int: - return self._frame[2] - - @property - def port(self) -> int: - return self._frame[3] - - @property - def payload_type(self) -> int: - return self._frame[4] - - @property - def checksum(self) -> int: - return self._frame[-1] - class WriteU8HarpMessage(WriteHarpMessage): def __init__(self, address: int, value: int): @@ -393,3 +379,47 @@ def __init__(self, address: int, value: int): @property def payload(self) -> int: return int.from_bytes(self._frame[5:7], byteorder="little", signed=True) + + +class WriteFloatHarpMessage(WriteHarpMessage): + def __init__(self, address: int, value: float): + super().__init__( + PayloadType.Float, + struct.pack(' float: + return struct.unpack(' int: + return int.from_bytes(self._frame[5:9], byteorder="little", signed=False) + + +class WriteS32HarpMessage(WriteHarpMessage): + def __init__(self, address: int, value: int | List[int]): + if isinstance(value, list): + payload = bytearray() + for val in value: + payload += val.to_bytes(4, byteorder="little", signed=True) + offset = 15 + else: + payload = value.to_bytes(4, byteorder="little", signed=True) + offset = 3 + super().__init__( + PayloadType.S32, payload, address, offset=offset + ) + + @property + def payload(self) -> int | List[int]: + return int.from_bytes(self._frame[5:9], byteorder="little", signed=True) diff --git a/pyproject.toml b/pyproject.toml index 62037dc..ca8d302 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,8 +16,8 @@ mypy = "^0.782" black = "^19.10b0" [build-system] -requires = ["poetry>=0.12"] -build-backend = "poetry.masonry.api" +requires = ["poetry-core>=1.0.8"] +build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] pyharp = "pyharp.main:main" diff --git a/tests/test_device.py b/tests/test_device.py index da16859..5a5eab9 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -1,5 +1,5 @@ import serial - +import time from typing import Optional from pyharp.messages import HarpMessage, ReplyHarpMessage from pyharp.device import Device @@ -84,3 +84,11 @@ def test_U8() -> None: # assert not ser.is_open # # # assert data[0] == '\t' + + +def test_device_events(device: Device) -> None: + while True: + print(device.event_count()) + for msg in device.get_events(): + print(msg) + time.sleep(0.3)