1+ #!/usr/bin/env python3
2+ # -*- coding: utf-8 -*-
3+ import can
4+ import time ,sys
5+ import threading
6+ import numpy as np
7+ from enum import Enum
8+ from sensor_msgs .msg import JointState
9+
10+ class FrameProperty (Enum ):
11+ INVALID_FRAME_PROPERTY = 0x00
12+ JOINT_POSITION_RCO = 0x01
13+ MAX_PRESS_RCO = 0x02
14+ JOINT_POSITION2_RCO = 0x04
15+ JOINT_SPEED = 0x05
16+ REQUEST_DATA_RETURN = 0x09
17+ JOINT_POSITION_N = 0x11
18+ MAX_PRESS_N = 0x12
19+ HAND_NORMAL_FORCE = 0X20
20+ HAND_TANGENTIAL_FORCE = 0X21
21+ HAND_TANGENTIAL_FORCE_DIR = 0X22
22+ HAND_APPROACH_INC = 0X23
23+ MOTOR_TEMPERATURE_1 = 0x33
24+ MOTOR_TEMPERATURE_2 = 0x34
25+
26+ class LinkerHandL10Can :
27+ def __init__ (self ,can_id , can_channel = 'can0' , baudrate = 1000000 , ):
28+ self .x01 = [0 ] * 5
29+ self .x02 = [0 ] * 5
30+ self .x04 = [0 ] * 5
31+ self .x05 = [0 ] * 5
32+ self .x33 = self .x34 = [0 ] * 5
33+ # 故障码
34+ self .x35 ,self .x36 = [0 ] * 5 ,[0 ] * 5
35+ self .xb0 ,self .xb1 ,self .xb2 ,self .xb3 ,self .xb4 ,self .xb5 = [- 1 ] * 5 ,[- 1 ] * 5 ,[- 1 ] * 5 ,[- 1 ] * 5 ,[- 1 ] * 5 ,[- 1 ] * 5
36+ self .can_id = can_id
37+ self .joint_angles = [0 ] * 10
38+ self .pressures = [200 ] * 5 # 默认扭矩200
39+ self .bus = self .init_can_bus (can_channel , baudrate )
40+ self .normal_force , self .tangential_force , self .tangential_force_dir , self .approach_inc = [[0.0 ] * 5 for _ in range (4 )]
41+ self .version = None
42+ # 启动接收线程
43+ self .running = True
44+ self .receive_thread = threading .Thread (target = self .receive_response )
45+ self .receive_thread .daemon = True
46+ self .receive_thread .start ()
47+
48+ def init_can_bus (self , channel , baudrate ):
49+ if sys .platform == "linux" :
50+ return can .interface .Bus (channel = channel , interface = "socketcan" , bitrate = baudrate )
51+ elif sys .platform == "win32" :
52+ return can .interface .Bus (channel = 'PCAN_USBBUS1' , interface = 'pcan' , bitrate = baudrate )
53+ else :
54+ raise EnvironmentError ("Unsupported platform for CAN interface" )
55+
56+ def send_frame (self , frame_property , data_list ):
57+ """发送一个带有指定属性和数据的单个CAN帧。"""
58+ frame_property_value = int (frame_property .value ) if hasattr (frame_property , 'value' ) else frame_property
59+ data = [frame_property_value ] + [int (val ) for val in data_list ]
60+ msg = can .Message (arbitration_id = self .can_id , data = data , is_extended_id = False )
61+ try :
62+ self .bus .send (msg )
63+ except can .CanError as e :
64+ print (f"Failed to send message: { e } " )
65+ time .sleep (0.005 )
66+
67+ def set_joint_positions (self , joint_angles ):
68+ """将10个关节的位置设置(joint_angles: 10个数值的列表)。"""
69+ self .joint_angles = joint_angles
70+ # 分帧发送角度控制 L10协议为前6和后4分开
71+ self .send_frame (FrameProperty .JOINT_POSITION2_RCO , self .joint_angles [6 :])
72+ time .sleep (0.001 )
73+ self .send_frame (FrameProperty .JOINT_POSITION_RCO , self .joint_angles [:6 ])
74+
75+
76+ def set_max_torque_limits (self , pressures ,type = "get" ):
77+ """设置最大扭矩限制"""
78+ if type == "get" :
79+ self .pressures = [0.0 ]
80+ else :
81+ self .pressures = pressures [:5 ]
82+ #self.send_frame(FrameProperty.MAX_PRESS_RCO, self.pressures)
83+
84+
85+ def set_joint_speed_l10 (self ,speed = [180 ]* 5 ):
86+ self .x05 = speed
87+ for i in range (2 ):
88+ time .sleep (0.01 )
89+ self .send_frame (0x05 , speed )
90+ def set_speed (self ,speed = [180 ]* 5 ):
91+ self .x05 = speed
92+ for i in range (2 ):
93+ time .sleep (0.01 )
94+ self .send_frame (0x05 , speed )
95+ def request_all_status (self ):
96+ """获取所有关节位置和压力。"""
97+ self .send_frame (FrameProperty .REQUEST_DATA_RETURN , [])
98+ ''' -------------------压力传感器---------------------- '''
99+ def get_normal_force (self ):
100+ self .send_frame (FrameProperty .HAND_NORMAL_FORCE ,[])
101+
102+ def get_tangential_force (self ):
103+ self .send_frame (FrameProperty .HAND_TANGENTIAL_FORCE ,[])
104+
105+ def get_tangential_force_dir (self ):
106+ self .send_frame (FrameProperty .HAND_TANGENTIAL_FORCE_DIR ,[])
107+ def get_approach_inc (self ):
108+ self .send_frame (FrameProperty .HAND_APPROACH_INC ,[])
109+ ''' -------------------电机温度---------------------- '''
110+ def get_motor_temperature (self ):
111+ self .send_frame (FrameProperty .MOTOR_TEMPERATURE_1 ,[])
112+ self .send_frame (FrameProperty .MOTOR_TEMPERATURE_2 ,[])
113+ # 电机故障码
114+ def get_motor_fault_code (self ):
115+ self .send_frame (0x35 ,[])
116+ self .send_frame (0x36 ,[])
117+ def receive_response (self ):
118+ """接收CAN响应并处理."""
119+ while self .running :
120+ try :
121+ msg = self .bus .recv (timeout = 1.0 )
122+ if msg :
123+ self .process_response (msg )
124+ except can .CanError as e :
125+ print (f"Error receiving CAN message: { e } " )
126+
127+ def process_response (self , msg ):
128+ """处理接收到的CAN消息。"""
129+ if msg .arbitration_id == self .can_id :
130+ frame_type = msg .data [0 ]
131+ response_data = msg .data [1 :]
132+ if frame_type == FrameProperty .JOINT_POSITION_RCO .value : # 0x01
133+ self .x01 = list (response_data ) #
134+ elif frame_type == FrameProperty .MAX_PRESS_RCO .value : # 0x02
135+ self .x02 = list (response_data )
136+ elif frame_type == FrameProperty .JOINT_POSITION2_RCO .value : # 0x04
137+ self .x04 = list (response_data )
138+ elif frame_type == 0x05 :
139+ #self.x05 = list(response_data)
140+ pass
141+ elif frame_type == 0x20 :
142+ #ColorMsg(msg=f"五指法向压力:{list(response_data)}")
143+ d = list (response_data )
144+ self .normal_force = [float (i ) for i in d ]
145+ elif frame_type == 0x21 :
146+ #ColorMsg(msg=f"五指切向压力:{list(response_data)}")
147+ d = list (response_data )
148+ self .tangential_force = [float (i ) for i in d ]
149+ elif frame_type == 0x22 :
150+ #ColorMsg(msg=f"五指切向压力方向:{list(response_data)}")
151+ d = list (response_data )
152+ self .tangential_force_dir = [float (i ) for i in d ]
153+ elif frame_type == 0x23 :
154+ #ColorMsg(msg=f"五指接近度:{list(response_data)}")
155+ d = list (response_data )
156+ self .approach_inc = [float (i ) for i in d ]
157+ elif frame_type == 0x33 :
158+ self .x33 = list (response_data )
159+ elif frame_type == 0x34 :
160+ self .x34 = list (response_data )
161+ elif frame_type == 0x35 :
162+ self .x35 = list (response_data )
163+ elif frame_type == 0x36 :
164+ self .x36 = list (response_data )
165+ elif frame_type == 0xb0 :
166+ self .xb0 = list (response_data )
167+ elif frame_type == 0xb1 :
168+ self .xb1 = list (response_data )
169+ elif frame_type == 0xb2 :
170+ self .xb2 = list (response_data )
171+ elif frame_type == 0xb3 :
172+ self .xb3 = list (response_data )
173+ elif frame_type == 0xb4 :
174+ self .xb4 = list (response_data )
175+ elif frame_type == 0xb5 :
176+ self .xb5 = list (response_data )
177+ elif frame_type == 0x64 :
178+ self .version = list (response_data )
179+
180+ def set_torque (self ,torque = []):
181+ '''设置最大扭矩'''
182+ self .send_frame (0x02 , torque )
183+ def get_version (self ):
184+ '''获取版本'''
185+ self .send_frame (0x64 ,[])
186+ time .sleep (0.001 )
187+ return self .version
188+ def get_current_status (self ):
189+ '''获取当前关节状态'''
190+ self .send_frame (0x01 ,[])
191+ time .sleep (0.001 )
192+ self .send_frame (0x04 ,[])
193+ time .sleep (0.001 )
194+ return self .x01 + self .x04
195+ def get_speed (self ):
196+ '''获取当前速度'''
197+ return self .x05
198+ def get_press (self ):
199+ '''获取当前扭矩'''
200+ self .set_max_torque_limits (pressures = [0.0 ], type = "get" )
201+ time .sleep (0.001 )
202+ return self .x02
203+ def get_force (self ):
204+ '''获取压感数据'''
205+ return [self .normal_force ,self .tangential_force , self .tangential_force_dir , self .approach_inc ]
206+ def get_temperature (self ):
207+ '''获取电机当前温度'''
208+ self .get_motor_temperature ()
209+ return self .x33 + self .x34
210+
211+ def get_touch_type (self ):
212+ '''获取触摸类型'''
213+ self .send_frame (0xb1 ,[])
214+ time .sleep (0.002 )
215+ if len (self .xb1 ) == 2 :
216+ return 2
217+ else :
218+ return - 1
219+
220+ def get_touch (self ):
221+ '''获取触摸数据'''
222+ self .send_frame (0xb1 ,[])
223+ self .send_frame (0xb2 ,[])
224+ self .send_frame (0xb3 ,[])
225+ self .send_frame (0xb4 ,[])
226+ self .send_frame (0xb5 ,[])
227+ return [self .xb1 [1 ],self .xb2 [1 ],self .xb3 [1 ],self .xb4 [1 ],self .xb5 [1 ],0 ] # 最后一位是手掌,目前没有
228+
229+ def get_torque (self ):
230+ '''暂不支持'''
231+
232+ return [- 1 ] * 5
233+ def get_fault (self ):
234+ '''获取电机故障'''
235+ self .get_motor_fault_code ()
236+ return self .x35 + self .x36
237+ def get_current (self ):
238+ '''获取电流'''
239+ return [None ]* 10
240+ def close_can_interface (self ):
241+ """Stop the CAN communication."""
242+ self .running = False
243+ if self .receive_thread .is_alive ():
244+ self .receive_thread .join ()
245+ if self .bus :
246+ self .bus .shutdown ()
0 commit comments