Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions emioapi/_motorgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ def setOperatingMode(self, mode):
self.packetHandler.write1ByteTxRx(self.portHandler, DXL_ID, self.parameters.ADDR_OPERATING_MODE, mode)


def setInVelocityMode(self):
def enableVelocityMode(self):
self.setOperatingMode(self.parameters.VELOCITY_MODE)


def setInExtendedPositionMode(self):
def enableExtendedPositionMode(self):
self.setOperatingMode(self.parameters.EXT_POSITION_MODE)


def setInPositionMode(self):
def enablePositionMode(self):
self.setOperatingMode(self.parameters.POSITION_MODE)


Expand Down
5 changes: 3 additions & 2 deletions emioapi/emioapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,18 @@ def listCameraDevices():
return EmioCamera.listCameras()


def connectToEmioDevice(self, device_name: str=None) -> bool:
def connectToEmioDevice(self, device_name: str=None, multi_turn: bool=False) -> bool:
"""
Connect to the emio device with the given name.

Args:
device_name: str: The name of the device to connect to. If None, the first device found that is not used will be used.
multi_turn: bool: Whether to enable the multi-turn mode of the motors. In multi-turn mode on, the angles interval is [-256*2π, 256*2π]

Returns:
True if the connection is successful, False otherwise.
"""
connected_index = self.motors.findAndOpen(device_name)
connected_index = self.motors.findAndOpen(device_name, multi_turn)

if connected_index>=0: # try to connect to the camera if foud a Emio to connect to
self.device_index = connected_index
Expand Down
47 changes: 32 additions & 15 deletions emioapi/emiomotors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import field
from threading import Lock
from math import pi

import emioapi._motorgroup as motorgroup
import emioapi._emiomotorsparameters as emioparameters
Expand Down Expand Up @@ -39,7 +40,7 @@ class EmioMotors:

_initialized: bool = False
_length_to_rad: float = 1.0 / 20.0 # 1/radius of the pulley
_rad_to_pulse: int = 4096 / (2 * 3.1416) # the resolution of the Dynamixel xm430 w210
_rad_to_pulse: int = 4096 / (2 * pi) # the resolution of the Dynamixel xm430 w210
_length_to_pulse: int = _length_to_rad * _rad_to_pulse
_pulse_center: int = 2048
_max_vel: float = 1000 # *0.01 rev/min
Expand Down Expand Up @@ -113,11 +114,11 @@ def pulseToDeg(self, pulse: list):
Returns:
A list of angles in degrees for each motor.
"""
return [(self._pulse_center - float(item)) / self._rad_to_pulse * 180.0 / 3.1416 for item in pulse]
return [(self._pulse_center - float(item)) / self._rad_to_pulse * 180.0 / pi for item in pulse]


def _openAndConfig(self, device_name: str=None) -> bool:
"""Open the connection to the motors, configure it for position mode and enable torque sensing."""
def _openAndConfig(self, device_name: str=None, multi_turn:bool = False) -> bool:
"""Open the connection to the motors, configure it for position mode in multi-turn or not and enable torque sensing."""
with self._lock:
try:
self._mg.updateDeviceName(device_name)
Expand All @@ -128,36 +129,41 @@ def _openAndConfig(self, device_name: str=None) -> bool:

self._mg.open()
self._mg.clearPort()
self._mg.setInPositionMode()
if multi_turn:
self.enableExtendedPositionMode()
else:
self.enablePositionMode()
self._mg.enableTorque()

logger.debug(f"Motor group opened and configured. Device name: {self._mg.deviceName}")
logger.debug(f"Motor group opened and configured. Device name: {self._mg.deviceName}, Multi turn activated: {multi_turn}")
return True
except Exception as e:
logger.error(f"Failed to open and configure the motor group: {e}")
return False


def open(self, device_name: str=None) -> bool:
def open(self, device_name: str=None, multi_turn: bool=False) -> bool:
"""
Open the connection to the motors.

Args:
device_name: str: if set, it will connect to the device with the given name, If not set, the first emio device will be used.
multi_turn: bool: Whether to enable the multi-turn mode of the motors. In multi-turn mode on, the angles interval is [-256*2π, 256*2π]
"""
if self._openAndConfig(device_name):
if self._openAndConfig(device_name, multi_turn):
self._device_index = motorgroup.listMotors().index(self._mg.deviceName)
logger.info(f"Connected to emio device: {self._mg.deviceName}")
return True
return False


def findAndOpen(self, device_name: str=None) -> int:
def findAndOpen(self, device_name: str=None, multi_turn: bool=False) -> int:
"""
Iterate over the serial ports and try to conenct to the first available emio motors.

