diff --git a/camera_info_manager_py/camera_info_manager/camera_info_manager.py b/camera_info_manager_py/camera_info_manager/camera_info_manager.py index 1360e289..ec056e22 100644 --- a/camera_info_manager_py/camera_info_manager/camera_info_manager.py +++ b/camera_info_manager_py/camera_info_manager/camera_info_manager.py @@ -221,7 +221,7 @@ def __str__(self): :returns: Return string representation of :class:CameraInfoManager. """ - return '[' + self.cname + ']' + str(self.utm) + return '[' + self.cname + ']' + str(self.url) def getCameraInfo(self): """ @@ -326,11 +326,12 @@ def loadCameraInfo(self): """ self._loadCalibration(self.url, self.cname) - def setCameraInfo(self, req): + def setCameraInfo(self, req, rsp): """ Set camera info request callback. :param req: SetCameraInfo request message. + :param rsp: SetCameraInfo response message to populate. :returns: SetCameraInfo response message, success is True if message handled. @@ -339,7 +340,6 @@ def setCameraInfo(self, req): """ self.node.get_logger().debug('SetCameraInfo received for ' + self.cname) self.camera_info = req.camera_info - rsp = SetCameraInfo.Response() rsp.success = saveCalibration(req.camera_info, self.url, self.cname) if not rsp.success: rsp.status_message = 'Error storing camera calibration.' @@ -643,16 +643,21 @@ def saveCalibrationFile(ci, filename, cname): :param cname: Camera name. :returns: True if able to save the data. """ - # make calibration dictionary from CameraInfo fields and camera name + # CameraInfo numeric fields are array.array or numpy arrays on the rclpy side, + # whose elements (numpy.float64) PyYAML's SafeDumper cannot represent; coerce + # to plain Python floats before dumping. + def _to_floats(seq): + return [float(x) for x in seq] + calib = { - 'image_width': ci.width, - 'image_height': ci.height, + 'image_width': int(ci.width), + 'image_height': int(ci.height), 'camera_name': cname, 'distortion_model': ci.distortion_model, - 'distortion_coefficients': {'data': ci.d, 'rows': 1, 'cols': len(ci.d)}, - 'camera_matrix': {'data': ci.k, 'rows': 3, 'cols': 3}, - 'rectification_matrix': {'data': ci.r, 'rows': 3, 'cols': 3}, - 'projection_matrix': {'data': ci.p, 'rows': 3, 'cols': 4}, + 'distortion_coefficients': {'data': _to_floats(ci.d), 'rows': 1, 'cols': len(ci.d)}, + 'camera_matrix': {'data': _to_floats(ci.k), 'rows': 3, 'cols': 3}, + 'rectification_matrix': {'data': _to_floats(ci.r), 'rows': 3, 'cols': 3}, + 'projection_matrix': {'data': _to_floats(ci.p), 'rows': 3, 'cols': 4}, } # make sure the directory exists and the file is writable diff --git a/camera_info_manager_py/camera_info_manager/zoom_camera_info_manager.py b/camera_info_manager_py/camera_info_manager/zoom_camera_info_manager.py index 76c7a83d..ac3108dd 100644 --- a/camera_info_manager_py/camera_info_manager/zoom_camera_info_manager.py +++ b/camera_info_manager_py/camera_info_manager/zoom_camera_info_manager.py @@ -65,10 +65,11 @@ class ZoomCameraInfoManager(CameraInfoManager): calibrate a zoom camera. """ - def __init__(self, min_zoom, max_zoom, cname='camera', url='', namespace=''): + def __init__(self, node, min_zoom, max_zoom, cname='camera', url='', namespace=''): """ Construct the manager. + :param node: rclpy Node used to create the ``set_camera_info`` service and logger. :param int min_zoom: The minimum zoom level. The zoom values should linearly affect the focal distance. :param int max_zoom: The maximum zoom level. @@ -80,7 +81,7 @@ def __init__(self, min_zoom, max_zoom, cname='camera', url='', namespace=''): If a namespace is specified, the '/' separator required between it and ``set_camera_info`` will be supplied automatically. """ - CameraInfoManager.__init__(self, cname, url, namespace) + CameraInfoManager.__init__(self, node, cname, url, namespace) self._min_zoom = min_zoom self._max_zoom = max_zoom @@ -110,7 +111,7 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.svc.shutdown() + self.node.destroy_service(self.svc) class ApproximateZoomCameraInfoManager(ZoomCameraInfoManager): @@ -140,6 +141,7 @@ class ApproximateZoomCameraInfoManager(ZoomCameraInfoManager): def __init__( self, + node, min_fov, max_fov, initial_image_width, @@ -153,6 +155,7 @@ def __init__( """ Construct the manager. + :param node: rclpy Node used to create the ``set_camera_info`` service and logger. :param float min_fov: The minimum horizontal field of view. Corresponds to maximum zoom level. :param float max_fov: The maximum horizontal field of view. Corresponds to @@ -170,7 +173,7 @@ def __init__( If a namespace is specified, the '/' separator required between it and ``set_camera_info`` will be supplied automatically. """ - ZoomCameraInfoManager.__init__(self, min_zoom, max_zoom, cname, url, namespace) + ZoomCameraInfoManager.__init__(self, node, min_zoom, max_zoom, cname, url, namespace) self._min_fov = min_fov self._max_fov = max_fov @@ -267,11 +270,13 @@ class InterpolatingZoomCameraInfoManager(ZoomCameraInfoManager): """ def __init__( - self, calibration_url_template, zoom_levels, cname='camera', url='', namespace='' + self, node, calibration_url_template, zoom_levels, + cname='camera', url='', namespace='' ): """ Construct the manager. + :param node: rclpy Node used to create the ``set_camera_info`` service and logger. :param string calibration_url_template: Template of the URL that contains the calibration files. The template string should contain a single '%d' substitution, which will be substituted with zoom level. @@ -286,7 +291,7 @@ def __init__( ``set_camera_info`` will be supplied automatically. """ ZoomCameraInfoManager.__init__( - self, min(zoom_levels), max(zoom_levels), cname, url, namespace + self, node, min(zoom_levels), max(zoom_levels), cname, url, namespace ) self._calibration_url_template = calibration_url_template @@ -295,7 +300,10 @@ def __init__( self._camera_infos = None def _update_camera_info(self): - if not self.isCalibrated() or self._camera_infos is None: + # Use the per-zoom calibrations as the source of truth: the base + # camera_info loaded by the parent may be empty (no top-level URL is + # required for InterpolatingZoom). + if not self._camera_infos: return if self._zoom in list(self._camera_infos.keys()): diff --git a/camera_info_manager_py/test/test_zoom_camera_info_manager.py b/camera_info_manager_py/test/test_zoom_camera_info_manager.py new file mode 100644 index 00000000..c74c635b --- /dev/null +++ b/camera_info_manager_py/test/test_zoom_camera_info_manager.py @@ -0,0 +1,250 @@ +# Copyright 2026, Open Source Robotics Foundation, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import pytest +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import CameraInfo +from sensor_msgs.srv import SetCameraInfo + +from camera_info_manager import ( + ApproximateZoomCameraInfoManager, + CameraInfoError, + CameraInfoManager, + CameraInfoMissingError, + InterpolatingZoomCameraInfoManager, + loadCalibrationFile, + saveCalibrationFile, + ZoomCameraInfoManager, +) + + +@pytest.fixture(scope='module', autouse=True) +def rclpy_context(): + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(): + n = Node('test_camera_info_manager') + yield n + n.destroy_node() + + +def test_camera_info_manager_constructs(node): + cim = CameraInfoManager(node, cname='cam0', url='', namespace='') + assert cim.getCameraName() == 'cam0' + assert cim.getURL() == '' + + +def test_camera_info_manager_str_does_not_reference_undefined_attrs(node): + cim = CameraInfoManager(node, cname='cam0', url='file:///tmp/x.yaml') + assert str(cim) == '[cam0]file:///tmp/x.yaml' + + +def test_get_camera_info_raises_before_load(node): + cim = CameraInfoManager(node, cname='cam0') + with pytest.raises(CameraInfoMissingError): + cim.getCameraInfo() + + +def test_set_camera_name_validation(node): + cim = CameraInfoManager(node, cname='cam0') + assert cim.setCameraName('new_name') is True + assert cim.getCameraName() == 'new_name' + assert cim.setCameraName('') is False + assert cim.setCameraName('bad name') is False + + +def test_zoom_manager_constructs_with_node(node): + z = ZoomCameraInfoManager(node, min_zoom=0, max_zoom=10, cname='zoom0') + assert z.getCameraName() == 'zoom0' + assert z._min_zoom == 0 + assert z._max_zoom == 10 + assert z._zoom == 0 + + +def test_zoom_manager_set_zoom_in_range(node): + z = ApproximateZoomCameraInfoManager( + node, min_fov=30.0, max_fov=70.0, + initial_image_width=640, initial_image_height=480, + min_zoom=0, max_zoom=100, cname='zoom0', + ) + z.set_zoom(50) + assert z._zoom == 50 + + +def test_zoom_manager_set_zoom_out_of_range_raises(node): + z = ApproximateZoomCameraInfoManager( + node, min_fov=30.0, max_fov=70.0, + initial_image_width=640, initial_image_height=480, + min_zoom=0, max_zoom=100, cname='zoom0', + ) + with pytest.raises(CameraInfoError): + z.set_zoom(101) + with pytest.raises(CameraInfoError): + z.set_zoom(-1) + + +def test_approximate_zoom_updates_k_without_calibration(node): + z = ApproximateZoomCameraInfoManager( + node, min_fov=30.0, max_fov=70.0, + initial_image_width=640, initial_image_height=480, + min_zoom=0, max_zoom=100, cname='zoom0', + ) + z.loadCameraInfo() + z.set_zoom(50) + info = z.getCameraInfo() + assert info.width == 640 + assert info.height == 480 + assert info.k[0] > 0.0 + assert info.k[4] > 0.0 + assert info.k[2] == 320.0 + assert info.k[5] == 240.0 + + +def test_approximate_zoom_set_resolution(node): + z = ApproximateZoomCameraInfoManager( + node, min_fov=30.0, max_fov=70.0, + initial_image_width=640, initial_image_height=480, + min_zoom=0, max_zoom=100, cname='zoom0', + ) + z.loadCameraInfo() + z.set_resolution(1280, 720) + info = z.getCameraInfo() + assert info.width == 1280 + assert info.height == 720 + + +def test_interpolating_zoom_constructs(node): + iz = InterpolatingZoomCameraInfoManager( + node, calibration_url_template='file:///tmp/cal_%d.yaml', + zoom_levels=[0, 50, 100], cname='zoom0', + ) + assert iz._min_zoom == 0 + assert iz._max_zoom == 100 + assert iz._calibration_url_template == 'file:///tmp/cal_%d.yaml' + + +def test_zoom_manager_context_exit_does_not_raise(node): + z = ZoomCameraInfoManager(node, min_zoom=0, max_zoom=10, cname='zoom0') + with z: + pass + + +def _make_camera_info(width=640, height=480, fx=500.0, fy=500.0, cx=320.0, cy=240.0): + ci = CameraInfo() + ci.width = width + ci.height = height + ci.distortion_model = 'plumb_bob' + ci.d = [0.0, 0.0, 0.0, 0.0, 0.0] + ci.k = [fx, 0.0, cx, 0.0, fy, cy, 0.0, 0.0, 1.0] + ci.r = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0] + ci.p = [fx, 0.0, cx, 0.0, 0.0, fy, cy, 0.0, 0.0, 0.0, 1.0, 0.0] + return ci + + +def test_yaml_round_trip(tmp_path): + cname = 'cam0' + src = _make_camera_info(width=1024, height=768, fx=900.0, fy=901.0) + yaml_file = tmp_path / 'cam0.yaml' + assert saveCalibrationFile(src, str(yaml_file), cname) is True + + loaded = loadCalibrationFile(str(yaml_file), cname) + assert loaded.width == src.width + assert loaded.height == src.height + assert loaded.distortion_model == src.distortion_model + assert list(loaded.d) == list(src.d) + assert list(loaded.k) == list(src.k) + assert list(loaded.r) == list(src.r) + assert list(loaded.p) == list(src.p) + + +def test_camera_info_manager_loads_from_file(node, tmp_path): + cname = 'cam0' + src = _make_camera_info(fx=777.0) + yaml_file = tmp_path / 'cam0.yaml' + saveCalibrationFile(src, str(yaml_file), cname) + + cim = CameraInfoManager(node, cname=cname, url='file://' + str(yaml_file)) + cim.loadCameraInfo() + assert cim.isCalibrated() + info = cim.getCameraInfo() + assert info.k[0] == 777.0 + + +def test_set_camera_info_service_round_trip(node, tmp_path): + cname = 'set_svc_cam' + yaml_file = tmp_path / (cname + '.yaml') + cim = CameraInfoManager(node, cname=cname, url='file://' + str(yaml_file)) + + client = node.create_client(SetCameraInfo, 'set_camera_info') + assert client.wait_for_service(timeout_sec=5.0) + + req = SetCameraInfo.Request() + req.camera_info = _make_camera_info(fx=1234.0, fy=1234.0) + future = client.call_async(req) + rclpy.spin_until_future_complete(node, future, timeout_sec=5.0) + assert future.done() + rsp = future.result() + assert rsp.success + assert yaml_file.exists() + + # Service callback updates the in-memory CameraInfo too + info = cim.getCameraInfo() + assert info.k[0] == 1234.0 + + +def test_interpolating_zoom_interpolates_k(node, tmp_path): + cname = 'izoom' + # Calibrate at zoom 0 (fx=400) and zoom 100 (fx=800) + low = _make_camera_info(fx=400.0, fy=400.0) + high = _make_camera_info(fx=800.0, fy=800.0) + saveCalibrationFile(low, str(tmp_path / 'cal_0.yaml'), cname) + saveCalibrationFile(high, str(tmp_path / 'cal_100.yaml'), cname) + + template = 'file://' + str(tmp_path) + '/cal_%d.yaml' + iz = InterpolatingZoomCameraInfoManager( + node, calibration_url_template=template, zoom_levels=[0, 100], cname=cname, + ) + iz.loadCameraInfo() + + # Exact-zoom hit + iz.set_zoom(0) + assert iz.getCameraInfo().k[0] == pytest.approx(400.0) + iz.set_zoom(100) + assert iz.getCameraInfo().k[0] == pytest.approx(800.0) + + # Midpoint interpolation: ratio = (50 - 0) / (100 - 0) = 0.5 + # camera_info.k = ratio * low + (1 - ratio) * high = 0.5*400 + 0.5*800 = 600 + iz.set_zoom(50) + assert iz.getCameraInfo().k[0] == pytest.approx(600.0) + assert iz.getCameraInfo().k[4] == pytest.approx(600.0)