Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Lint

on:
push:
branches: [ "main" ]
paths:
- 'software/**'
pull_request:
branches: [ "main" ]
paths:
- 'software/**'

jobs:
lint:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./software

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip install ruff

- name: Run Ruff (Linting)
run: ruff check .

6 changes: 4 additions & 2 deletions software/examples/alohamini/record_bi.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ def main():
parser.add_argument("--task_description", type=str, default="My task description4", help="Task description")
parser.add_argument("--remote_ip", type=str, default="127.0.0.1", help="Robot host IP")
parser.add_argument("--robot_id", type=str, default="lekiwi_host", help="Robot ID")
parser.add_argument("--left_arm_port", type=str, default="/dev/am_arm_leader_left", help="Left leader arm port")
parser.add_argument("--right_arm_port", type=str, default="/dev/am_arm_leader_right", help="Right leader arm port")
args = parser.parse_args()

# === Robot and teleop config ===
robot_config = LeKiwiClientConfig(remote_ip=args.remote_ip, id=args.robot_id)
leader_arm_config = BiSO100LeaderConfig(
left_arm_port="/dev/am_arm_leader_left",
right_arm_port="/dev/am_arm_leader_right",
left_arm_port=args.left_arm_port,
right_arm_port=args.right_arm_port,
id="so101_leader_bi3",
)
keyboard_config = KeyboardTeleopConfig()
Expand Down
6 changes: 4 additions & 2 deletions software/examples/alohamini/teleoperate_bi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
parser.add_argument("--use_dummy", action="store_true", help="Do not connect robot, only print actions")
parser.add_argument("--fps", type=int, default=30, help="Main loop frequency (frames per second)")
parser.add_argument("--remote_ip", type=str, default="127.0.0.1", help="LeKiwi host IP address")
parser.add_argument("--left_arm_port", type=str, default="/dev/am_arm_leader_left", help="Left leader arm port")
parser.add_argument("--right_arm_port", type=str, default="/dev/am_arm_leader_right", help="Right leader arm port")

args = parser.parse_args()

Expand All @@ -27,8 +29,8 @@
# Create configs
robot_config = LeKiwiClientConfig(remote_ip=args.remote_ip, id="my_alohamini")
bi_cfg = BiSO100LeaderConfig(
left_arm_port="/dev/am_arm_leader_left",
right_arm_port="/dev/am_arm_leader_right",
left_arm_port=args.left_arm_port,
right_arm_port=args.right_arm_port,
id="so101_leader_bi3",
)
leader = BiSO100Leader(bi_cfg)
Expand Down
6 changes: 4 additions & 2 deletions software/examples/alohamini/teleoperate_bi_voice.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
parser.add_argument("--use_dummy", action="store_true", help="Do not connect robot, only print actions")
parser.add_argument("--fps", type=int, default=30, help="Main loop frequency (frames per second)")
parser.add_argument("--remote_ip", type=str, default="127.0.0.1", help="Alohamini host IP address")
parser.add_argument("--left_arm_port", type=str, default="/dev/am_arm_leader_left", help="Left leader arm port")
parser.add_argument("--right_arm_port", type=str, default="/dev/am_arm_leader_right", help="Right leader arm port")
args = parser.parse_args()

USE_DUMMY = args.use_dummy
Expand All @@ -29,8 +31,8 @@
# Create configs
robot_config = LeKiwiClientConfig(remote_ip=args.remote_ip, id="my_alohamini")
bi_cfg = BiSO100LeaderConfig(
left_arm_port="/dev/am_arm_leader_left",
right_arm_port="/dev/am_arm_leader_right",
left_arm_port=args.left_arm_port,
right_arm_port=args.right_arm_port,
id="so101_leader_bi3",
)

Expand Down
54 changes: 27 additions & 27 deletions software/examples/debug/motors.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
OperatingMode,
)

DEFAULT_PORT = "/dev/ttyACM0"
DEFAULT_PORT = os.getenv("FEETECH_PORT", "/dev/ttyACM0")
HALF_TURN_DEGREE = 180

GENERAL_ACTIONS = {"sleep", "print"}
Expand All @@ -24,8 +24,8 @@

def probe_scan_ids(port: str) -> dict[int, str]:
"""
探针扫描 ID 范围,返回 {id: model_name}(只含在线电机)。
仅做 ping,不对电机写寄存器;断开时不关力矩。
Probe ID range and return {id: model_name} (only online motors).
Only ping, do not write to registers; do not disable torque on disconnect.
"""

