From 62300f3753a7c8b0563cfa96f5dbe4b2de08c572 Mon Sep 17 00:00:00 2001 From: Laniakearr Date: Sat, 19 Jul 2025 13:48:13 -0400 Subject: [PATCH] add ros_2 bridge related files --- .../ros2_bridge/hand_test.py | 106 ++++++++++++++++++ .../ros2_bridge/test_publisher.py | 30 +++++ .../ros2_bridge/test_subscriber.py | 37 ++++++ 3 files changed, 173 insertions(+) create mode 100644 autonomy/simulation/Wato_additional_scripts/ros2_bridge/hand_test.py create mode 100644 autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_publisher.py create mode 100644 autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_subscriber.py diff --git a/autonomy/simulation/Wato_additional_scripts/ros2_bridge/hand_test.py b/autonomy/simulation/Wato_additional_scripts/ros2_bridge/hand_test.py new file mode 100644 index 0000000..91502ce --- /dev/null +++ b/autonomy/simulation/Wato_additional_scripts/ros2_bridge/hand_test.py @@ -0,0 +1,106 @@ +import argparse +from isaaclab.app import AppLauncher + +parser = argparse.ArgumentParser(description="Humanoid Hand Testing") +AppLauncher.add_app_launcher_args(parser) +args_cli = parser.parse_args() + +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +import torch +import rclpy + +import isaaclab.sim as sim_utils +from isaaclab.assets import AssetBaseCfg +from isaaclab.managers import SceneEntityCfg +from isaaclab.scene import InteractiveScene, InteractiveSceneCfg +from isaaclab.utils import configclass + +from HumanoidRL.HumanoidRLPackage.HumanoidRLSetup.modelCfg.humanoid import HAND_CFG +from isaaclab.utils.assets import ISAACLAB_NUCLEUS_DIR +from test_publisher import TestFloatPublisher +from test_subscriber import HandPoseSubscriber + +@configclass +class HandSceneCfg(InteractiveSceneCfg): + ground = AssetBaseCfg( + prim_path="/World/defaultGroundPlane", + spawn=sim_utils.GroundPlaneCfg(), + init_state=AssetBaseCfg.InitialStateCfg(pos=(0.0, 0.0, -1.05)), + ) + + dome_light = AssetBaseCfg( + prim_path="/World/Light", + spawn=sim_utils.DomeLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75)) + ) + + robot = HAND_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot") + +def run_simulator(sim: sim_utils.SimulationContext, scene: InteractiveScene, node, test_node): + robot = scene["robot"] + robot_entity_cfg = SceneEntityCfg("robot", joint_names=[".*"], body_names=[".*"]) + robot_entity_cfg.resolve(scene) + + # Debug joint configuration + print(f"Number of joints: {len(robot_entity_cfg.joint_ids)}") + print(f"Joint IDs: {robot_entity_cfg.joint_ids}") + print(f"Default joint positions shape: {robot.data.default_joint_pos.shape}") + + sim_dt = sim.get_physics_dt() + joint_position = robot.data.default_joint_pos.clone() + joint_vel = robot.data.default_joint_vel.clone() + robot.write_joint_state_to_sim(joint_position, joint_vel) + + while simulation_app.is_running(): + if rclpy.ok(): + rclpy.spin_once(node, timeout_sec=0.01) + rclpy.spin_once(test_node, timeout_sec=0.01) + + joint_values = node.get_latest_joint_positions() + if joint_values and len(joint_values) == 15: + print(f"[INFO]: Received joint values: {joint_values}") + # Ensure joint_values is a list of floats + joint_tensor = torch.tensor(joint_values, device=sim.device) + joint_pos_des = joint_tensor.unsqueeze(0)[:, :len(robot_entity_cfg.joint_ids)].clone() + robot.reset() + robot.set_joint_position_target(joint_pos_des, joint_ids=robot_entity_cfg.joint_ids) + else: + print("[INFO]: No valid joint values received, using default positions.") + joint_position_list = [[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]] + joint_position = torch.tensor(joint_position_list[0], device=sim.device) + robot.reset() + joint_pos_des = joint_position.unsqueeze(0)[:, robot_entity_cfg.joint_ids].clone() + robot.set_joint_position_target(joint_pos_des, joint_ids=robot_entity_cfg.joint_ids) + + + scene.write_data_to_sim() + sim.step() + scene.update(sim_dt) + +def main(): + # Initialize ROS2 + rclpy.init() + node = HandPoseSubscriber() + test_node = TestFloatPublisher() + + # Initialize simulation + sim_cfg = sim_utils.SimulationCfg(dt=0.01, device=args_cli.device) + sim = sim_utils.SimulationContext(sim_cfg) + sim.set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0]) + + scene_cfg = HandSceneCfg(num_envs=1, env_spacing=2.0) + scene = InteractiveScene(scene_cfg) + sim.reset() + + print("[INFO]: Setup complete...") + run_simulator(sim, scene, node, test_node) + + # Cleanup + node.destroy_node() + test_node.destroy_node() + rclpy.shutdown() + simulation_app.close() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_publisher.py b/autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_publisher.py new file mode 100644 index 0000000..5ca72da --- /dev/null +++ b/autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_publisher.py @@ -0,0 +1,30 @@ +import rclpy +from rclpy.node import Node +from std_msgs.msg import Float32MultiArray + +class TestFloatPublisher(Node): + def __init__(self): + super().__init__('test_float_publisher') + self.publisher_ = self.create_publisher(Float32MultiArray, '/hand_joint_positions', 10) + self.timer = self.create_timer(0.1, self.timer_callback) + self.get_logger().info('Test Float Publisher initialized') + + def timer_callback(self): + msg = Float32MultiArray() + # Publish a list of 15 floats (e.g., all 1.0 for testing) + msg.data = [2.0] * 15 + self.publisher_.publish(msg) + self.get_logger().info(f'Published joint positions: {msg.data}') + +def main(args=None): + rclpy.init(args=args) + node = TestFloatPublisher() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + node.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_subscriber.py b/autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_subscriber.py new file mode 100644 index 0000000..11cbe3e --- /dev/null +++ b/autonomy/simulation/Wato_additional_scripts/ros2_bridge/test_subscriber.py @@ -0,0 +1,37 @@ +import rclpy +from rclpy.node import Node +from std_msgs.msg import Float32MultiArray + +class HandPoseSubscriber(Node): + def __init__(self): + super().__init__('hand_pose_subscriber') + self.subscription = self.create_subscription( + Float32MultiArray, + '/hand_joint_positions', + self.listener_callback, + 10) + self.joint_positions = None + self.get_logger().info('Hand Pose Subscriber initialized') + + def listener_callback(self, msg): + if len(msg.data) == 15: + self.joint_positions = list(msg.data) + self.get_logger().info(f'Received joint positions: {self.joint_positions}') + else: + self.get_logger().warn(f'Received invalid joint positions: expected 15 floats, got {len(msg.data)}') + + def get_latest_joint_positions(self): + return self.joint_positions + +def main(args=None): + rclpy.init(args=args) + node = HandPoseSubscriber() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + node.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() \ No newline at end of file