diff --git a/robel/components/robot/dynamixel_client.py b/robel/components/robot/dynamixel_client.py index 382d84f..7ada993 100644 --- a/robel/components/robot/dynamixel_client.py +++ b/robel/components/robot/dynamixel_client.py @@ -30,6 +30,9 @@ ADDR_PRESENT_VELOCITY = 128 ADDR_PRESENT_CURRENT = 126 ADDR_PRESENT_POS_VEL_CUR = 126 +ADDR_PRESENT_VOLTAGE = 144 +ADDR_PRESENT_TEMPERATURE = 146 +ADDR_PRESENT_VOL_TEMP = 144 # Data Byte Length LEN_PRESENT_POSITION = 4 @@ -37,12 +40,16 @@ LEN_PRESENT_CURRENT = 2 LEN_PRESENT_POS_VEL_CUR = 10 LEN_GOAL_POSITION = 4 +LEN_PRESENT_VOLTAGE = 2 +LEN_PRESENT_TEMPERATURE = 1 +LEN_PRESENT_VOL_TEMP = 3 DEFAULT_POS_SCALE = 2.0 * np.pi / 4096 # 0.088 degrees # See http://emanual.robotis.com/docs/en/dxl/x/xh430-v210/#goal-velocity DEFAULT_VEL_SCALE = 0.229 * 2.0 * np.pi / 60.0 # 0.229 rpm DEFAULT_CUR_SCALE = 1.34 - +DEFAULT_VOL_SCALE = 0.1 +DEFAULT_TEMP_SCALE = 1.0 def dynamixel_cleanup_handler(): """Cleanup function to ensure Dynamixels are disconnected properly.""" @@ -87,7 +94,9 @@ def __init__(self, lazy_connect: bool = False, pos_scale: Optional[float] = None, vel_scale: Optional[float] = None, - cur_scale: Optional[float] = None): + cur_scale: Optional[float] = None, + vol_scale: Optional[float] = None, + temp_scale: Optional[float] = None): """Initializes a new client. Args: @@ -124,6 +133,12 @@ def __init__(self, vel_scale=vel_scale if vel_scale is not None else DEFAULT_VEL_SCALE, cur_scale=cur_scale if cur_scale is not None else DEFAULT_CUR_SCALE, ) + self._vol_temp_reader = DynamixelVolTempReader( + self, + self.motor_ids, + vol_scale=vol_scale if vol_scale is not None else DEFAULT_VOL_SCALE, + temp_scale=temp_scale if temp_scale is not None else DEFAULT_TEMP_SCALE, + ) self._sync_writers = {} self.OPEN_CLIENTS.add(self) @@ -174,7 +189,7 @@ def set_torque_enabled(self, motor_ids: Sequence[int], enabled: bool, retries: int = -1, - retry_interval: float = 0.25): + retry_interval: float = 0.05): """Sets whether torque is enabled for the motors. Args: @@ -204,6 +219,10 @@ def read_pos_vel_cur(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Returns the current positions and velocities.""" return self._pos_vel_cur_reader.read() + def read_vol_temp(self) -> Tuple[np.ndarray, np.ndarray]: + """Returns the current voltages and temperatures.""" + return self._vol_temp_reader.read() + def write_desired_pos(self, motor_ids: Sequence[int], positions: np.ndarray): """Writes the given desired positions. @@ -445,6 +464,42 @@ def _get_data(self): return (self._pos_data.copy(), self._vel_data.copy(), self._cur_data.copy()) +class DynamixelVolTempReader(DynamixelReader): + """Reads input volatage and temperature.""" + + def __init__(self, + client: DynamixelClient, + motor_ids: Sequence[int], + vol_scale: float = 1.0, + temp_scale: float = 1.0): + super().__init__( + client, + motor_ids, + address=ADDR_PRESENT_VOL_TEMP, + size=LEN_PRESENT_VOL_TEMP, + ) + self.vol_scale = vol_scale + self.temp_scale = temp_scale + + def _initialize_data(self): + """Initializes the cached data.""" + self._vol_data = np.zeros(len(self.motor_ids), dtype=np.float32) + self._temp_data = np.zeros(len(self.motor_ids), dtype=np.float32) + + def _update_data(self, index: int, motor_id: int): + """Updates the data index for the given motor ID.""" + vol = self.operation.getData(motor_id, ADDR_PRESENT_VOLTAGE, + LEN_PRESENT_VOLTAGE) + temp = self.operation.getData(motor_id, ADDR_PRESENT_TEMPERATURE, + LEN_PRESENT_TEMPERATURE) + vol = unsigned_to_signed(vol, size=2) + temp = unsigned_to_signed(temp, size=4) + self._vol_data[index] = float(vol) * self.vol_scale + self._temp_data[index] = float(temp) * self.temp_scale + + def _get_data(self): + """Returns a copy of the data.""" + return (self._vol_data.copy(), self._temp_data.copy()) # Register global cleanup function. atexit.register(dynamixel_cleanup_handler) @@ -481,9 +536,14 @@ def _get_data(self): dxl_client.write_desired_pos(motors, way_point) read_start = time.time() pos_now, vel_now, cur_now = dxl_client.read_pos_vel_cur() + vol_now, temp_now = dxl_client.read_vol_temp() + + if step % 5 == 0: print('[{}] Frequency: {:.2f} Hz'.format( step, 1.0 / (time.time() - read_start))) print('> Pos: {}'.format(pos_now.tolist())) print('> Vel: {}'.format(vel_now.tolist())) print('> Cur: {}'.format(cur_now.tolist())) + print('> Vol: {}'.format(vol_now.tolist())) + print('> Tmp: {}'.format(temp_now.tolist()))