probe_bus = build_bus(port, {})
Expand Down Expand Up @@ -58,7 +58,7 @@ def probe_scan_ids(port: str) -> dict[int, str]:

def build_motors_from_scan(port: str):
"""
基于 probe_scan_ids 结果,构建 motors 字典(名称统一为 motor_<id>)。
Build motors dict based on probe_scan_ids results (names unified as motor_<id>).
"""
from lerobot.motors.motors_bus import Motor, MotorNormMode
found = probe_scan_ids(port)
Expand Down Expand Up @@ -125,13 +125,13 @@ def _motor_angle_from_position(position):

def get_motors_states(port):
"""
动态表格显示电机状态;总是先扫描 [scan_start, scan_end],只显示在线电机。
- 传了 motors 也不会直接用,而是按 ID 范围扫描后重建 motors(保证不读离线 ID)
Display motor states in a dynamic table; always scan [scan_start, scan_end] first, showing only online motors.
- Even if motors are passed, they are not used directly; motors are rebuilt after scanning the ID range (ensuring no offline IDs are read)
"""
import time, sys, shutil
from lerobot.motors.motors_bus import Motor, MotorNormMode

# ---------- ANSI 工具 ----------
# ---------- ANSI Utils ----------
CSI = "\x1b["
def _hide_cursor(): sys.stdout.write(f"{CSI}?25l"); sys.stdout.flush()
def _show_cursor(): sys.stdout.write(f"{CSI}?25h"); sys.stdout.flush()
Expand All @@ -152,17 +152,17 @@ def F(v, w, a=">"):
f"{F(st.get('Temperature'),4)} | {F(st.get('Port','-'),16,'<')}")
return row[:maxw] if len(row) > maxw else row

# ---------- 第一步:用探针扫描在线电机 ----------
# ---------- Step 1: Scan for online motors using probe ----------

motors = build_motors_from_scan(port)
if not motors:
print(f"No motors found in ID range [{SCAN_START}, {SCAN_END}] on {port}.")
return


# ---------- 第二步:用“只包含在线电机”的字典进入动态显示 ----------
# ---------- Step 2: Enter dynamic display using dict containing only online motors ----------
bus = build_bus(port, motors)
if not _connect_bus(bus): # 你的辅助函数里保持 handshake=False 更稳
if not _connect_bus(bus): # Keep handshake=False in helper function for stability
return

try:
Expand Down Expand Up @@ -213,7 +213,7 @@ def F(v, w, a=">"):


maxw = _term_width()
sep = "-" * min(maxw, 140) # 120 放宽到 140
sep = "-" * min(maxw, 140) # Widened from 120 to 140
header = (f"{'NAME':<15} | {'ID':>3} | {'POS':>6} | {'OFF':>6} | {'ANG':>6} | "
f"{'LOAD':>6} | {'ACC':>6} | {'VOLT':>4} | {'CURR(MA)':>8} | {'TEMP':>4} | PORT")
header = header[:maxw] if len(header) > maxw else header
Expand All @@ -235,7 +235,7 @@ def F(v, w, a=">"):
except KeyboardInterrupt:
pass
finally:
# 关键:不要在断开时去“对所有电机”关力矩,避免离线 ID 报错
# Key: Do not disable torque for "all motors" on disconnect to avoid errors from offline IDs.
try:
bus.disconnect(disable_torque=False)
finally:
Expand All @@ -248,9 +248,9 @@ def configure_motor_id(port: str, current_id: int, new_id: int):
current_id = int(current_id)
new_id = int(new_id)
if current_id == new_id:
raise SystemExit("current_id == new_id,没有必要改。")
raise SystemExit("current_id == new_id, no change needed.")

# 1) 先用“裸总线”去 ping 校验(不带 motors 映射,避免库做多余事)
# 1) Ping check using "bare bus" (without motors mapping to avoid extra library actions)
probe_bus = FeetechMotorsBus(port=port, motors={})
try:
probe_bus.connect(handshake=False)
Expand All @@ -265,11 +265,11 @@ def configure_motor_id(port: str, current_id: int, new_id: int):
pass

if not ok_cur:
raise SystemExit(f"[ABORT] 未发现 ID={current_id} 在线,放弃改 ID。")
raise SystemExit(f"[ABORT] ID={current_id} not online, aborting ID change.")
if ok_new:
raise SystemExit(f"[ABORT] 目标 ID={new_id} 已被占用,放弃改 ID。")
raise SystemExit(f"[ABORT] Target ID={new_id} is occupied, aborting ID change.")

