diff --git a/data/.lfs/xarm7.tar.gz b/data/.lfs/xarm7.tar.gz index 8e2cfa368a..897f052bb8 100644 --- a/data/.lfs/xarm7.tar.gz +++ b/data/.lfs/xarm7.tar.gz @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:47dd79f13845ae6a35368345b7443a9190c7584d548caddd9c3eae224442c6fc -size 3280557 +oid sha256:c97e2283c0a726afd48e91172f84605765b8af8ace7ac107b810a8d11869bc99 +size 1606344 diff --git a/dimos/hardware/manipulators/sim/adapter.py b/dimos/hardware/manipulators/sim/adapter.py index 3979ce98c5..39cf6ffc5e 100644 --- a/dimos/hardware/manipulators/sim/adapter.py +++ b/dimos/hardware/manipulators/sim/adapter.py @@ -23,7 +23,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any -from dimos.simulation.engines.mujoco_engine import MujocoEngine +from dimos.simulation.engines.mujoco_engine import MujocoEngine, get_or_create_engine from dimos.simulation.manipulators.sim_manip_interface import SimManipInterface if TYPE_CHECKING: @@ -41,11 +41,13 @@ def __init__( dof: int = 7, address: str | None = None, headless: bool = True, + engine: MujocoEngine | None = None, **_: Any, ) -> None: - if address is None: - raise ValueError("address (MJCF XML path) is required for sim_mujoco adapter") - engine = MujocoEngine(config_path=Path(address), headless=headless) + if engine is None: + if address is None: + raise ValueError("address (MJCF XML path) is required for sim_mujoco adapter") + engine = get_or_create_engine(config_path=Path(address), headless=headless) # Detect gripper from engine joints gripper_idx = None diff --git a/dimos/manipulation/blueprints.py b/dimos/manipulation/blueprints.py index 8f726fe173..c327ac5c25 100644 --- a/dimos/manipulation/blueprints.py +++ b/dimos/manipulation/blueprints.py @@ -184,6 +184,8 @@ def _make_xarm7_config( add_gripper: bool = False, gripper_hardware_id: str | None = None, tf_extra_links: list[str] | None = None, + pre_grasp_offset: float = 0.10, + home_joints: list[float] | None = None, ) -> RobotModelConfig: """Create XArm7 robot config. @@ -227,8 +229,8 @@ def _make_xarm7_config( coordinator_task_name=coordinator_task, gripper_hardware_id=gripper_hardware_id, tf_extra_links=tf_extra_links or [], - # Home configuration: arm extended forward, elbow up (safe observe pose) - home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + pre_grasp_offset=pre_grasp_offset, + home_joints=home_joints or [0.0] * 7, ) @@ -492,6 +494,68 @@ def _make_piper_config( ) +# Sim perception: MuJoCo camera replaces RealSense for sim-based pick-and-place +# Both the sim adapter and camera resolve the same MujocoEngine via the registry +# (keyed by MJCF path), so they share physics state. +# The engine is created lazily when the adapter connects / camera starts. + +from dimos.control.blueprints._hardware import XARM7_SIM_PATH, sim_xarm7 +from dimos.control.coordinator import TaskConfig as TaskConfig +from dimos.simulation.sensors.mujoco_camera import MujocoCamera +from dimos.visualization.rerun.bridge import RerunBridgeModule, _resolve_viewer_mode + +xarm_perception_sim = autoconnect( + PickAndPlaceModule.blueprint( + robots=[ + _make_xarm7_config( + "arm", + joint_prefix="arm_", + coordinator_task="traj_arm", + add_gripper=True, + gripper_hardware_id="arm", + tf_extra_links=["link7"], + pre_grasp_offset=0.05, + home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, -0.7, 0.0], + ), + ], + planning_timeout=10.0, + enable_viz=True, + ), + MujocoCamera.blueprint( + address=str(XARM7_SIM_PATH), + camera_name="wrist_camera", + base_frame_id="link7", + ), + ObjectSceneRegistrationModule.blueprint(target_frame="world"), + ControlCoordinator.blueprint( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[sim_xarm7("arm", headless=False, gripper=True)], + tasks=[ + TaskConfig( + name="traj_arm", + type="trajectory", + joint_names=[f"arm_joint{i + 1}" for i in range(7)], + priority=10, + ), + ], + ), + RerunBridgeModule.blueprint(viewer_mode=_resolve_viewer_mode()), +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + } +) + + +xarm_perception_sim_agent = autoconnect( + xarm_perception_sim, + McpServer.blueprint(), + McpClient.blueprint(system_prompt=_MANIPULATION_AGENT_SYSTEM_PROMPT), +) + + __all__ = [ "PIPER_GRIPPER_COLLISION_EXCLUSIONS", "XARM_GRIPPER_COLLISION_EXCLUSIONS", @@ -501,4 +565,6 @@ def _make_piper_config( "xarm7_planner_coordinator_agent", "xarm_perception", "xarm_perception_agent", + "xarm_perception_sim", + "xarm_perception_sim_agent", ] diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 25d1983a86..a5dc6cc3e2 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -471,6 +471,14 @@ def _plan_path_only( if start is None: return self._fail("No joint state") + # Trim goal to planner DOF (e.g. strip gripper joint from coordinator state) + planner_dof = len(start.position) + if len(goal.position) > planner_dof: + goal = JointState( + name=list(goal.name[:planner_dof]) if goal.name else [], + position=list(goal.position[:planner_dof]), + ) + result = self._planner.plan_joint_path( world=self._world_monitor.world, robot_id=robot_id, diff --git a/dimos/manipulation/planning/spec/config.py b/dimos/manipulation/planning/spec/config.py index baef5ea4fb..f8dcb8b548 100644 --- a/dimos/manipulation/planning/spec/config.py +++ b/dimos/manipulation/planning/spec/config.py @@ -16,7 +16,6 @@ from __future__ import annotations -from collections.abc import Sequence from pathlib import Path from pydantic import Field @@ -74,7 +73,7 @@ class RobotModelConfig(ModuleConfig): coordinator_task_name: str | None = None gripper_hardware_id: str | None = None # TF publishing for extra links (e.g., camera mount) - tf_extra_links: Sequence[str] = () + tf_extra_links: list[str] = Field(default_factory=list) # Home/observe joint configuration for go_home skill home_joints: list[float] | None = None # Pre-grasp offset distance in meters (along approach direction) diff --git a/dimos/msgs/sensor_msgs/CameraInfo.py b/dimos/msgs/sensor_msgs/CameraInfo.py index a371475675..a37682f7a5 100644 --- a/dimos/msgs/sensor_msgs/CameraInfo.py +++ b/dimos/msgs/sensor_msgs/CameraInfo.py @@ -90,6 +90,38 @@ def __init__( self.roi_width = 0 self.roi_do_rectify = False + @classmethod + def from_intrinsics( + cls, + fx: float, + fy: float, + cx: float, + cy: float, + width: int, + height: int, + frame_id: str = "", + ) -> CameraInfo: + """Create CameraInfo from pinhole intrinsics (no distortion). + + Args: + fx: Focal length x (pixels) + fy: Focal length y (pixels) + cx: Principal point x (pixels) + cy: Principal point y (pixels) + width: Image width + height: Image height + frame_id: Frame ID + """ + return cls( + height=height, + width=width, + distortion_model="plumb_bob", + D=[0.0, 0.0, 0.0, 0.0, 0.0], + K=[fx, 0.0, cx, 0.0, fy, cy, 0.0, 0.0, 1.0], + P=[fx, 0.0, cx, 0.0, 0.0, fy, cy, 0.0, 0.0, 0.0, 1.0, 0.0], + frame_id=frame_id, + ) + def with_ts(self, ts: float) -> CameraInfo: """Return a copy of this CameraInfo with the given timestamp. diff --git a/dimos/msgs/sensor_msgs/PointCloud2.py b/dimos/msgs/sensor_msgs/PointCloud2.py index a8a56df179..f9641e9c39 100644 --- a/dimos/msgs/sensor_msgs/PointCloud2.py +++ b/dimos/msgs/sensor_msgs/PointCloud2.py @@ -100,9 +100,10 @@ def __getstate__(self) -> dict[str, object]: # Remove non-picklable objects del state["_pcd_tensor"] state["_pcd_legacy_cache"] = None - # Remove cached_property entries that hold unpicklable Open3D types - state.pop("oriented_bounding_box", None) - state.pop("axis_aligned_bounding_box", None) + # Remove all cached_property entries + for key in list(state): + if isinstance(getattr(type(self), key, None), functools.cached_property): + del state[key] return state def __setstate__(self, state: dict[str, object]) -> None: diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 00690d514f..bd31e9a518 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -91,6 +91,8 @@ "unity-sim": "dimos.simulation.unity.blueprint:unity_sim", "xarm-perception": "dimos.manipulation.blueprints:xarm_perception", "xarm-perception-agent": "dimos.manipulation.blueprints:xarm_perception_agent", + "xarm-perception-sim": "dimos.manipulation.blueprints:xarm_perception_sim", + "xarm-perception-sim-agent": "dimos.manipulation.blueprints:xarm_perception_sim_agent", "xarm6-planner-only": "dimos.manipulation.blueprints:xarm6_planner_only", "xarm7-planner-coordinator": "dimos.manipulation.blueprints:xarm7_planner_coordinator", "xarm7-planner-coordinator-agent": "dimos.manipulation.blueprints:xarm7_planner_coordinator_agent", @@ -137,6 +139,7 @@ "mock-b1-connection-module": "dimos.robot.unitree.b1.connection.MockB1ConnectionModule", "module-a": "dimos.robot.unitree.demo_error_on_name_conflicts.ModuleA", "module-b": "dimos.robot.unitree.demo_error_on_name_conflicts.ModuleB", + "mujoco-camera": "dimos.simulation.sensors.mujoco_camera.MujocoCamera", "navigation-module": "dimos.robot.unitree.rosnav.NavigationModule", "navigation-skill-container": "dimos.agents.skills.navigation.NavigationSkillContainer", "object-db-module": "dimos.perception.detection.moduleDB.ObjectDBModule", diff --git a/dimos/simulation/conftest.py b/dimos/simulation/conftest.py new file mode 100644 index 0000000000..6a12c3d1cc --- /dev/null +++ b/dimos/simulation/conftest.py @@ -0,0 +1,131 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared test fixtures for simulation tests.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from dimos.simulation.engines.mujoco_engine import _clear_registry +from dimos.simulation.utils.xml_parser import JointMapping + +ARM_DOF = 7 + + +def _make_joint_mapping(name: str, idx: int) -> JointMapping: + """Create a JointMapping for a simple revolute joint.""" + return JointMapping( + name=name, + joint_id=idx, + actuator_id=idx, + qpos_adr=idx, + dof_adr=idx, + tendon_qpos_adrs=(), + tendon_dof_adrs=(), + ) + + +def _make_gripper_mapping(name: str, idx: int) -> JointMapping: + """Create a JointMapping for a tendon-driven gripper.""" + return JointMapping( + name=name, + joint_id=None, + actuator_id=idx, + qpos_adr=None, + dof_adr=None, + tendon_qpos_adrs=(idx, idx + 1), + tendon_dof_adrs=(idx, idx + 1), + ) + + +def _patch_mujoco_engine(n_joints: int = ARM_DOF): + """Patch only the MuJoCo C-library and filesystem boundaries. + + Mocks ``_resolve_xml_path``, ``MjModel.from_xml_path``, ``MjData``, and + ``build_joint_mappings`` — the rest of ``MujocoEngine.__init__`` runs as-is. + """ + mappings = [_make_joint_mapping(f"joint{i}", i) for i in range(ARM_DOF)] + if n_joints > ARM_DOF: + mappings.append(_make_gripper_mapping(f"joint{ARM_DOF}", ARM_DOF)) + + fake_model = MagicMock() + fake_model.opt.timestep = 0.002 + fake_model.nu = n_joints + fake_model.nq = n_joints + fake_model.njnt = n_joints + fake_model.actuator_ctrlrange = np.array( + [[-6.28, 6.28]] * ARM_DOF + ([[0.0, 255.0]] if n_joints > ARM_DOF else []) + ) + fake_model.jnt_range = np.array( + [[-6.28, 6.28]] * ARM_DOF + ([[0.0, 0.85]] if n_joints > ARM_DOF else []) + ) + fake_model.jnt_qposadr = np.arange(n_joints) + + fake_data = MagicMock() + fake_data.qpos = np.zeros(n_joints + 4) # extra for tendon qpos addresses + fake_data.actuator_length = np.zeros(n_joints) + + patches = [ + patch( + "dimos.simulation.engines.mujoco_engine.MujocoEngine._resolve_xml_path", + return_value=Path("/fake/scene.xml"), + ), + patch( + "dimos.simulation.engines.mujoco_engine.mujoco.MjModel.from_xml_path", + return_value=fake_model, + ), + patch("dimos.simulation.engines.mujoco_engine.mujoco.MjData", return_value=fake_data), + patch("dimos.simulation.engines.mujoco_engine.build_joint_mappings", return_value=mappings), + ] + return patches + + +@pytest.fixture(autouse=True) +def clean_engine_registry(): + """Clear the engine registry before and after each simulation test.""" + _clear_registry() + yield + _clear_registry() + + +@pytest.fixture +def patched_mujoco_engine(): + """Start MuJoCo engine patches for the test, stop on teardown. + + Usage: + def test_something(patched_mujoco_engine): + engine = get_or_create_engine(config_path=Path("/fake/scene.xml")) + """ + patches = _patch_mujoco_engine(ARM_DOF) + for p in patches: + p.start() + yield + for p in patches: + p.stop() + + +@pytest.fixture +def patched_mujoco_engine_with_gripper(): + """Same as patched_mujoco_engine but with ARM_DOF+1 joints (gripper).""" + patches = _patch_mujoco_engine(ARM_DOF + 1) + for p in patches: + p.start() + yield + for p in patches: + p.stop() diff --git a/dimos/simulation/engines/mujoco_engine.py b/dimos/simulation/engines/mujoco_engine.py index df8359746a..1cb61ebf45 100644 --- a/dimos/simulation/engines/mujoco_engine.py +++ b/dimos/simulation/engines/mujoco_engine.py @@ -16,25 +16,112 @@ from __future__ import annotations +from dataclasses import dataclass +from pathlib import Path import threading import time from typing import TYPE_CHECKING import mujoco import mujoco.viewer as viewer # type: ignore[import-untyped,import-not-found] +import numpy as np +from numpy.typing import NDArray from dimos.simulation.engines.base import SimulationEngine from dimos.simulation.utils.xml_parser import JointMapping, build_joint_mappings from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: - from pathlib import Path - from dimos.msgs.sensor_msgs.JointState import JointState logger = setup_logger() +@dataclass +class CameraConfig: + """Configuration for a camera to be rendered inside the sim loop.""" + + name: str + width: int = 640 + height: int = 480 + fps: float = 15.0 + + +@dataclass +class CameraFrame: + """Thread-safe container for a rendered camera frame.""" + + rgb: NDArray[np.uint8] + depth: NDArray[np.float32] + cam_pos: NDArray[np.float64] + cam_mat: NDArray[np.float64] + fovy: float + timestamp: float + + +@dataclass +class _CameraRendererState: + """Mutable state for a single camera renderer (sim-thread only).""" + + cfg: CameraConfig + cam_id: int + rgb_renderer: mujoco.Renderer + depth_renderer: mujoco.Renderer + interval: float + last_render_time: float = 0.0 + + +_engine_registry: dict[str, MujocoEngine] = {} +_engine_registry_lock = threading.Lock() + + +def get_or_create_engine( + config_path: Path, + headless: bool = True, + cameras: list[CameraConfig] | None = None, +) -> MujocoEngine: + """Return the shared MujocoEngine for *config_path*, creating one if needed. + + If an engine already exists for the resolved path and *cameras* are supplied, + any **new** camera configs are appended (idempotent by name). + """ + key = str(config_path.resolve()) + with _engine_registry_lock: + if key in _engine_registry: + engine = _engine_registry[key] + if engine._headless != headless: + logger.warning( + f"get_or_create_engine: ignoring headless={headless} — " + f"existing engine for '{key}' was created with headless={engine._headless}" + ) + # Merge new camera configs (by name), guarded against sim-thread iteration + if cameras: + with engine._camera_lock: + existing_names = {c.name for c in engine._camera_configs} + for cam in cameras: + if cam.name not in existing_names: + engine._camera_configs.append(cam) + return engine + + engine = MujocoEngine(config_path=config_path, headless=headless, cameras=cameras) + _engine_registry[key] = engine + return engine + + +def unregister_engine(engine: MujocoEngine) -> None: + """Remove an engine from the registry (called on disconnect).""" + with _engine_registry_lock: + keys_to_remove = [k for k, v in _engine_registry.items() if v is engine] + for k in keys_to_remove: + del _engine_registry[k] + + +def _clear_registry() -> None: + """Clear the engine registry (for test teardown only).""" + with _engine_registry_lock: + _engine_registry.clear() + + class MujocoEngine(SimulationEngine): """ MuJoCo simulation engine. @@ -44,7 +131,12 @@ class MujocoEngine(SimulationEngine): - applies control commands """ - def __init__(self, config_path: Path, headless: bool) -> None: + def __init__( + self, + config_path: Path, + headless: bool, + cameras: list[CameraConfig] | None = None, + ) -> None: super().__init__(config_path=config_path, headless=headless) xml_path = self._resolve_xml_path(config_path) @@ -76,6 +168,11 @@ def __init__(self, config_path: Path, headless: bool) -> None: self._joint_position_targets[i] = current_pos self._joint_positions[i] = current_pos + # Camera rendering state (renderers created in sim thread) + self._camera_configs = cameras or [] + self._camera_frames: dict[str, CameraFrame] = {} + self._camera_lock = threading.Lock() + def _resolve_xml_path(self, config_path: Path) -> Path: if config_path is None: raise ValueError("config_path is required for MuJoCo simulation loading") @@ -168,15 +265,80 @@ def disconnect(self) -> bool: if self._sim_thread and self._sim_thread.is_alive(): self._sim_thread.join(timeout=2.0) self._sim_thread = None + unregister_engine(self) return True except Exception as e: logger.error(f"{self.__class__.__name__}: disconnect() failed: {e}") return False + def _init_new_cameras(self, cam_renderers: dict[str, _CameraRendererState]) -> None: + """Create renderers for any camera configs not yet initialized. + + Must be called from the sim thread (MuJoCo thread-safety). + """ + with self._camera_lock: + configs_snapshot = list(self._camera_configs) + for cfg in configs_snapshot: + if cfg.name in cam_renderers: + continue + cam_id = mujoco.mj_name2id(self._model, mujoco.mjtObj.mjOBJ_CAMERA, cfg.name) + if cam_id < 0: + logger.warning(f"Camera '{cfg.name}' not found in MJCF, skipping") + continue + rgb_renderer = mujoco.Renderer(self._model, height=cfg.height, width=cfg.width) + depth_renderer = mujoco.Renderer(self._model, height=cfg.height, width=cfg.width) + depth_renderer.enable_depth_rendering() + interval = 1.0 / cfg.fps if cfg.fps > 0 else float("inf") + cam_renderers[cfg.name] = _CameraRendererState( + cfg=cfg, + cam_id=cam_id, + rgb_renderer=rgb_renderer, + depth_renderer=depth_renderer, + interval=interval, + ) + logger.info( + f"Camera '{cfg.name}' renderer created ({cfg.width}x{cfg.height} @ {cfg.fps}fps)" + ) + + def _render_cameras(self, now: float, cam_renderers: dict[str, _CameraRendererState]) -> None: + """Render all due cameras and store frames. Must be called from sim thread.""" + self._init_new_cameras(cam_renderers) + for state in cam_renderers.values(): + if now - state.last_render_time < state.interval: + continue + state.last_render_time = now + + state.rgb_renderer.update_scene(self._data, camera=state.cam_id) + rgb = state.rgb_renderer.render().copy() + + state.depth_renderer.update_scene(self._data, camera=state.cam_id) + depth = state.depth_renderer.render().copy() + + frame = CameraFrame( + rgb=rgb, + depth=depth.astype(np.float32), + cam_pos=self._data.cam_xpos[state.cam_id].copy(), + cam_mat=self._data.cam_xmat[state.cam_id].copy(), + fovy=float(self._model.cam_fovy[state.cam_id]), + timestamp=now, + ) + with self._camera_lock: + self._camera_frames[state.cfg.name] = frame + + @staticmethod + def _close_cam_renderers(cam_renderers: dict[str, _CameraRendererState]) -> None: + for state in cam_renderers.values(): + state.rgb_renderer.close() + state.depth_renderer.close() + def _sim_loop(self) -> None: logger.info(f"{self.__class__.__name__}: sim loop started") dt = 1.0 / self._control_frequency + # Camera renderers — created in sim thread (MuJoCo thread-safety). + # Checked each tick so cameras added after connect() are picked up. + cam_renderers: dict[str, _CameraRendererState] = {} + def _step_once(sync_viewer: bool) -> None: loop_start = time.time() self._apply_control() @@ -184,6 +346,7 @@ def _step_once(sync_viewer: bool) -> None: if sync_viewer: m_viewer.sync() self._update_joint_state() + self._render_cameras(loop_start, cam_renderers) elapsed = time.time() - loop_start sleep_time = dt - elapsed @@ -200,6 +363,7 @@ def _step_once(sync_viewer: bool) -> None: while m_viewer.is_running() and not self._stop_event.is_set(): _step_once(sync_viewer=True) + self._close_cam_renderers(cam_renderers) logger.info(f"{self.__class__.__name__}: sim loop stopped") @property @@ -327,7 +491,26 @@ def get_joint_range(self, joint_index: int) -> tuple[float, float] | None: ) return None + def read_camera(self, camera_name: str) -> CameraFrame | None: + """Read the latest rendered frame for a camera (thread-safe). + + Returns None if the camera hasn't rendered yet or doesn't exist. + """ + with self._camera_lock: + return self._camera_frames.get(camera_name) + + def get_camera_fovy(self, camera_name: str) -> float | None: + """Get vertical field of view for a named camera, in degrees.""" + cam_id = mujoco.mj_name2id(self._model, mujoco.mjtObj.mjOBJ_CAMERA, camera_name) + if cam_id < 0: + return None + return float(self._model.cam_fovy[cam_id]) + __all__ = [ + "CameraConfig", + "CameraFrame", "MujocoEngine", + "get_or_create_engine", + "unregister_engine", ] diff --git a/dimos/simulation/manipulators/sim_manip_interface.py b/dimos/simulation/manipulators/sim_manip_interface.py index 07e56c5afd..abcfcc96f7 100644 --- a/dimos/simulation/manipulators/sim_manip_interface.py +++ b/dimos/simulation/manipulators/sim_manip_interface.py @@ -206,7 +206,7 @@ def write_gripper_position(self, position: float) -> bool: position = max(jlo, min(jhi, position)) if jhi != jlo: t = (position - jlo) / (jhi - jlo) - ctrl_value = chi - t * (chi - clo) + ctrl_value = clo + t * (chi - clo) else: ctrl_value = clo self._engine.set_position_target(self._gripper_idx, ctrl_value) diff --git a/dimos/simulation/manipulators/test_sim_adapter.py b/dimos/simulation/manipulators/test_sim_adapter.py index 8f253229f0..1413004fee 100644 --- a/dimos/simulation/manipulators/test_sim_adapter.py +++ b/dimos/simulation/manipulators/test_sim_adapter.py @@ -16,125 +16,30 @@ from __future__ import annotations -from pathlib import Path from unittest.mock import MagicMock, patch -import numpy as np import pytest from dimos.hardware.manipulators.sim.adapter import SimMujocoAdapter, register -from dimos.simulation.utils.xml_parser import JointMapping - -ARM_DOF = 7 - - -def _make_joint_mapping(name: str, idx: int) -> JointMapping: - """Create a JointMapping for a simple revolute joint.""" - return JointMapping( - name=name, - joint_id=idx, - actuator_id=idx, - qpos_adr=idx, - dof_adr=idx, - tendon_qpos_adrs=(), - tendon_dof_adrs=(), - ) - - -def _make_gripper_mapping(name: str, idx: int) -> JointMapping: - """Create a JointMapping for a tendon-driven gripper.""" - return JointMapping( - name=name, - joint_id=None, - actuator_id=idx, - qpos_adr=None, - dof_adr=None, - tendon_qpos_adrs=(idx, idx + 1), - tendon_dof_adrs=(idx, idx + 1), - ) - - -def _patch_mujoco_engine(n_joints: int): - """Patch only the MuJoCo C-library and filesystem boundaries. - - Mocks ``_resolve_xml_path``, ``MjModel.from_xml_path``, ``MjData``, and - ``build_joint_mappings`` — the rest of ``MujocoEngine.__init__`` runs as-is. - """ - mappings = [_make_joint_mapping(f"joint{i}", i) for i in range(ARM_DOF)] - if n_joints > ARM_DOF: - mappings.append(_make_gripper_mapping(f"joint{ARM_DOF}", ARM_DOF)) - - fake_model = MagicMock() - fake_model.opt.timestep = 0.002 - fake_model.nu = n_joints - fake_model.nq = n_joints - fake_model.njnt = n_joints - fake_model.actuator_ctrlrange = np.array( - [[-6.28, 6.28]] * ARM_DOF + ([[0.0, 255.0]] if n_joints > ARM_DOF else []) - ) - fake_model.jnt_range = np.array( - [[-6.28, 6.28]] * ARM_DOF + ([[0.0, 0.85]] if n_joints > ARM_DOF else []) - ) - fake_model.jnt_qposadr = np.arange(n_joints) - - fake_data = MagicMock() - fake_data.qpos = np.zeros(n_joints + 4) # extra for tendon qpos addresses - fake_data.actuator_length = np.zeros(n_joints) - - patches = [ - patch( - "dimos.simulation.engines.mujoco_engine.MujocoEngine._resolve_xml_path", - return_value=Path("/fake/scene.xml"), - ), - patch( - "dimos.simulation.engines.mujoco_engine.mujoco.MjModel.from_xml_path", - return_value=fake_model, - ), - patch("dimos.simulation.engines.mujoco_engine.mujoco.MjData", return_value=fake_data), - patch("dimos.simulation.engines.mujoco_engine.build_joint_mappings", return_value=mappings), - ] - return patches +from dimos.simulation.conftest import ARM_DOF class TestSimMujocoAdapter: """Tests for SimMujocoAdapter with and without gripper.""" @pytest.fixture - def adapter_with_gripper(self): + def adapter_with_gripper(self, patched_mujoco_engine_with_gripper): """SimMujocoAdapter with ARM_DOF arm joints + 1 gripper joint.""" - patches = _patch_mujoco_engine(ARM_DOF + 1) - for p in patches: - p.start() - try: - adapter = SimMujocoAdapter(dof=ARM_DOF, address="/fake/scene.xml", headless=True) - finally: - for p in patches: - p.stop() - return adapter + return SimMujocoAdapter(dof=ARM_DOF, address="/fake/scene.xml", headless=True) @pytest.fixture - def adapter_no_gripper(self): + def adapter_no_gripper(self, patched_mujoco_engine): """SimMujocoAdapter with ARM_DOF arm joints, no gripper.""" - patches = _patch_mujoco_engine(ARM_DOF) - for p in patches: - p.start() - try: - adapter = SimMujocoAdapter(dof=ARM_DOF, address="/fake/scene.xml", headless=True) - finally: - for p in patches: - p.stop() - return adapter - - def test_address_required(self): - patches = _patch_mujoco_engine(ARM_DOF) - for p in patches: - p.start() - try: - with pytest.raises(ValueError, match="address"): - SimMujocoAdapter(dof=ARM_DOF, address=None) - finally: - for p in patches: - p.stop() + return SimMujocoAdapter(dof=ARM_DOF, address="/fake/scene.xml", headless=True) + + def test_address_required(self, patched_mujoco_engine): + with pytest.raises(ValueError, match="address"): + SimMujocoAdapter(dof=ARM_DOF, address=None) def test_gripper_detected(self, adapter_with_gripper): assert adapter_with_gripper._gripper_idx == ARM_DOF diff --git a/dimos/simulation/sensors/mujoco_camera.py b/dimos/simulation/sensors/mujoco_camera.py new file mode 100644 index 0000000000..2e5cde393a --- /dev/null +++ b/dimos/simulation/sensors/mujoco_camera.py @@ -0,0 +1,401 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MuJoCo simulation camera module. + +Drop-in replacement for RealSenseCamera in manipulation blueprints. +Renders RGB + depth from a named camera in a shared MujocoEngine. +""" + +from __future__ import annotations + +import math +from pathlib import Path +import threading +import time +from typing import Any + +from pydantic import Field +import reactivex as rx +from scipy.spatial.transform import Rotation as R + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import Out +from dimos.hardware.sensors.camera.spec import ( + DepthCameraConfig, + DepthCameraHardware, +) +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.simulation.engines.mujoco_engine import ( + CameraConfig, + CameraFrame, + MujocoEngine, + get_or_create_engine, +) +from dimos.spec import perception +from dimos.utils.logging_config import setup_logger +from dimos.utils.reactive import backpressure + +logger = setup_logger() + +_RX180 = R.from_euler("x", 180, degrees=True) + + +def _default_identity_transform() -> Transform: + return Transform( + translation=Vector3(0.0, 0.0, 0.0), + rotation=Quaternion(0.0, 0.0, 0.0, 1.0), + ) + + +class MujocoCameraConfig(ModuleConfig, DepthCameraConfig): + """Configuration for the MuJoCo simulation camera.""" + + address: str = "" + headless: bool = False + camera_name: str = "wrist_camera" + width: int = 640 + height: int = 480 + fps: int = 15 + base_frame_id: str = "link7" + base_transform: Transform | None = Field(default_factory=_default_identity_transform) + # MuJoCo renders color+depth from same virtual camera, alignment is a no-op + align_depth_to_color: bool = True + enable_depth: bool = True + enable_pointcloud: bool = False + pointcloud_fps: float = 5.0 + camera_info_fps: float = 1.0 + + +class MujocoCamera(DepthCameraHardware, Module[MujocoCameraConfig], perception.DepthCamera): + """Simulated depth camera that renders from a MujocoEngine. + + Implements the same output ports and TF chain as RealSenseCamera so it can + be used as a drop-in replacement in manipulation blueprints. + + The engine is resolved automatically from the registry via ``address`` + (the same MJCF path used by the sim_mujoco adapter). + """ + + color_image: Out[Image] + depth_image: Out[Image] + pointcloud: Out[PointCloud2] + camera_info: Out[CameraInfo] + depth_camera_info: Out[CameraInfo] + + default_config = MujocoCameraConfig + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._engine: MujocoEngine | None = None + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._camera_info_base: CameraInfo | None = None + + @property + def _camera_link(self) -> str: + return f"{self.config.camera_name}_link" + + @property + def _color_frame(self) -> str: + return f"{self.config.camera_name}_color_frame" + + @property + def _color_optical_frame(self) -> str: + return f"{self.config.camera_name}_color_optical_frame" + + @property + def _depth_frame(self) -> str: + return f"{self.config.camera_name}_depth_frame" + + @property + def _depth_optical_frame(self) -> str: + return f"{self.config.camera_name}_depth_optical_frame" + + @rpc + def get_color_camera_info(self) -> CameraInfo | None: + return self._camera_info_base + + @rpc + def get_depth_camera_info(self) -> CameraInfo | None: + return self._camera_info_base + + @rpc + def get_depth_scale(self) -> float: + return 1.0 + + def _build_camera_info(self) -> None: + """Compute camera intrinsics from the MuJoCo model (pinhole, no distortion).""" + if self._engine is None: + return + fovy_deg = self._engine.get_camera_fovy(self.config.camera_name) + if fovy_deg is None: + logger.error("Camera not found in MJCF", camera_name=self.config.camera_name) + return + + h = self.config.height + w = self.config.width + fovy_rad = math.radians(fovy_deg) + fy = h / (2.0 * math.tan(fovy_rad / 2.0)) + fx = fy # square pixels + + self._camera_info_base = CameraInfo.from_intrinsics( + fx=fx, + fy=fy, + cx=w / 2.0, + cy=h / 2.0, + width=w, + height=h, + frame_id=self._color_optical_frame, + ) + + @rpc + def start(self) -> None: + if self._engine is None and self.config.address: + self._engine = get_or_create_engine( + config_path=Path(self.config.address), + headless=self.config.headless, + cameras=[ + CameraConfig( + name=self.config.camera_name, + width=self.config.width, + height=self.config.height, + fps=self.config.fps, + ) + ], + ) + if self._engine is None: + raise RuntimeError( + "MujocoCamera: set address (MJCF path) in config or call set_engine()" + ) + + # Ensure camera config is registered (handles the set_engine() path) + cam_cfg = CameraConfig( + name=self.config.camera_name, + width=self.config.width, + height=self.config.height, + fps=self.config.fps, + ) + with self._engine._camera_lock: + existing_names = {c.name for c in self._engine._camera_configs} + if cam_cfg.name not in existing_names: + self._engine._camera_configs.append(cam_cfg) + + logger.info("MujocoCamera engine resolved", connected=self._engine.connected) + + self._build_camera_info() + + self._stop_event.clear() + self._thread = threading.Thread(target=self._publish_loop, daemon=True) + self._thread.start() + + # Periodic camera_info publishing + interval_sec = 1.0 / self.config.camera_info_fps + self._disposables.add( + rx.interval(interval_sec).subscribe( + on_next=lambda _: self._publish_camera_info(), + on_error=lambda e: logger.error("CameraInfo publish error", error=e), + ) + ) + + # Optional pointcloud generation + if self.config.enable_pointcloud and self.config.enable_depth: + pc_interval = 1.0 / self.config.pointcloud_fps + self._disposables.add( + backpressure(rx.interval(pc_interval)).subscribe( + on_next=lambda _: self._generate_pointcloud(), + on_error=lambda e: logger.error("Pointcloud error", error=e), + ) + ) + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=2.0) + self._thread = None + self._camera_info_base = None + super().stop() + + def _publish_loop(self) -> None: + """Poll engine for rendered frames and publish at configured FPS.""" + interval = 1.0 / self.config.fps + last_timestamp = 0.0 + published_count = 0 + + logger.info("MujocoCamera publish loop started", camera=self.config.camera_name) + + # Wait for engine to connect (adapter may not have started yet) + deadline = time.monotonic() + 30.0 + while ( + not self._stop_event.is_set() + and self._engine is not None + and not self._engine.connected + ): + if time.monotonic() > deadline: + logger.error("MujocoCamera: timed out waiting for engine to connect") + return + self._stop_event.wait(timeout=0.1) + + if self._stop_event.is_set(): + return + + logger.info("MujocoCamera: engine connected, polling for frames") + + while not self._stop_event.is_set() and self._engine is not None: + try: + frame = self._engine.read_camera(self.config.camera_name) + if frame is None or frame.timestamp <= last_timestamp: + self._stop_event.wait(timeout=interval * 0.5) + continue + + last_timestamp = frame.timestamp + ts = time.time() + + # Publish RGB + color_img = Image( + data=frame.rgb, + format=ImageFormat.RGB, + frame_id=self._color_optical_frame, + ts=ts, + ) + self.color_image.publish(color_img) + + # Publish depth (float32 meters) + if self.config.enable_depth: + depth_img = Image( + data=frame.depth, + format=ImageFormat.DEPTH, + frame_id=self._color_optical_frame, + ts=ts, + ) + self.depth_image.publish(depth_img) + + # Publish TF (world -> camera from MuJoCo pose) + self._publish_tf(ts, frame) + + published_count += 1 + if published_count == 1: + logger.info( + "MujocoCamera first frame published", + rgb_shape=frame.rgb.shape, + depth_shape=frame.depth.shape, + ) + + # Rate limit + elapsed = time.time() - ts + sleep_time = interval - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + except Exception as e: + logger.error("MujocoCamera publish error", error=e) + time.sleep(1.0) + + def _publish_camera_info(self) -> None: + base = self._camera_info_base + if base is None: + return + ts = time.time() + color_info = CameraInfo( + height=base.height, + width=base.width, + distortion_model=base.distortion_model, + D=base.D, + K=base.K, + P=base.P, + frame_id=base.frame_id, + ts=ts, + ) + self.camera_info.publish(color_info) + self.depth_camera_info.publish(color_info) + + def _publish_tf(self, ts: float, frame: CameraFrame | None = None) -> None: + if frame is None: + return + + mj_rot = R.from_matrix(frame.cam_mat.reshape(3, 3)) + optical_rot = mj_rot * _RX180 + q = optical_rot.as_quat() # xyzw + + pos = Vector3( + float(frame.cam_pos[0]), + float(frame.cam_pos[1]), + float(frame.cam_pos[2]), + ) + rot = Quaternion(float(q[0]), float(q[1]), float(q[2]), float(q[3])) + + # Publish world -> all optical/link frames (all co-located in sim) + self.tf.publish( + Transform( + translation=pos, + rotation=rot, + frame_id="world", + child_frame_id=self._color_optical_frame, + ts=ts, + ), + Transform( + translation=pos, + rotation=rot, + frame_id="world", + child_frame_id=self._depth_optical_frame, + ts=ts, + ), + Transform( + translation=pos, + rotation=rot, + frame_id="world", + child_frame_id=self._camera_link, + ts=ts, + ), + ) + + def _generate_pointcloud(self) -> None: + """Generate pointcloud from latest depth frame (optional, for visualization).""" + if self._engine is None or self._camera_info_base is None: + return + frame = self._engine.read_camera(self.config.camera_name) + if frame is None: + return + try: + color_img = Image( + data=frame.rgb, + format=ImageFormat.RGB, + frame_id=self._color_optical_frame, + ts=frame.timestamp, + ) + depth_img = Image( + data=frame.depth, + format=ImageFormat.DEPTH, + frame_id=self._color_optical_frame, + ts=frame.timestamp, + ) + pcd = PointCloud2.from_rgbd( + color_image=color_img, + depth_image=depth_img, + camera_info=self._camera_info_base, + depth_scale=1.0, + ) + pcd = pcd.voxel_downsample(0.005) + self.pointcloud.publish(pcd) + except Exception as e: + logger.error("Pointcloud generation error", error=e) + + +__all__ = ["MujocoCamera", "MujocoCameraConfig"] diff --git a/dimos/simulation/sensors/test_mujoco_camera.py b/dimos/simulation/sensors/test_mujoco_camera.py new file mode 100644 index 0000000000..a67f673966 --- /dev/null +++ b/dimos/simulation/sensors/test_mujoco_camera.py @@ -0,0 +1,237 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for MujocoCamera and engine registry.""" + +from __future__ import annotations + +import math +from pathlib import Path +import threading +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from dimos.simulation.engines.mujoco_engine import ( + CameraConfig, + CameraFrame, + MujocoEngine, + _engine_registry, + get_or_create_engine, + unregister_engine, +) +from dimos.simulation.sensors.mujoco_camera import MujocoCamera + +ARM_DOF = 7 + + +def _make_camera_frame( + cam_pos: list[float] | None = None, + cam_mat: list[float] | None = None, +) -> CameraFrame: + """Create a CameraFrame with sensible defaults.""" + return CameraFrame( + rgb=np.zeros((480, 640, 3), dtype=np.uint8), + depth=np.ones((480, 640), dtype=np.float32), + cam_pos=np.array(cam_pos or [1.0, 2.0, 3.0]), + cam_mat=np.array(cam_mat or np.eye(3).flatten()), + fovy=45.0, + timestamp=1.0, + ) + + +def _make_mock_engine(fovy: float = 45.0) -> MagicMock: + mock_engine = MagicMock(spec=MujocoEngine) + mock_engine.get_camera_fovy.return_value = fovy + mock_engine.connected = True + mock_engine._camera_configs = [] + mock_engine._camera_lock = threading.Lock() + mock_engine.read_camera.return_value = _make_camera_frame() + return mock_engine + + +@pytest.fixture +def mock_engine() -> MagicMock: + return _make_mock_engine() + + +@pytest.fixture +def camera_with_mock_engine(mock_engine: MagicMock): + cam = MujocoCamera(camera_name="wrist_camera") + cam._engine = mock_engine + yield cam + cam.stop() + + +class TestEngineRegistry: + def test_creates_new(self, patched_mujoco_engine): + engine = get_or_create_engine(config_path=Path("/fake/scene.xml"), headless=True) + assert engine is not None + assert len(_engine_registry) == 1 + + def test_returns_same_instance(self, patched_mujoco_engine): + e1 = get_or_create_engine(config_path=Path("/fake/scene.xml"), headless=True) + e2 = get_or_create_engine(config_path=Path("/fake/scene.xml"), headless=True) + assert e1 is e2 + assert len(_engine_registry) == 1 + + def test_merges_new_cameras(self, patched_mujoco_engine): + e1 = get_or_create_engine( + config_path=Path("/fake/scene.xml"), + cameras=[CameraConfig(name="cam_a")], + ) + get_or_create_engine( + config_path=Path("/fake/scene.xml"), + cameras=[CameraConfig(name="cam_b")], + ) + names = {c.name for c in e1._camera_configs} + assert names == {"cam_a", "cam_b"} + + def test_deduplicates_cameras(self, patched_mujoco_engine): + get_or_create_engine( + config_path=Path("/fake/scene.xml"), + cameras=[CameraConfig(name="cam_a")], + ) + get_or_create_engine( + config_path=Path("/fake/scene.xml"), + cameras=[CameraConfig(name="cam_a")], + ) + engine = get_or_create_engine(config_path=Path("/fake/scene.xml")) + cam_names = [c.name for c in engine._camera_configs] + assert cam_names.count("cam_a") == 1 + + def test_unregister_removes(self, patched_mujoco_engine): + engine = get_or_create_engine(config_path=Path("/fake/scene.xml")) + assert len(_engine_registry) == 1 + unregister_engine(engine) + assert len(_engine_registry) == 0 + + +class TestCameraIntrinsics: + def test_fovy_45(self, camera_with_mock_engine: MujocoCamera): + cam = camera_with_mock_engine + cam._build_camera_info() + info = cam._camera_info_base + assert info is not None + + expected_fy = 480.0 / (2.0 * math.tan(math.radians(45.0) / 2.0)) + # K = [fx, 0, cx, 0, fy, cy, 0, 0, 1] + assert info.K[0] == pytest.approx(expected_fy, abs=0.01) # fx + assert info.K[4] == pytest.approx(expected_fy, abs=0.01) # fy + assert info.K[2] == pytest.approx(320.0) # cx + assert info.K[5] == pytest.approx(240.0) # cy + + def test_fovy_90(self, mock_engine: MagicMock): + mock_engine.get_camera_fovy.return_value = 90.0 + cam = MujocoCamera(camera_name="wrist_camera") + cam._engine = mock_engine + cam._build_camera_info() + info = cam._camera_info_base + assert info is not None + # tan(45°) = 1.0, so fy = 480 / 2 = 240 + assert info.K[0] == pytest.approx(240.0, abs=0.01) + assert info.K[4] == pytest.approx(240.0, abs=0.01) + cam.stop() + + def test_unknown_camera(self, mock_engine: MagicMock): + mock_engine.get_camera_fovy.return_value = None + cam = MujocoCamera(camera_name="nonexistent") + cam._engine = mock_engine + cam._build_camera_info() + assert cam._camera_info_base is None + cam.stop() + + def test_distortion_and_frame_id(self, camera_with_mock_engine: MujocoCamera): + cam = camera_with_mock_engine + cam._build_camera_info() + info = cam._camera_info_base + assert info is not None + assert info.D == [0.0, 0.0, 0.0, 0.0, 0.0] + assert info.distortion_model == "plumb_bob" + assert info.frame_id == "wrist_camera_color_optical_frame" + + +class TestMujocoCameraLifecycle: + def test_start_no_engine_raises(self): + cam = MujocoCamera(camera_name="cam", address="") + try: + with pytest.raises(RuntimeError, match="set address"): + cam.start() + finally: + cam.stop() + + def test_start_creates_thread(self, camera_with_mock_engine: MujocoCamera): + cam = camera_with_mock_engine + cam._engine.connected = False # thread will wait, won't spin + # Patch rx.interval to avoid spawning scheduler threads that leak + with patch("dimos.simulation.sensors.mujoco_camera.rx.interval", return_value=MagicMock()): + cam.start() + assert cam._thread is not None + assert cam._thread.is_alive() + cam.stop() + assert cam._thread is None + + def test_stop_sets_event(self, camera_with_mock_engine: MujocoCamera): + cam = camera_with_mock_engine + cam._engine.connected = False + with patch("dimos.simulation.sensors.mujoco_camera.rx.interval", return_value=MagicMock()): + cam.start() + cam.stop() + assert cam._stop_event.is_set() + assert cam._thread is None + + def test_frame_name_properties(self): + cam = MujocoCamera(camera_name="wrist_camera") + assert cam._camera_link == "wrist_camera_link" + assert cam._color_frame == "wrist_camera_color_frame" + assert cam._color_optical_frame == "wrist_camera_color_optical_frame" + assert cam._depth_frame == "wrist_camera_depth_frame" + assert cam._depth_optical_frame == "wrist_camera_depth_optical_frame" + cam.stop() + + +class TestTFPublishing: + def test_publish_tf_correct_frames(self, camera_with_mock_engine: MujocoCamera): + cam = camera_with_mock_engine + mock_tf = MagicMock() + frame = _make_camera_frame(cam_pos=[1.0, 2.0, 3.0]) + + with patch.object(type(cam), "tf", new_callable=lambda: property(lambda self: mock_tf)): + cam._publish_tf(ts=0.0, frame=frame) + + mock_tf.publish.assert_called_once() + transforms = mock_tf.publish.call_args.args + assert len(transforms) == 3 + + child_ids = {t.child_frame_id for t in transforms} + assert child_ids == { + "wrist_camera_color_optical_frame", + "wrist_camera_depth_optical_frame", + "wrist_camera_link", + } + for t in transforms: + assert t.frame_id == "world" + assert t.translation.x == pytest.approx(1.0) + assert t.translation.y == pytest.approx(2.0) + assert t.translation.z == pytest.approx(3.0) + + def test_publish_tf_none_noop(self, camera_with_mock_engine: MujocoCamera): + cam = camera_with_mock_engine + mock_tf = MagicMock() + + with patch.object(type(cam), "tf", new_callable=lambda: property(lambda self: mock_tf)): + cam._publish_tf(ts=0.0, frame=None) + + mock_tf.publish.assert_not_called()