From 86ae160301a4484c65eb801d5b25c5eaa7b10f2b Mon Sep 17 00:00:00 2001 From: wangyukai Date: Mon, 8 Dec 2025 08:26:13 +0000 Subject: [PATCH] update flash, add flash with collision controller --- internnav/configs/evaluator/__init__.py | 2 + .../configs/evaluator/vln_default_config.py | 3 + .../controllers/__init__.py | 3 + .../h1_vln_move_by_flash_controller.py | 2 - ...move_by_flash_with_collision_controller.py | 185 ++++++++++++++++++ .../sensors/vln_camera.py | 1 + internnav/evaluator/utils/common.py | 2 +- .../eval/configs/h1_internvla_n1_async_cfg.py | 5 +- 8 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 internnav/env/utils/internutopia_extension/controllers/vln_move_by_flash_with_collision_controller.py diff --git a/internnav/configs/evaluator/__init__.py b/internnav/configs/evaluator/__init__.py index 27e63a3..1bd4419 100644 --- a/internnav/configs/evaluator/__init__.py +++ b/internnav/configs/evaluator/__init__.py @@ -44,10 +44,12 @@ class TaskCfg(BaseModel): robot_name: Optional[str] = None robot: Optional[RobotCfg] = None robot_flash: Optional[bool] = None + flash_collision: Optional[bool] = None robot_usd_path: Optional[str] = None camera_resolution: Optional[List[int]] = None metric: Optional[MetricCfg] = None camera_prim_path: Optional[str] = None + one_step_stand_still: Optional[bool] = None class EvalDatasetCfg(BaseModel): diff --git a/internnav/configs/evaluator/vln_default_config.py b/internnav/configs/evaluator/vln_default_config.py index 71bb696..e22841f 100644 --- a/internnav/configs/evaluator/vln_default_config.py +++ b/internnav/configs/evaluator/vln_default_config.py @@ -239,6 +239,9 @@ def get_config(evaluator_cfg: EvalCfg): # add the flash controller in, by flash flag. if evaluator_cfg.task.robot_flash: + vln_move_by_flash_cfg.type = ( + 'VlnMoveByFlashCollisionController' if evaluator_cfg.task.flash_collision else 'VlnMoveByFlashController' + ) robot.controllers.append(ControllerCfg(controller_settings=vln_move_by_flash_cfg.model_dump())) if evaluator_cfg.task.robot_flash or evaluator_cfg.eval_settings.get('vis_output', True): diff --git a/internnav/env/utils/internutopia_extension/controllers/__init__.py b/internnav/env/utils/internutopia_extension/controllers/__init__.py index 10eaa68..f33bb02 100644 --- a/internnav/env/utils/internutopia_extension/controllers/__init__.py +++ b/internnav/env/utils/internutopia_extension/controllers/__init__.py @@ -3,5 +3,8 @@ from .h1_vln_move_by_flash_controller import VlnMoveByFlashController from .h1_vln_move_by_speed_controller import VlnMoveBySpeedController from .stand_still import StandStillController +from .vln_move_by_flash_with_collision_controller import ( + VlnMoveByFlashCollisionController, +) # from .h1_vln_dp_conrtroller import VlnDpMoveBySpeedController diff --git a/internnav/env/utils/internutopia_extension/controllers/h1_vln_move_by_flash_controller.py b/internnav/env/utils/internutopia_extension/controllers/h1_vln_move_by_flash_controller.py index 86e57c0..c8c16df 100644 --- a/internnav/env/utils/internutopia_extension/controllers/h1_vln_move_by_flash_controller.py +++ b/internnav/env/utils/internutopia_extension/controllers/h1_vln_move_by_flash_controller.py @@ -89,7 +89,6 @@ def reset_robot_state(self, position, orientation): Args: position, orientation: np.array, issac_robot.get_world_pose() """ - # robot = self.robot.isaac_robot robot = self.robot.articulation robot._articulation.set_world_pose(position=position, orientation=orientation) robot._articulation.set_world_velocity(np.zeros(6)) @@ -108,7 +107,6 @@ def forward(self, action: int) -> ArticulationAction: ArticulationAction: joint signals to apply (nothing). """ # get robot new position - # positions, orientations = self.robot.isaac_robot.get_world_pose() positions, orientations = self.robot.articulation.get_world_pose() new_robot_position, new_robot_rotation = self.get_new_position_and_rotation(positions, orientations, action) diff --git a/internnav/env/utils/internutopia_extension/controllers/vln_move_by_flash_with_collision_controller.py b/internnav/env/utils/internutopia_extension/controllers/vln_move_by_flash_with_collision_controller.py new file mode 100644 index 0000000..d5338bc --- /dev/null +++ b/internnav/env/utils/internutopia_extension/controllers/vln_move_by_flash_with_collision_controller.py @@ -0,0 +1,185 @@ +import math +from typing import Any, Dict, List + +import numpy as np +from internutopia.core.robot.articulation import ArticulationAction +from internutopia.core.robot.controller import BaseController +from internutopia.core.robot.robot import BaseRobot +from internutopia.core.scene.scene import IScene + +from internnav.evaluator.utils.path_plan import world_to_pixel + +from ..configs.controllers.flash_controller import VlnMoveByFlashControllerCfg + + +@BaseController.register('VlnMoveByFlashCollisionController') +class VlnMoveByFlashCollisionController(BaseController): # codespell:ignore + """ + Discrete Controller, direct set robot world position to achieve teleport-type locomotion. + a general controller adaptable to different type of robots. + """ + + def __init__(self, config: VlnMoveByFlashControllerCfg, robot: BaseRobot, scene: IScene) -> None: + self._user_config = None + self.current_steps = 0 + self.steps_per_action = config.steps_per_action if config.steps_per_action is not None else 200 + + self.forward_distance = config.forward_distance if config.forward_distance is not None else 0.25 + self.rotation_angle = config.rotation_angle if config.rotation_angle is not None else 15.0 # in degrees + self.physics_frequency = config.physics_frequency if config.physics_frequency is not None else 240 + + # self.forward_speed = config.forward_speed if config.forward_speed is not None else 0.25 + # self.rotation_speed = config.rotation_speed if config.rotation_speed is not None else 1.0 + self.forward_speed = self.forward_distance / self.steps_per_action * self.physics_frequency + self.rotation_speed = np.deg2rad( + self.rotation_angle / self.steps_per_action * self.physics_frequency + ) # 200 is the physics_dt + + self.current_action = None + + super().__init__(config=config, robot=robot, scene=scene) + + def get_new_position_and_rotation(self, robot_position, robot_rotation, action): + """ + Calculate robot new state by previous state and action. The move should be based on the controller + settings. + Caution: the rotation need to reset pitch and roll to prevent robot falling. This may due to no + adjustment during the whole path and some rotation accumulated + + Args: + robot_position (np.ndarray): Current world position of the robot, shape (3,), in [x, y, z] format. + robot_rotation (np.ndarray): Current world orientation of the robot as a quaternion, shape (4,), in [x, y, z, w] format. + action (int): Discrete action to apply: + - 0: no movement (stand still) + - 1: move forward + - 2: rotate left + - 3: rotate right + + Returns: + Tuple[np.ndarray, np.ndarray]: The new robot position and rotation as (position, rotation), + both in world frame. + """ + from omni.isaac.core.utils.rotations import ( + euler_angles_to_quat, + quat_to_euler_angles, + ) + + _, _, yaw = quat_to_euler_angles(robot_rotation) + if action == 1: # forward + dx = self.forward_distance * math.cos(yaw) + dy = self.forward_distance * math.sin(yaw) + new_robot_position = robot_position + [dx, dy, 0] + new_robot_rotation = robot_rotation + elif action == 2: # left + new_robot_position = robot_position + new_yaw = yaw + math.radians(self.rotation_angle) + new_robot_rotation = euler_angles_to_quat( + np.array([0.0, 0.0, new_yaw]) + ) # using 0 to prevent the robot from falling + elif action == 3: # right + new_robot_position = robot_position + new_yaw = yaw - math.radians(self.rotation_angle) + new_robot_rotation = euler_angles_to_quat(np.array([0.0, 0.0, new_yaw])) + else: + new_robot_position = robot_position + new_robot_rotation = robot_rotation + + return new_robot_position, new_robot_rotation + + def reset_robot_state(self, position, orientation): + """Set robot state to the new position and orientation. + + Args: + position, orientation: np.array, issac_robot.get_world_pose() + """ + robot = self.robot.articulation + robot._articulation.set_world_pose(position=position, orientation=orientation) + robot._articulation.set_world_velocity(np.zeros(6)) + robot._articulation.set_joint_velocities(np.zeros(len(robot.dof_names))) + robot._articulation.set_joint_positions(np.zeros(len(robot.dof_names))) + robot._articulation.set_joint_efforts(np.zeros(len(robot.dof_names))) + + def get_map_info(self, topdown_global_map_camera, dilation_iterations=0, voxel_size=0.1, agent_radius=0.25): + # 获取 free_map + min_height = self.robot.get_robot_base().get_world_pose()[0][2] + 0.6 # default robot height + max_height = 1.55 + 8 + data_info = topdown_global_map_camera.get_data() + depth = np.array(data_info['depth']) + flat_surface_mask = np.ones_like(depth, dtype=bool) + if self.robot.config.type == 'VLNH1Robot': + depth_mask = ((depth >= min_height) & (depth < max_height)) | ((depth <= 0.5) & (depth > 0.02)) + elif self.robot.config.type == 'VLNAliengoRobot': + base_height = self.robot.get_robot_base().get_world_pose()[0][2] + foot_height = self.robot.get_ankle_height() + min_height = base_height - foot_height + 0.05 + depth_mask = (depth >= min_height) & (depth < max_height) + free_map = np.zeros_like(depth, dtype=int) + free_map[flat_surface_mask & depth_mask] = 1 + # free_map[robot_mask == 1] = 1 # This mask is too large, cover all obstacles around the robot + return free_map + + def check_collision(self, position, aperture=200) -> bool: + """ + Check if there are any obstacles at the position. + Generate a depth map based on a top down camera and check the position + + Return: + bool: True if the position is already occupied + """ + topdown_global_map_camera = self.robot.sensors['topdown_camera_500'] + free_map = self.get_map_info(topdown_global_map_camera, dilation_iterations=2) + + # convert position to free map pixel + camera_pose = topdown_global_map_camera.get_world_pose()[0] + width, height = topdown_global_map_camera.resolution + px, py = world_to_pixel(position, camera_pose, aperture, width, height) + + px_int, py_int = int(px), int(py) + # Get a region: (px, py) and one pixel right/down + robot_size = 3 + sub_map = free_map[px_int - robot_size : px_int + robot_size, py_int - robot_size : py_int + robot_size] + return np.any(sub_map == 0) # 1 = free, so True = collision + + def forward(self, action: int) -> ArticulationAction: + """ + Teleport robot by position, orientation and action + + Args: + action: int + 0. discrete action (int): 0: stop, 1: forward, 2: left, 3: right + + Returns: + ArticulationAction: joint signals to apply (nothing). + """ + # get robot new position + positions, orientations = self.robot.articulation.get_world_pose() + new_robot_position, new_robot_rotation = self.get_new_position_and_rotation(positions, orientations, action) + + # Check if there is a collision with obstacles. Abort the teleport if there is + if action != 1 or not self.check_collision(new_robot_position): + # set robot to new state + self.reset_robot_state(new_robot_position, new_robot_rotation) + else: + print("[FLASH CONTROLLER]: Collision detected, flash abort") + + # Dummy action to do nothing + return ArticulationAction() + + def action_to_control(self, action: List | np.ndarray) -> ArticulationAction: + """ + Convert input action (in 1d array format) to joint signals to apply. + + Args: + action (List | np.ndarray): 1-element 1d array containing + 0. discrete action (int): 0: stop, 1: forward, 2: left, 3: right + + Returns: + ArticulationAction: joint signals to apply. + """ + assert len(action) == 1, 'action must contain 1 element' + return self.forward(action=int(action[0])) + + def get_obs(self) -> Dict[str, Any]: + return { + 'finished': True, + } diff --git a/internnav/env/utils/internutopia_extension/sensors/vln_camera.py b/internnav/env/utils/internutopia_extension/sensors/vln_camera.py index 238b9a8..6336b44 100644 --- a/internnav/env/utils/internutopia_extension/sensors/vln_camera.py +++ b/internnav/env/utils/internutopia_extension/sensors/vln_camera.py @@ -21,6 +21,7 @@ def __init__(self, config: VLNCameraCfg, robot: BaseRobot, scene: IScene): super().__init__(config, robot, scene) self.config = config self._camera = None + self.resolution = config.resolution def get_data(self) -> Dict: output_data = {} diff --git a/internnav/evaluator/utils/common.py b/internnav/evaluator/utils/common.py index 08dde7e..7d2680d 100644 --- a/internnav/evaluator/utils/common.py +++ b/internnav/evaluator/utils/common.py @@ -10,7 +10,7 @@ def create_robot_mask(topdown_global_map_camera, mask_size=20): - height, width = topdown_global_map_camera._camera._resolution + height, width = topdown_global_map_camera.resolution center_x, center_y = width // 2, height // 2 # Calculate the top-left and bottom-right coordinates half_size = mask_size // 2 diff --git a/scripts/eval/configs/h1_internvla_n1_async_cfg.py b/scripts/eval/configs/h1_internvla_n1_async_cfg.py index 36f1012..569aa60 100644 --- a/scripts/eval/configs/h1_internvla_n1_async_cfg.py +++ b/scripts/eval/configs/h1_internvla_n1_async_cfg.py @@ -44,7 +44,7 @@ }, ), task=TaskCfg( - task_name='test', + task_name='test_flash', task_settings={ 'env_num': 1, 'use_distributed': False, # If the others setting in task_settings, please set use_distributed = False. @@ -57,6 +57,7 @@ ), robot_name='h1', robot_flash=True, # If robot_flash is True, the mode is flash (set world_pose directly); else you choose physical mode. + flash_collision=True, # If flash_collision is True, the robot will stop when collision detected. robot_usd_path='data/Embodiments/vln-pe/h1/h1_internvla.usd', camera_resolution=[640, 480], # (W,H) camera_prim_path='torso_link/h1_1_25_down_30', @@ -75,7 +76,7 @@ eval_type='vln_distributed', eval_settings={ 'save_to_json': True, - 'vis_output': False, + 'vis_output': True, 'use_agent_server': False, # If use_agent_server=True, please start the agent server first. }, )