# 2) 用“只包含 current_id 的单电机字典”建立总线(名字无所谓,只要 id current_id
# 2) Build bus with single-motor dict containing current_id (name doesn't matter, as long as id is current_id)
tmp_name = f"motor_{current_id}"
one_motor = {
tmp_name: Motor(id=current_id, model=DEFAULT_FEETECH_MODEL, norm_mode=MotorNormMode.RANGE_0_100)
Expand All @@ -280,7 +280,7 @@ def configure_motor_id(port: str, current_id: int, new_id: int):
bus.connect(handshake=False)
print(f"Connected on port {bus.port} (current_id={current_id})")

# 保险:解锁 & 力矩处理(按你电机需要,可调整/删掉)
# Safety: Unlock & Torque handling (adjust/remove as needed)
try:
bus.write("Lock", tmp_name, 0, normalize=False)
except Exception:
Expand All @@ -290,12 +290,12 @@ def configure_motor_id(port: str, current_id: int, new_id: int):
except Exception:
pass

# 3) 直接对“当前 id 的那颗”写寄存器:ID = new_id
# 3) Directly write register for "current id": ID = new_id
bus.write("ID", tmp_name, new_id, normalize=False)
time.sleep(0.2)

# 4) 改后校验:当前 id 应该失联,new_id 应该在线
# 这里用一个“空 bus”去 ping,避免受 motors 映射影响
# 4) Post-change verify: current id should be offline, new_id should be online
# Use an "empty bus" to ping here to avoid being affected by motors mapping
probe_bus2 = FeetechMotorsBus(port=port, motors={})
try:
probe_bus2.connect(handshake=False)
Expand All @@ -309,7 +309,7 @@ def configure_motor_id(port: str, current_id: int, new_id: int):

if ok_cur2 or not ok_new2:
raise SystemExit(
f"[VERIFY FAIL] 改后验证失败:ID={current_id} 仍在线={ok_cur2}, ID={new_id} 在线={ok_new2}"
f"[VERIFY FAIL] Post-change verify failed: ID={current_id} still online={ok_cur2}, ID={new_id} online={ok_new2}"
)

print(f"[OK] Changed ID {current_id} -> {new_id} (model={DEFAULT_FEETECH_MODEL})")
Expand Down Expand Up @@ -386,16 +386,16 @@ def move_motor_to_position(
position: int,
):
"""
直接用数值ID控制电机到指定 position(原始ticks)。
- 不需要 motors{},不需要电机名。
- 默认确保处于“位置模式”,必要时切换。
Control motor directly by numeric ID to specified position (raw ticks).
- No motors{} or motor names needed.
- Default to ensuring "Position Mode", switching if necessary.
"""
tmp_name = f"motor_{int(motor_id)}"
one_motor = {
tmp_name: Motor(id=int(motor_id), model=DEFAULT_FEETECH_MODEL, norm_mode=MotorNormMode.RANGE_0_100)
}

# 可选:简单范围夹取(12bit 0~4095;按你家实际寄存器范围调整)
# Optional: Simple range clamping (12bit 0~4095; adjust according to your register range)
try:
position = int(position)
position = max(0, min(4095, position))
Expand All @@ -407,7 +407,7 @@ def move_motor_to_position(
bus.connect(handshake=False)
print(f"Connected on port {bus.port} (ID={motor_id})")

# 若不确定电机当前模式,保险起见切到“位置模式”
# Switch to "Position Mode" for safety if current mode is uncertain
bus.disable_torque(tmp_name)
bus.write("Operating_Mode", tmp_name, OperatingMode.POSITION.value, normalize=False)
bus.enable_torque(tmp_name)
Expand Down
5 changes: 3 additions & 2 deletions software/src/lerobot/robots/alohamini/config_lekiwi.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from dataclasses import dataclass, field

from lerobot.cameras.configs import CameraConfig, Cv2Rotation
Expand Down Expand Up @@ -43,8 +44,8 @@ def lekiwi_cameras_config() -> dict[str, CameraConfig]:
@RobotConfig.register_subclass("lekiwi")
@dataclass
class LeKiwiConfig(RobotConfig):
left_port: str = "/dev/am_arm_follower_left" # port to connect to the bus
right_port: str = "/dev/am_arm_follower_right" # port to connect to the bus
left_port: str = os.getenv("LEKIWI_LEFT_PORT", "/dev/am_arm_follower_left") # port to connect to the bus
right_port: str = os.getenv("LEKIWI_RIGHT_PORT", "/dev/am_arm_follower_right") # port to connect to the bus
disable_torque_on_disconnect: bool = True

# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
Expand Down
Loading
Loading