Args:
device_name: str: If set, It will try to connected to the given device name (port name)
multi_turn: bool: Whether to enable the multi-turn mode of the motors. In multi-turn mode on, the angles interval is [-256*2π, 256*2π]

Returns:
the index in the list of port to which it connected. If no connection was possible, returns -1.
Expand All @@ -166,8 +172,7 @@ def findAndOpen(self, device_name: str=None) -> int:
try:
index = motorgroup.listMotors().index(device_name)
logger.info(f"Trying given emio number {index} on port: {device_name}.")
self.open(device_name)
return index if len(motorgroup.listMotors())>0 and self.open(device_name) else -1
return index if len(motorgroup.listMotors())>0 and self.open(device_name, multi_turn) else -1
except:
return -1

Expand All @@ -179,7 +184,7 @@ def findAndOpen(self, device_name: str=None) -> int:
while not connected and index<len(motorgroup.listMotors()):
device_name = motorgroup.listMotors()[index]
logger.info(f"Trying emio number {index} on port: {device_name}.")
connected = self.open(device_name)
connected = self.open(device_name, multi_turn)

if connected:
self._device_index = index
Expand All @@ -193,15 +198,27 @@ def findAndOpen(self, device_name: str=None) -> int:
def close(self):
"""Close the connection to the motors."""
with self._lock:
self._mg.close()
logger.info("Motors connection closed.")
try:
self._mg.close()
logger.info("Motors connection closed.")
except Exception as e:
print(e)


def printStatus(self):
"""Print the current position of the motors."""
with self._lock:
logger.info(f"Current position of the motors in radians: {[ a%3.14 for a in self.pulseToRad(self._mg.getCurrentPosition())]}")
logger.info(f"Current position of the motors in radians: {[ a%pi for a in self.pulseToRad(self._mg.getCurrentPosition())]}\n"
+f"{' '*75} in pulses: {[ a for a in self._mg.getCurrentPosition()]}\n"
+f"{' '*75} in degrees: {[ a*180/pi for a in self.pulseToRad(self._mg.getCurrentPosition())]}")


def enablePositionMode(self):
self._mg.enablePositionMode()


def enableExtendedPositionMode(self):
self._mg.enableExtendedPositionMode()


####################
Expand Down
68 changes: 68 additions & 0 deletions examples/motors_multiturn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env -S uv run --script

import time
import logging
import os
import sys
import locale
from math import pi

sys.path.append(os.path.dirname(os.path.realpath(__file__))+'/..')
from emioapi import EmioMotors
from emioapi._logging_config import logger

def main(emio: EmioMotors, loops=1):

initial_pos_pulse = [0] * 4
emio.max_velocity = [1000] * 4
logger.info(f"Initial position in rad: {initial_pos_pulse}")
emio.angles = initial_pos_pulse
time.sleep(1)
emio.printStatus()

angle_command = ""
while True:
angle_command = input("Enter an angle for the motor [-256*360, 256*360]: ")
try:
if angle_command == "quit":
break
new_angle = locale.atof(angle_command)
new_angle = new_angle*pi/180
new_pos = [new_angle]*4
print("-"*20)
logger.info(f"new_pos {new_pos}")
if emio.is_connected:
emio.angles = new_pos
time.sleep(1)
emio.printStatus()
else:
emio.open(multi_turn=True)
except Exception as e:
logger.error(f"Error during communication: {e}")
emio.close()
emio.open()


if __name__ == "__main__":
try:
logger.info("Starting EMIO API test...")
logger.info("Opening and configuring EMIO API...")

emio_motors = EmioMotors()

if emio_motors.open():

emio_motors.printStatus()

logger.info("Emio motors opened and configured.")
logger.info("Running main function...")
main(emio_motors, 15)

logger.info("Main function completed.")
logger.info("Closing Emio motor connection...")

emio_motors.close()
logger.info("Emio connection closed.")
except Exception as e:
logger.exception(f"An error occurred: {e}")
emio_motors.close()