From 501572ce91f98fed0ef259815cc4a72ee0f5a7f7 Mon Sep 17 00:00:00 2001 From: "Kevin M. Dean" Date: Mon, 9 Feb 2026 08:45:15 -0600 Subject: [PATCH] Add KINESIS stage + new galvo waveforms and tests Add support for Thorlabs Kinesis (serial) stages: new pylablib-backed API (pykinesis_controller), a KINESISStage implementation in the Thorlabs stage module, and configuration DB entry. Add quadratic and centered_cubic galvo waveforms and wire them into GalvoBase. Make MS2000 serial buffer sizing platform-safe (only attempt set_buffer_size on Windows). Update docs to list new waveform options and KINESIS stage config, and add/adjust unit tests for MS2000, waveforms, galvo base, and device startup flow (including a test ensuring the KINESIS start path uses the factory). --- .../01_supported_hardware/galvo.rst | 3 + .../01_supported_hardware/stage.rst | 58 ++++++- src/navigate/config/configuration_database.py | 1 + .../devices/APIs/asi/asi_MS2000_controller.py | 9 +- .../APIs/thorlabs/pykinesis_controller.py | 134 ++++++++++++++++ src/navigate/model/devices/galvo/base.py | 20 ++- src/navigate/model/devices/stage/thorlabs.py | 151 ++++++++++++++++-- src/navigate/model/waveforms.py | 48 ++++++ test/model/devices/APIs/asi/__init__.py | 2 + .../APIs/asi/test_asi_ms2000_controller.py | 34 ++++ test/model/devices/galvo/test_galvo_base.py | 2 +- test/model/test_device_startup_functions.py | 48 +++++- test/model/test_waveforms.py | 21 +++ 13 files changed, 512 insertions(+), 19 deletions(-) create mode 100644 src/navigate/model/devices/APIs/thorlabs/pykinesis_controller.py create mode 100644 test/model/devices/APIs/asi/__init__.py create mode 100644 test/model/devices/APIs/asi/test_asi_ms2000_controller.py diff --git a/docs/source/02_user_guide/01_supported_hardware/galvo.rst b/docs/source/02_user_guide/01_supported_hardware/galvo.rst index 674a358b1..8c4e4c0b7 100644 --- a/docs/source/02_user_guide/01_supported_hardware/galvo.rst +++ b/docs/source/02_user_guide/01_supported_hardware/galvo.rst @@ -12,6 +12,9 @@ National Instruments Multiple types of galvanometers have been used, including Cambridge Technologies/Novanta, Thorlabs, and ScannerMAX. Each of these devices are externally controlled via analog signals delivered from an NI-based data acquisition card. +For NI and synthetic galvos, the software supports the following waveform options: +``sawtooth``, ``sine``, ``halfsaw``, ``quadratic``, and ``centered_cubic``. + .. collapse:: Configuration File .. code-block:: yaml diff --git a/docs/source/02_user_guide/01_supported_hardware/stage.rst b/docs/source/02_user_guide/01_supported_hardware/stage.rst index 6732a2a67..415642d09 100644 --- a/docs/source/02_user_guide/01_supported_hardware/stage.rst +++ b/docs/source/02_user_guide/01_supported_hardware/stage.rst @@ -148,6 +148,10 @@ MFC2000 MS2000 ~~~~~~~ +.. note:: + + On Windows, **navigate** configures serial buffer sizes for MS2000 + communication. On Linux/macOS this tuning step is skipped for compatibility. .. collapse:: Configuration File @@ -419,7 +423,7 @@ positioning. stage: hardware: - - type: Thorlabs + type: KIM001 serial_number: 74000375 axes: [f] axes_mapping: [1] @@ -508,6 +512,58 @@ KST101 | +----------------- + +KINESIS (Serial) +~~~~~~~~~~~~~~~~ + +This mode supports Thorlabs Kinesis stepper communication through +``pylablib`` using a serial device path (for example ``/dev/ttyUSB1`` on Linux). + +.. note:: + + ``steps_per_um`` is the preferred scale parameter for KINESIS stages. + If omitted, **navigate** falls back to ``device_units_per_mm``. + +.. collapse:: Configuration File + + .. code-block:: yaml + + microscopes: + microscope_name: + stage: + hardware: + - + type: KINESIS + serial_number: /dev/ttyUSB1 + axes: [f] + axes_mapping: [1] + steps_per_um: 2008.623 + min: 0 + max: 25 + joystick_axes: [f] + x_min: -10000.0 + x_max: 10000.0 + y_min: -10000.0 + y_max: 10000.0 + z_min: -10000.0 + z_max: 10000.0 + theta_min: 0.0 + theta_max: 360.0 + f_min: -10000.0 + f_max: 10000.0 + x_offset: 0.0 + y_offset: 0.0 + z_offset: 0.0 + theta_offset: 0.0 + f_offset: 0.0 + flip_x: False + flip_y: False + flip_z: False + flip_f: False + +| + -------------- .. _galvo_stage: diff --git a/src/navigate/config/configuration_database.py b/src/navigate/config/configuration_database.py index b0990dcc6..14d9169c0 100644 --- a/src/navigate/config/configuration_database.py +++ b/src/navigate/config/configuration_database.py @@ -182,6 +182,7 @@ "Sutter Instruments": ("MP285", "sutter"), "ThorLabs KCube Inertial Device KIM001": ("KIM001", "thorlabs"), "ThorLabs KCube Inertial Device KST101": ("KST101", "thorlabs"), + "ThorLabs Kinesis Stepper Motor (Serial)": ("KINESIS", "thorlabs"), "Virtual Device": ("Synthetic", "synthetic"), } diff --git a/src/navigate/model/devices/APIs/asi/asi_MS2000_controller.py b/src/navigate/model/devices/APIs/asi/asi_MS2000_controller.py index 2268694e2..e75b7d30c 100644 --- a/src/navigate/model/devices/APIs/asi/asi_MS2000_controller.py +++ b/src/navigate/model/devices/APIs/asi/asi_MS2000_controller.py @@ -31,6 +31,7 @@ # Standard Imports +import platform import time import logging @@ -104,8 +105,12 @@ def connect_to_serial( self.serial.write_timeout = write_timeout self.serial.timeout = read_timeout - # set the size of the rx and tx buffers before calling open - self.serial.set_buffer_size(rx_size, tx_size) + # set_buffer_size is only available/reliable on some platforms + if platform.system() == "Windows" and hasattr(self.serial, "set_buffer_size"): + try: + self.serial.set_buffer_size(rx_size, tx_size) + except Exception as e: + logger.debug(f"Unable to set serial buffer size on Windows: {e}") try: self.serial.open() except SerialException: diff --git a/src/navigate/model/devices/APIs/thorlabs/pykinesis_controller.py b/src/navigate/model/devices/APIs/thorlabs/pykinesis_controller.py new file mode 100644 index 000000000..3c44bd770 --- /dev/null +++ b/src/navigate/model/devices/APIs/thorlabs/pykinesis_controller.py @@ -0,0 +1,134 @@ +# Copyright (c) 2021-2025 The University of Texas Southwestern Medical Center. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted for academic and research use only (subject to the +# limitations in the disclaimer below) provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holders nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY +# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND +# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +"""Thorlabs Kinesis controller wrapper backed by pylablib.""" + +import logging +from time import sleep +from typing import Any + +# Logger Setup +p = __name__.split(".")[1] +logger = logging.getLogger(p) + +SLEEP_AFTER_WAIT = 0.100 + + +class KinesisStage: + """Simple wrapper around pylablib's Kinesis motor API.""" + + def __init__(self, dev_path: str, verbose: bool = False): + self.verbose = verbose + self.dev_path = str(dev_path) + self.stage = None + self.move_params: dict[str, float] = { + "min_velocity": 0.0, + "max_velocity": 0.0, + "acceleration": 0.0, + } + self.open() + + @staticmethod + def _load_thorlabs_backend() -> Any: + """Load pylablib Thorlabs backend lazily.""" + try: + from pylablib.devices import Thorlabs + except ImportError as e: + raise ImportError( + "pylablib is required for KINESIS stage support." + ) from e + return Thorlabs + + def open(self) -> None: + """Open the device for communications.""" + thorlabs = self._load_thorlabs_backend() + connection = {"port": self.dev_path, "baudrate": 115200, "rtscts": True} + try: + self.stage = thorlabs.KinesisMotor(("serial", connection), scale="step") + except Exception as e: + raise ConnectionError(f"KINESIS stage connection failed: {e}") from e + + def close(self) -> None: + """Disconnect and close the device.""" + if self.stage is None: + return + try: + self.stage.stop() + except Exception: + pass + self.stage.close() + + def move_to_position( + self, position_um: float, steps_per_um: float, wait_till_done: bool + ) -> None: + """Move to absolute position in microns.""" + current_steps = self.stage.get_position(channel=1, scale=False) + target_steps = int(round(float(position_um) * float(steps_per_um))) + delta_steps = target_steps - int(current_steps) + self.stage.move_by(delta_steps, channel=1, scale=False) + if wait_till_done: + self.stage.wait_move(channel=1) + if SLEEP_AFTER_WAIT: + sleep(SLEEP_AFTER_WAIT) + + def get_current_position(self, steps_per_um: float) -> float: + """Get current position in microns.""" + current_steps = self.stage.get_position(channel=1, scale=False) + position_um = float(current_steps) / float(steps_per_um) + return round(position_um, 2) + + def stop(self) -> None: + """Halt motion.""" + self.stage.stop() + + def home_stage(self) -> None: + """Run homing sequence.""" + self.stage.home() + + def set_velocity_params( + self, + min_velocity: float, + max_velocity: float, + acceleration: float, + steps_per_um: float, + ) -> None: + """Set motion profile parameters.""" + min_velocity_steps = min_velocity * steps_per_um + max_velocity_steps = max_velocity * steps_per_um + acceleration_steps = acceleration * steps_per_um + self.stage.set_move_params( + min_velocity_steps, max_velocity_steps, acceleration_steps + ) + self.move_params = { + "min_velocity": min_velocity_steps, + "max_velocity": max_velocity_steps, + "acceleration": acceleration_steps, + } diff --git a/src/navigate/model/devices/galvo/base.py b/src/navigate/model/devices/galvo/base.py index 8be0c11ac..2e165eca5 100644 --- a/src/navigate/model/devices/galvo/base.py +++ b/src/navigate/model/devices/galvo/base.py @@ -38,7 +38,7 @@ # Third Party Imports # Local Imports -from navigate.model.waveforms import sawtooth, sine_wave +from navigate.model.waveforms import centered_cubic, quadratic, sawtooth, sine_wave from navigate.tools.decorators import log_initialization # # Logger Setup @@ -220,6 +220,24 @@ def adjust( duty_cycle=galvo_rising_ramp, phase=self.camera_delay, ) + elif self.galvo_waveform == "centered_cubic": + self.waveform_dict[channel_key] = centered_cubic( + sample_rate=self.sample_rate, + sweep_time=self.sweep_time, + exposure=exposure_time, + delay=self.camera_delay, + amplitude=galvo_amplitude, + offset=galvo_offset, + ) + elif self.galvo_waveform == "quadratic": + self.waveform_dict[channel_key] = quadratic( + sample_rate=self.sample_rate, + sweep_time=self.sweep_time, + exposure=exposure_time, + delay=self.camera_delay, + amplitude=galvo_amplitude, + offset=galvo_offset, + ) elif self.galvo_waveform == "sine": self.waveform_dict[channel_key] = sine_wave( sample_rate=self.sample_rate, diff --git a/src/navigate/model/devices/stage/thorlabs.py b/src/navigate/model/devices/stage/thorlabs.py index a00cc8a81..f3f4658e8 100644 --- a/src/navigate/model/devices/stage/thorlabs.py +++ b/src/navigate/model/devices/stage/thorlabs.py @@ -34,7 +34,7 @@ import logging import time from multiprocessing.managers import ListProxy -from typing import Any, Optional +from typing import Any # Local Imports from navigate.model.devices.stage.base import StageBase @@ -46,6 +46,32 @@ logger = logging.getLogger(p) +def _get_stage_device_entry( + configuration: dict[str, Any], microscope_name: str, device_id: int +) -> dict[str, Any]: + """Return stage device config entry regardless of list/single configuration.""" + stage_hardware = configuration["configuration"]["microscopes"][microscope_name][ + "stage" + ]["hardware"] + if type(stage_hardware) == ListProxy: + return stage_hardware[device_id] + return stage_hardware + + +def _get_device_unit_scale(device_entry: dict[str, Any]) -> float: + """Return device-units-per-mm with safe fallbacks for legacy configs.""" + device_units_per_mm = device_entry.get("device_units_per_mm") + if device_units_per_mm is not None: + return float(device_units_per_mm) + + # Legacy KINESIS configs may define steps_per_um instead. + steps_per_um = device_entry.get("steps_per_um") + if steps_per_um is not None: + return float(steps_per_um) * 1000.0 + + return 1000.0 + + @log_initialization class KIM001Stage(StageBase, IntegratedDevice): """Thorlabs KIM Stage""" @@ -300,18 +326,13 @@ def __init__( #: list: List of KST axes available. self.KST_axes = list(self.axes_mapping.values()) - device_config = configuration["configuration"]["microscopes"][microscope_name][ - "stage" - ]["hardware"] - if type(device_config) == ListProxy: - #: str: Serial number of the stage. - self.serial_number = str(device_config[device_id]["serial_number"]) - - #: float: Device units per mm. - self.device_unit_scale = device_config[device_id]["device_units_per_mm"] - else: - self.serial_number = device_config["serial_number"] - self.device_unit_scale = device_config["device_units_per_mm"] + device_entry = _get_stage_device_entry( + configuration, microscope_name, device_id + ) + #: str: Serial number of the stage. + self.serial_number = str(device_entry.get("serial_number", "")) + #: float: Device units per mm. + self.device_unit_scale = _get_device_unit_scale(device_entry) if device_connection is not None: #: object: Thorlabs KST Stage controller @@ -515,3 +536,107 @@ def stop(self) -> None: Stop all stage channels move """ self.kst_controller.KST_MoveStop(self.serial_number) + + +@log_initialization +class KINESISStage(StageBase): + """Thorlabs Kinesis stage via pylablib (serial backend).""" + + def __init__( + self, + microscope_name: str, + device_connection: Any, + configuration: dict[str, Any], + device_id: int = 0, + ) -> None: + super().__init__(microscope_name, device_connection, configuration, device_id) + + axes_mapping = {"x": 1, "y": 1, "z": 1, "f": 1} + if not self.axes_mapping: + if self.axes[0] not in axes_mapping: + raise KeyError(f"KINESIS stage does not support axis: {self.axes[0]}") + self.axes_mapping = {self.axes[0]: axes_mapping[self.axes[0]]} + + device_entry = _get_stage_device_entry( + configuration, microscope_name, device_id + ) + self.serial_number = str(device_entry.get("serial_number", "")) + + steps_per_um = device_entry.get("steps_per_um") + if steps_per_um is None: + # Fallback to existing stage field so configs do not need to change. + device_units_per_mm = _get_device_unit_scale(device_entry) + steps_per_um = float(device_units_per_mm) / 1000.0 + self.steps_per_um = float(steps_per_um) + + self.kinesis_controller = device_connection + + def __del__(self) -> None: + try: + self.stop() + self.kinesis_controller.close() + except Exception: + pass + + @classmethod + def get_connect_params(cls) -> list[str]: + return ["serial_number"] + + @classmethod + def connect(cls, serial_number: str) -> Any: + kinesis_module = importlib.import_module( + "navigate.model.devices.APIs.thorlabs.pykinesis_controller" + ) + return kinesis_module.KinesisStage(str(serial_number), False) + + def report_position(self) -> dict[str, float]: + try: + pos = self.kinesis_controller.get_current_position(self.steps_per_um) + setattr(self, f"{self.axes[0]}_pos", pos) + except Exception: + pass + return self.get_position_dict() + + def move_axis_absolute( + self, axis: str, abs_pos: float, wait_until_done: bool = False + ) -> bool: + if axis not in self.axes_mapping: + return False + + axis_abs = self.get_abs_position(axis, abs_pos) + if axis_abs == -1e50: + return False + + self.kinesis_controller.move_to_position( + axis_abs, self.steps_per_um, wait_until_done + ) + return True + + def move_absolute( + self, move_dictionary: dict[str, float], wait_until_done: bool = False + ) -> bool: + result = True + for axis in self.axes_mapping.keys(): + if f"{axis}_abs" not in move_dictionary: + continue + result = ( + self.move_axis_absolute( + axis, move_dictionary[f"{axis}_abs"], wait_until_done + ) + and result + ) + return result + + def move_to_position(self, position: float, wait_until_done: bool = False) -> bool: + return self.move_axis_absolute(self.axes[0], position, wait_until_done) + + def run_homing(self) -> None: + self.kinesis_controller.home_stage() + axis = self.axes[0] + midpoint = ( + getattr(self, f"{axis}_min", 0) + getattr(self, f"{axis}_max", 25) + ) / 2 + self.move_axis_absolute(axis, midpoint, wait_until_done=True) + + def stop(self) -> None: + self.kinesis_controller.stop() diff --git a/src/navigate/model/waveforms.py b/src/navigate/model/waveforms.py index 293707759..f76509f41 100644 --- a/src/navigate/model/waveforms.py +++ b/src/navigate/model/waveforms.py @@ -462,6 +462,54 @@ def sine_wave( return waveform +def quadratic( + sample_rate=100000, + sweep_time=0.4, + exposure=0.100, + amplitude=0.1, + offset=0, + delay=0, +): + """Quadratic galvo waveform used for curved ASLM.""" + samples = int(sample_rate * sweep_time) + exposure_samples = int(sample_rate * exposure) + waveform = np.full(samples, offset) + start_sample = int(delay * sample_rate) if delay else 0 + stop_sample = min(start_sample + exposure_samples, samples) + if stop_sample <= start_sample: + return waveform + + tn = np.linspace(-1, 1, stop_sample - start_sample) + quadratic_waveform = amplitude * tn**2 + offset + waveform[start_sample:stop_sample] = quadratic_waveform + return waveform + + +def centered_cubic( + sample_rate=100000, + sweep_time=0.4, + exposure=0.100, + amplitude=0.0, + offset=0, + delay=0, +): + """Centered cubic galvo waveform used for curved ASLM.""" + samples = int(sample_rate * sweep_time) + exposure_samples = int(sample_rate * exposure) + waveform = np.full(samples, offset) + start_sample = int(delay * sample_rate) if delay else 0 + stop_sample = min(start_sample + exposure_samples, samples) + if stop_sample <= start_sample: + return waveform + + tn = np.linspace(-1, 1, stop_sample - start_sample) + b = (offset - amplitude) / 2 + a = offset - b + cubic_waveform = a * tn**2 + b * tn**3 + waveform[start_sample:stop_sample] = cubic_waveform + return waveform + + def smooth_waveform(waveform, percent_smoothing=10): """Smooths a numpy array via convolution diff --git a/test/model/devices/APIs/asi/__init__.py b/test/model/devices/APIs/asi/__init__.py new file mode 100644 index 000000000..c6f9a5bae --- /dev/null +++ b/test/model/devices/APIs/asi/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2021-2025 The University of Texas Southwestern Medical Center. +# All rights reserved. diff --git a/test/model/devices/APIs/asi/test_asi_ms2000_controller.py b/test/model/devices/APIs/asi/test_asi_ms2000_controller.py new file mode 100644 index 000000000..9f9849724 --- /dev/null +++ b/test/model/devices/APIs/asi/test_asi_ms2000_controller.py @@ -0,0 +1,34 @@ +# Copyright (c) 2021-2025 The University of Texas Southwestern Medical Center. +# All rights reserved. + +from unittest.mock import MagicMock, patch + +from navigate.model.devices.APIs.asi.asi_MS2000_controller import MS2000Controller + + +def _build_controller(): + controller = MS2000Controller("COM1", 115200) + controller.serial = MagicMock() + controller.serial.is_open = True + controller.report_to_console = MagicMock() + return controller + + +def test_connect_to_serial_sets_buffer_size_on_windows(): + controller = _build_controller() + with patch( + "navigate.model.devices.APIs.asi.asi_MS2000_controller.platform.system", + return_value="Windows", + ): + controller.connect_to_serial() + controller.serial.set_buffer_size.assert_called_once_with(12800, 12800) + + +def test_connect_to_serial_skips_buffer_size_on_linux(): + controller = _build_controller() + with patch( + "navigate.model.devices.APIs.asi.asi_MS2000_controller.platform.system", + return_value="Linux", + ): + controller.connect_to_serial() + controller.serial.set_buffer_size.assert_not_called() diff --git a/test/model/devices/galvo/test_galvo_base.py b/test/model/devices/galvo/test_galvo_base.py index 7ba8e226e..4017215ae 100644 --- a/test/model/devices/galvo/test_galvo_base.py +++ b/test/model/devices/galvo/test_galvo_base.py @@ -108,7 +108,7 @@ def test_galvo_base_initialization(self): def test_adjust_with_valid_input(self): # Test the method with valid input data - for waveform in ["sawtooth", "sine"]: + for waveform in ["sawtooth", "sine", "quadratic", "centered_cubic"]: self.galvo.galvo_waveform = waveform result = self.galvo.adjust(self.exposure_times, self.sweep_times) diff --git a/test/model/test_device_startup_functions.py b/test/model/test_device_startup_functions.py index 5f9acab27..fd425079d 100644 --- a/test/model/test_device_startup_functions.py +++ b/test/model/test_device_startup_functions.py @@ -31,7 +31,7 @@ # Standard library imports import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import multiprocessing # Third party imports @@ -208,3 +208,49 @@ def test_start_device_plugin(self): plugin_devices[device_category][device_class_name][ "start_device" ].assert_called_once() + + def test_start_device_kinesis_stage_uses_factory(self): + """Ensure KINESIS stage instantiates via start_device factory flow.""" + with multiprocessing.Manager() as manager: + stage_hardware = manager.list( + [ + { + "type": "KINESIS", + "serial_number": "/dev/ttyUSB1", + "axes": ["f"], + "axes_mapping": [1], + "steps_per_um": 2000.0, + } + ] + ) + configuration = { + "configuration": { + "microscopes": { + "TestMicroscope": { + "stage": { + "hardware": stage_hardware, + "f_min": 0, + "f_max": 25000, + } + } + } + } + } + + with patch( + "navigate.model.devices.stage.thorlabs.KINESISStage.connect", + return_value=MagicMock(), + ) as connect_mock: + stage = start_device( + "TestMicroscope", + configuration, + "stage", + 0, + False, + None, + {}, + ) + + connect_mock.assert_called_once_with("/dev/ttyUSB1") + assert stage.__class__.__name__ == "KINESISStage" + assert stage.steps_per_um == 2000.0 diff --git a/test/model/test_waveforms.py b/test/model/test_waveforms.py index ca5811f40..ae46fd5d3 100644 --- a/test/model/test_waveforms.py +++ b/test/model/test_waveforms.py @@ -232,3 +232,24 @@ def test_camera_exposure_short(self): sample_rate=sr, sweep_time=st, exposure=ex, camera_delay=cd ) assert np.sum(v > 0) == int(sr * (ex - cd)) + + def test_quadratic_waveform_edges(self): + sr, st, ex = 10000, 0.1, 0.04 + amp, off = 2.0, 0.5 + v = waveforms.quadratic( + sample_rate=sr, sweep_time=st, exposure=ex, amplitude=amp, offset=off + ) + assert len(v) == int(sr * st) + assert np.isclose(np.max(v), amp + off) + assert np.isclose(np.min(v), off) + + def test_centered_cubic_start_end(self): + sr, st, ex = 10000, 0.1, 0.04 + amp, off = 1.2, -0.3 + v = waveforms.centered_cubic( + sample_rate=sr, sweep_time=st, exposure=ex, amplitude=amp, offset=off + ) + exposure_samples = int(sr * ex) + active = v[:exposure_samples] + assert np.isclose(active[0], amp) + assert np.isclose(active[-1], off)