diff --git a/examples/example_device_config.yaml b/examples/example_device_config.yaml index ad95cbe7..449cb785 100644 --- a/examples/example_device_config.yaml +++ b/examples/example_device_config.yaml @@ -17,8 +17,9 @@ TxConfiguration: channel_max_amplitude: [200, 6000, 6000, 6000] # Filter configuration of each transmit channel channel_filter_type: [0, 2, 2, 2] - # Configure which channels are terminated into 50 ohm impedance (true == 50 ohm), list length must at least match the number of enabled channels - channel_terminated_50ohm: [True, False, False, False] + # Configure if RF/gradients are terminated into 50 ohm impedance (true == 50 ohm) + rf_terminated_50ohm: True + gradients_terminated_50ohm: False # Calculate grad_to_volt per gradient channel: # Gradient efficiency in T/m/A gradient_efficiency: [0.37e-3, 0.451e-3, 0.4e-3] @@ -33,7 +34,7 @@ RxConfiguration: # Could be extended to a list device_path: "/dev/spcm0" # Number of channels - max_available_channels: 4 + max_available_channels: 8 # Device sampling rate in MHz sampling_rate: 20 # Enable the receive channels diff --git a/src/console/interfaces/device_configuration.py b/src/console/interfaces/device_configuration.py index f4de561c..1aa82b52 100644 --- a/src/console/interfaces/device_configuration.py +++ b/src/console/interfaces/device_configuration.py @@ -1,9 +1,15 @@ """Implementation of the device configuration models.""" -from pydantic import BaseModel, Field, field_validator, model_validator +from typing import Annotated -# Specify the number of gradient channels -NUM_GRADIENTS = 3 +from pydantic import BaseModel, Field, model_validator +# Ensure that max. amplitude is within 1 and 6000 mV. +# Limits may depend on specific configuration of spectrum cards. +AmplitudeInt = Annotated[int, Field(ge=1, le=6000)] +# Ensure valid filter type, channel filter type must be 0, 1, 2 or 3. +# See spectrum instrumentation manual for reference. +# This may depend on the specific card configuration. +FilterTypeInt = Annotated[int, Field(ge=0, le=3)] class TxConfiguration(BaseModel): """Transmit device configuration.""" @@ -12,50 +18,14 @@ class TxConfiguration(BaseModel): device_path: str = Field(..., strict=True) sampling_rate: int = Field(..., strict=True) - max_available_channels: int = Field(..., strict=True) - channel_max_amplitude: list[int] = Field(..., strict=True) - channel_filter_type: list[int] = Field(..., strict=True) - channel_terminated_50ohm: list[bool] = Field(..., strict=True) - gradient_efficiency: list[float] = Field(..., min_length=NUM_GRADIENTS, max_length=NUM_GRADIENTS, strict=True) - gpa_gain: list[float] = Field(..., min_length=NUM_GRADIENTS, max_length=NUM_GRADIENTS, strict=True) + channel_max_amplitude: tuple[AmplitudeInt, AmplitudeInt, AmplitudeInt, AmplitudeInt] = Field(...) + channel_filter_type: tuple[FilterTypeInt, FilterTypeInt, FilterTypeInt, FilterTypeInt] = Field(...) + rf_terminated_50ohm: bool = Field(..., strict=True) + gradients_terminated_50ohm: bool = Field(..., strict=True) + gradient_efficiency: tuple[float, float, float] = Field(...) + gpa_gain: tuple[float, float, float] = Field(...) rf_to_mvolt: float = Field(..., strict=True) - @model_validator(mode="after") - def validate_lists(self) -> "TxConfiguration": - """Validate the length of device-specific lists.""" - list_fields: dict[str, list[int] | list[bool]] = { - "channel_max_amplitude": self.channel_max_amplitude, - "channel_filter_type": self.channel_filter_type, - "channel_terminated_50ohm": self.channel_terminated_50ohm, - } - errors = [] - for name, val in list_fields.items(): - if len(val) < self.max_available_channels: - _err = f"Field '{name}': \ - length {len(val)} is less than max_available_channels {self.max_available_channels}" - errors.append(_err) - if errors: - raise ValueError("; ".join(errors)) - return self - - @field_validator("channel_max_amplitude") - @classmethod - def validate_max_amplitude(cls, values: list[int]) -> list[int]: - """Ensure that max. amplitude is within 1 and 6000 mV.""" - if not all(val in range(1, 6001) for val in values): - msg = "Channel max amplitude must be between 1 and 6000 mV." - raise ValueError(msg) - return values - - @field_validator("channel_filter_type") - @classmethod - def validate_filter_type(cls, values: list[int]) -> list[int]: - """Ensure that max. amplitude is within 1 and 6000 mV.""" - if not all(val in range(4) for val in values): - msg = "Channel filter type must be 0, 1, 2 or 3." - raise ValueError(msg) - return values - class RxConfiguration(BaseModel): """Receive device configuration.""" @@ -65,26 +35,34 @@ class RxConfiguration(BaseModel): device_path: str = Field(..., strict=True) sampling_rate: int = Field(..., strict=True) max_available_channels: int = Field(..., strict=True) - channel_enable: list[bool] = Field(..., strict=True) - channel_max_amplitude: list[int] = Field(..., strict=True) - channel_terminated_50ohm: list[bool] = Field(..., strict=True) + channel_enable: tuple[bool, ...] = Field(...) + channel_max_amplitude: tuple[AmplitudeInt, ...] = Field(...) + channel_terminated_50ohm: tuple[bool, ...] = Field(...) @model_validator(mode="after") - def validate_lists(self) -> "RxConfiguration": - """Validate the length of device-specific lists.""" - list_fields: dict[str, list[int] | list[bool]] = { - "channel_max_amplitude": self.channel_max_amplitude, - "channel_enable": self.channel_enable, - "channel_terminated_50ohm": self.channel_terminated_50ohm, - } + def validate_channel_lengths(self) -> "RxConfiguration": + """Validate channel lengths. + + This ensures that `max_available_channels` is length of `channel_enable`, + `channel_max_amplitude` and `channel_terminated_50ohm`. + """ + target_len = self.max_available_channels + dependent_fields = [ + "channel_enable", + "channel_max_amplitude", + "channel_terminated_50ohm", + ] errors = [] - for name, val in list_fields.items(): - if len(val) < self.max_available_channels: - _err = f"Field '{name}'\ - length {len(val)} is less than max_available_channels {self.max_available_channels}" - errors.append(_err) + for field_name in dependent_fields: + current_val = getattr(self, field_name) + if len(current_val) != target_len: + errors.append( + f"{field_name} length ({len(current_val)}) " + f"must be exactly {target_len}" + ) if errors: - raise ValueError("; ".join(errors)) + # Raising ValueError here is the standard Pydantic way + raise ValueError(" ; ".join(errors)) return self diff --git a/src/console/interfaces/unrolled_sequence.py b/src/console/interfaces/unrolled_sequence.py index c13bab21..15e180f6 100644 --- a/src/console/interfaces/unrolled_sequence.py +++ b/src/console/interfaces/unrolled_sequence.py @@ -28,22 +28,30 @@ class UnrolledSequence: rx_data: list[RxData] """List containing the data and metadata of all receive events""" - gpa_gain: list[float] + gpa_gain: tuple[float, float, float] """The gradient waveforms in pulseq are defined in Hz/m. The translation to mV is calculated by 1e3 / (gyro * gpa_gain * grad_efficiency). The gpa gain is given in V/A and accounts for the voltage required to generate an output of 1A. The gyromagnetic ratio defined by 42.58e6 MHz/T.""" - gradient_efficiency: list[float] + gradient_efficiency: tuple[float, float, float] """The gradient waveforms in pulseq are defined in Hz/m. The translation to mV is calculated by 1e3 / (gyro * gpa_gain * grad_efficiency). The gradient efficiency is given in mT/m/A and accounts for the gradient field which is generated per 1A. The gyromagnetic ratio defined by 42.58e6 MHz/T.""" + gradient_output_limits: tuple[int, int, int] + """Integer limits for each gradient output channel in mV. Gradient output limits already contain scaling + from channel termination into high impedance or 50 ohms respectively.""" + rf_to_mvolt: float """If sequence values are given as float values, they can be interpreted as output voltage [mV] directly. This conversion factor represents the scaling from original pulseq RF values [Hz] to card output voltage.""" + rf_output_limit: int + """Integer limit of rf output channel in mV. RF output limits already contains scaling + from channel termination into high impedance or 50 ohms respectively.""" + dwell_time: float """Dwell time of the spectrum card replay data (unrolled sequence). Defines the distance in time between to sample points. diff --git a/src/console/pulseq_interpreter/sequence_provider.py b/src/console/pulseq_interpreter/sequence_provider.py index de14e90f..fe10ae00 100644 --- a/src/console/pulseq_interpreter/sequence_provider.py +++ b/src/console/pulseq_interpreter/sequence_provider.py @@ -5,8 +5,6 @@ from types import SimpleNamespace from typing import Any -import matplotlib as mpl -import matplotlib.pyplot as plt import numpy as np from pypulseq.opts import Opts from pypulseq.Sequence.sequence import Sequence @@ -56,30 +54,41 @@ class SequenceProvider(Sequence): def __init__( self, - gradient_efficiency: list[float], - gpa_gain: list[float], - high_impedance: list[bool], - output_limits: list[int], + gradient_efficiency: tuple[float, float, float], + gpa_gain: tuple[float, float, float], + gradient_output_limits: tuple[int, int, int], + gradients_50ohms: bool, + rf_output_limit: int, + rf_50ohms: bool, + rf_to_mvolt: float, + spcm_dwell_time: float, system_limits: SystemLimits, - spcm_dwell_time: float = 1 / 20e6, - rf_to_mvolt: float = 1., ): """Initialize sequence provider class which is used to unroll a pulseq sequence. Parameters ---------- - output_limits - Output limit per channel in mV, includes both, RF and gradients, e.g. [200, 6000, 6000, 6000] gradient_efficiency - Efficiency of the gradient coils in mT/m/A, e.g. [0.4e-3, 0.4e-3, 0.4e-3] + Efficiency of the gradient coils in mT/m/A, e.g. [0.4e-3, 0.4e-3, 0.4e-3]. gpa_gain - Gain factor of the GPA per gradient channel, e.g. [4.7, 4.7, 4.7] - system_limits - Maximum system limits - spcm_dwell_time, optional - Sampling time raster of the output waveform (depends on spectrum card), by default 1/20e6 + Gain factor of the GPA per gradient channel, e.g. [4.7, 4.7, 4.7]. + gradient_output_limits + Integer output limit per gradient channel in mV, e.g. [6000, 6000, 6000]. + gradients_50ohms + Boolean flag which indicates if the gradient output is terminated into 50 ohms or high impedance. + If terminated into high impedance, the card output doubles, + what needs to be considered when calculating the sequence. + rf_output_limit + Integer output limit of the RF channel in mV. + rf_50ohms + Boolean flag which indicates if the rf output is terminated into 50 ohms (see gradients_50ohms). rf_to_mvolt, optional - Translation of RF waveform from pulseq (Hz) to mV, by default 1 + Translation of RF waveform from pulseq (Hz) to mV. + spcm_dwell_time, optional + Sampling time raster of the output waveform (depends on spectrum card). + system_limits + Absolute maximum system limits defined in the device configuration. + Used to instantiate the pypulseq `Opts()` class. """ super().__init__( system=Opts( @@ -87,7 +96,7 @@ def __init__( B0=50e-3, grad_unit="Hz/m", # system limit is defined in this units slew_unit="Hz/m/s", # system limit is defined in this units - ) + ), ) self.log = logging.getLogger("SeqProv") @@ -96,17 +105,19 @@ def __init__( self.spcm_dwell_time = spcm_dwell_time self.spcm_freq = 1 / spcm_dwell_time self.system_limits = system_limits - self.high_impedance = high_impedance - self.gpa_gain: list[float] = gpa_gain - self.grad_eff: list[float] = gradient_efficiency - - # Set impedance scaling factor, 0.5 if impedance is high, 1 if impedance is 50 ohms - # Halve RF scaling factor if impedance is high, because the card output doubles for high impedance - self.imp_scaling = [0.5 if z else 1 for z in high_impedance] - self.output_limits: list[int] = output_limits if output_limits is not None else [] - - # Initialize larmor frequency and sequence cache variables, to be set later - self._sqnc_cache: np.ndarray | None = None + self.gpa_gain = gpa_gain + self.grad_eff = gradient_efficiency + + # Scale output limit dependent on high impedance flags: + # If output is terminated into high impedance (flag is true), the channel output is doubled. + # Otherwise, if output is terminated into 50 ohms impedance, output limit remains unchanged. + self.rf_out_limit = rf_output_limit if rf_50ohms else int(2 * rf_output_limit) + # Ensure tuple[int, int, int] for mypy typing + self.gradient_out_limits = ( + gradient_output_limits[0] if gradients_50ohms else int(2 * gradient_output_limits[0]), + gradient_output_limits[1] if gradients_50ohms else int(2 * gradient_output_limits[1]), + gradient_output_limits[2] if gradients_50ohms else int(2 * gradient_output_limits[2]), + ) # -------- PyPulseq interface -------- # @@ -186,7 +197,7 @@ def dict(self) -> dict: "spcm_dwell_time": self.spcm_dwell_time, "gpa_gain": self.gpa_gain, "gradient_efficiency": self.grad_eff, - "output_limits": self.output_limits, + "output_limits": self.gradient_out_limits, } def get_adc_events(self) -> list: @@ -227,6 +238,8 @@ def get_rf_events(self) -> list: def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: """Unroll the pypulseq sequence description. + TODO: Reduce complexity. + Parameters ---------- parameter @@ -276,7 +289,6 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: self.log.exception("Checks not passed") raise - self._sqnc_cache = None gradient_index: Dimensions = parameter.channel_assignment # Get list of all events and list of unique RF and ADC events, since they are frequently reused @@ -285,11 +297,13 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: rf_events = self.get_rf_events() # Calculate rf pulse and unblanking waveforms from RF event - # Should probably be moved inside of get_rf_events() + # TODO: Should probably be moved inside of get_rf_events() rf_pulses = {} for rf_event in rf_events: rf_pulses[rf_event[0]] = self._calculate_rf( - block=rf_event[1], b1_scaling=parameter.b1_scaling, larmor_frequency=parameter.larmor_frequency, + block=rf_event[1], + b1_scaling=parameter.b1_scaling, + larmor_frequency=parameter.larmor_frequency, ) seq_duration, _, _ = self.duration() @@ -329,7 +343,9 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # must be considered with respect to the target output channel! # Offsets mapping: x -> channel 1, y -> channel 2, z -> channel 3 offset=parameter.gradient_offset.to_list()[int(gradient_index.x-1)], - output_channel=int(gradient_index.x), + # Gradient indexing starts at 1 (RF is channel 0) + # -> correct indexing to match tuple index + output_channel=int(gradient_index.x-1), ) delay = block.gx.delay delay_samples = round(delay * self.spcm_freq) @@ -350,7 +366,9 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # must be considered with respect to the target output channel! # Offsets mapping: x -> channel 1, y -> channel 2, z -> channel 3 offset=parameter.gradient_offset.to_list()[int(gradient_index.y-1)], - output_channel=int(gradient_index.y), + # Gradient indexing starts at 1 (RF is channel 0) + # -> correct indexing to match tuple index + output_channel=int(gradient_index.x-1), ) delay = block.gy.delay delay_samples = round(delay * self.spcm_freq) @@ -371,7 +389,9 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # must be considered with respect to the target output channel! # Offsets mapping: x -> channel 1, y -> channel 2, z -> channel 3 offset=parameter.gradient_offset.to_list()[int(gradient_index.z-1)], - output_channel=int(gradient_index.z), + # Gradient indexing starts at 1 (RF is channel 0) + # -> correct indexing to match tuple index + output_channel=int(gradient_index.x-1), ) delay = block.gz.delay delay_samples = round(delay * self.spcm_freq) @@ -443,9 +463,6 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: len(block_durations), ) - # Save unrolled sequence in class - self._sqnc_cache = _seq - return UnrolledSequence( seq=_seq, sample_count=seq_samples, @@ -453,70 +470,14 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: gradient_efficiency=self.grad_eff, rf_to_mvolt=self.rf_to_mvolt, dwell_time=self.spcm_dwell_time, + gradient_output_limits=self.gradient_out_limits, + rf_output_limit=self.rf_out_limit, duration=self.duration()[0], adc_count=adc_count, parameter=parameter, rx_data=_rx_data, ) - def plot_unrolled(self, time_range: tuple[float, float] = (0, -1)) -> tuple[mpl.figure.Figure, np.ndarray]: - """Plot unrolled waveforms for replay. - - Parameters - ---------- - time_range, default = (0, -1) - Specify the time range of the plot in seconds. - If the second value is smaller then the first or -1, the whole sequence is plotted. - - Returns - ------- - Matplotlib figure and axis - """ - fig, axis = plt.subplots(5, 1, figsize=(16, 9)) - - if (sqnc := self._sqnc_cache) is None: - msg = "No unrolled sequence. Execute `unroll_sequence(...)` first" - raise RuntimeError(msg) - - seq_start = int(time_range[0] * self.spcm_freq) - seq_end = int(time_range[1] * self.spcm_freq) if time_range[1] > time_range[0] else -1 - samples = np.arange(len(sqnc) // 4, dtype=float)[seq_start:seq_end] * self.spcm_dwell_time * 1e3 - - rf_signal = sqnc[0::4][seq_start:seq_end] - gx_signal = sqnc[1::4][seq_start:seq_end] - gy_signal = sqnc[2::4][seq_start:seq_end] - gz_signal = sqnc[3::4][seq_start:seq_end] - - # Get digital signals - adc_gate = gx_signal.astype(np.uint16) >> 15 - unblanking = gz_signal.astype(np.uint16) >> 15 - - # Get gradient waveforms - rf_signal = rf_signal / np.abs(np.iinfo(np.int16).min) - gx_signal = np.array((np.uint16(gx_signal) << 1).astype(np.int16) / 2**15) - gy_signal = np.array((np.uint16(gy_signal) << 1).astype(np.int16) / 2**15) - gz_signal = np.array((np.uint16(gz_signal) << 1).astype(np.int16) / 2**15) - - axis[0].plot(samples, self.output_limits[0] * rf_signal / self.imp_scaling[0]) - axis[1].plot(samples, self.output_limits[1] * gx_signal / self.imp_scaling[1]) - axis[2].plot(samples, self.output_limits[2] * gy_signal / self.imp_scaling[2]) - axis[3].plot(samples, self.output_limits[3] * gz_signal / self.imp_scaling[3]) - axis[4].plot(samples, adc_gate, label="ADC gate") - axis[4].plot(samples, unblanking, label="RF unblanking") - - axis[0].set_ylabel("RF [mV]") - axis[1].set_ylabel("Gx [mV]") - axis[2].set_ylabel("Gy [mV]") - axis[3].set_ylabel("Gz [mV]") - axis[4].set_ylabel("Digital") - axis[4].legend(loc="upper right") - - _ = [ax.grid(axis="x") for ax in axis] - - axis[4].set_xlabel("Time [ms]") - - return fig, axis - # -------- Private waveform calculation functions -------- # @profile @@ -579,8 +540,8 @@ def _calculate_rf( # Perform this step here to save computation time, num. of envelope samples << num. of resampled signal try: # RF scaling according to B1 calibration and "device" (translation from pulseq to output voltage) - rf_scaling = b1_scaling * self.rf_to_mvolt * self.imp_scaling[0] / self.output_limits[0] - if np.abs(np.amax(envelope_scaled := block.signal * phase_offset * rf_scaling)) > 1: + rf_scaling = b1_scaling * self.rf_to_mvolt * phase_offset / self.rf_out_limit + if np.abs(np.amax(envelope_scaled := block.signal * rf_scaling)) > 1: raise ValueError("RF magnitude exceeds output limit.") except ValueError as err: self.log.exception(err, exc_info=True) @@ -634,32 +595,32 @@ def _calculate_gradient( gradient amplitude exceeds channel maximum output level """ try: - # Calculate gradient waveform scaling, subtract gain and efficiency index by 1, - # because these lists do not include the RF channel (i.e. gradient channel 1 corresponds to index 0) - scaling = fov_scaling * self.imp_scaling[output_channel] / ( - self.system.gamma * 1e-3 * self.gpa_gain[output_channel-1] * self.grad_eff[output_channel-1]) + # Calculate gradient waveform scaling + scaling = fov_scaling / ( + self.system.gamma * 1e-3 * self.gpa_gain[output_channel] * self.grad_eff[output_channel] + ) # Calculate the gradient waveform relative to max output (within the interval [0, 1]) if block.type == "grad": # Arbitrary gradient waveform, interpolate linearly # This function requires float input => cast to int16 afterwards - waveform = block.waveform * scaling - self._check_amplitude(output_channel, np.amax(waveform), self.output_limits[output_channel]) + waveform = block.waveform * scaling / self.gradient_out_limits[output_channel] + self._check_gradient_amplitude(output_channel, np.abs(np.amax(waveform))) # Transfer mV floating point waveform values to int16 if amplitude check passed - waveform *= INT16_MAX / self.output_limits[output_channel] + waveform_i16 = waveform * INT16_MAX # Interpolate waveform on spectrum card time raster gradient = np.interp( x=np.linspace(block.tt[0], block.tt[-1], round(block.shape_dur / self.spcm_dwell_time)), xp=block.tt, - fp=waveform, + fp=waveform_i16, ) elif block.type == "trap": # Construct trapezoidal gradient from rise, flat and fall sections - flat_amp = block.amplitude * scaling - self._check_amplitude(output_channel, np.amax(flat_amp), self.output_limits[output_channel]) - # Transfer mV floating point flat amplitude to int16 if amplitude check passed - flat_amp_i16 = flat_amp * INT16_MAX / self.output_limits[output_channel] + flat_amp = block.amplitude * scaling / self.gradient_out_limits[output_channel] + self._check_gradient_amplitude(output_channel, np.abs(np.amax(flat_amp))) + # Transfer relative floating point flat amplitude to int16 if amplitude check passed + flat_amp_i16 = flat_amp * INT16_MAX # Define rise, flat and fall sections of trapezoidal gradient on spectrum card time raster rise = np.linspace(0, flat_amp_i16, round(block.rise_time / self.spcm_dwell_time)) flat = np.full(round(block.flat_time / self.spcm_dwell_time), fill_value=flat_amp_i16) @@ -672,12 +633,12 @@ def _calculate_gradient( # Calculate gradient offset int16 value from mV # Gradient offset is used for calculating output limits but is not added to the waveform - offset_i16 = offset * INT16_MAX / self.output_limits[output_channel] + offset_i16 = offset * INT16_MAX / self.gradient_out_limits[output_channel] # This is the combined int16 gradient and offset waveform as float dtype combined_i16 = gradient + offset_i16 if (max_strength_i16 := np.amax(combined_i16)) > INT16_MAX: # Report maximum strength in mV - max_strength = max_strength_i16 * self.output_limits[output_channel] / INT16_MAX + max_strength = max_strength_i16 * self.gradient_out_limits[output_channel] / INT16_MAX msg = f"Amplitude of combined gradient and shim waveforms {max_strength} exceed max gradient amplitude" raise ValueError(msg) @@ -690,10 +651,11 @@ def _calculate_gradient( # -------- Private validation methods -------- # - def _check_amplitude(self, idx: int, value: float, limit: float) -> None: + def _check_gradient_amplitude(self, idx: int, rel_value: float) -> None: """Raise error if amplitude exceeds output limit.""" - if value > limit: - msg = f"Amplitude of channel {idx} ({value}) exceeded output limit ({limit}))" + limit = self.gradient_out_limits[idx] + if np.abs(rel_value) > 1.: + msg = f"Amplitude of gradient channel {idx+1} ({rel_value*limit}) exceeded output limit ({limit}))" raise ValueError(msg) def _check_parameter(self, parameter: AcquisitionParameter) -> None: diff --git a/src/console/spcm_control/acquisition_control.py b/src/console/spcm_control/acquisition_control.py index 1dff8a79..2e5fa06f 100644 --- a/src/console/spcm_control/acquisition_control.py +++ b/src/console/spcm_control/acquisition_control.py @@ -9,6 +9,7 @@ from datetime import datetime from pathlib import Path +import matplotlib as mpl import numpy as np from console.interfaces.acquisition_data import AcquisitionData @@ -20,6 +21,7 @@ from console.spcm_control.rx_device import RxCard from console.spcm_control.tx_device import TxCard from console.utilities.load_configuration import load_nexus_config +from console.utilities.plot import plot_unrolled_sequence LOG_LEVELS = [ logging.DEBUG, @@ -72,13 +74,16 @@ def __init__( self.log.info("--- Acquisition control started\n") # Load device configuration and create instances + # TODO: Generalize Nexus configuration to allow different setups self.config: NexusConfiguration = load_nexus_config(configuration_file) # Create sequence provider instance self.seq_provider: SequenceProvider = SequenceProvider( gradient_efficiency=self.config.tx.gradient_efficiency, gpa_gain=self.config.tx.gpa_gain, - high_impedance=[not val for val in self.config.tx.channel_terminated_50ohm], - output_limits=self.config.tx.channel_max_amplitude, + gradients_50ohms=self.config.tx.gradients_terminated_50ohm, + rf_50ohms=self.config.tx.rf_terminated_50ohm, + gradient_output_limits=self.config.tx.channel_max_amplitude[1:], + rf_output_limit=self.config.tx.channel_max_amplitude[0], spcm_dwell_time=1 / (self.config.tx.sampling_rate * 1e6), rf_to_mvolt=self.config.tx.rf_to_mvolt, system_limits=self.config.system, @@ -239,7 +244,8 @@ def run(self, store_unprocessed: bool = False) -> AcquisitionData: # Set gradient offset values self.tx_card.set_gradient_offsets( - self.sequence.parameter.gradient_offset, self.seq_provider.high_impedance[1:] + offsets=self.sequence.parameter.gradient_offset, + is_50ohms=self.config.tx.gradients_terminated_50ohm, ) for k in range(self.sequence.parameter.num_averages): @@ -289,7 +295,10 @@ def run(self, store_unprocessed: bool = False) -> AcquisitionData: time.sleep(self.sequence.parameter.averaging_delay) # Reset gradient offset values - self.tx_card.set_gradient_offsets(Dimensions(x=0, y=0, z=0), self.seq_provider.high_impedance[1:]) + self.tx_card.set_gradient_offsets( + offsets=Dimensions(x=0, y=0, z=0), + is_50ohms=self.config.tx.gradients_terminated_50ohm, + ) if len(self.receive_data) > 0: self.log.debug(f"Total number of ADC events: {len(self.receive_data)}") @@ -341,3 +350,13 @@ def post_processing(self, parameter: AcquisitionParameter) -> None: with ThreadPoolExecutor() as executor: executor.map(lambda rx_obj: rx_obj.process_data(store_unprocessed=self.store_unprocessed) , self.receive_data) + + def plot_waveforms( + self, + time_range: tuple[float, float], + ) -> tuple[mpl.figure.Figure, np.ndarray] | None: + """Plot internally stored waveforms.""" + if self.sequence is not None: + return plot_unrolled_sequence(self.sequence, time_range=time_range) + self.log.warning("No sequence to plot. Set sequence first.") + return None diff --git a/src/console/spcm_control/rx_device.py b/src/console/spcm_control/rx_device.py index 0417cb7a..5ab44be5 100644 --- a/src/console/spcm_control/rx_device.py +++ b/src/console/spcm_control/rx_device.py @@ -56,9 +56,9 @@ def __init__( self, path: str, sample_rate: int, - channel_enable: list[bool], - max_amplitude: list[int], - impedance_50_ohms: list[bool], + channel_enable: tuple[bool, ...], + max_amplitude: tuple[int, ...], + impedance_50_ohms: tuple[bool, ...], ) -> None: """Execute after init function to do further class setup.""" self.log = logging.getLogger(self.__name__) diff --git a/src/console/spcm_control/tx_device.py b/src/console/spcm_control/tx_device.py index edc5f2d9..175548d2 100644 --- a/src/console/spcm_control/tx_device.py +++ b/src/console/spcm_control/tx_device.py @@ -36,8 +36,8 @@ class TxCard(SpectrumDevice): def __init__( self, path: str, - max_amplitude: list[int], - filter_type: list[int], + max_amplitude: tuple[int, int, int, int], + filter_type: tuple[int, int, int, int], sample_rate: int, ) -> None: self.log = logging.getLogger(self.__name__) @@ -184,15 +184,16 @@ class attribute is overwritten. )) self.log.debug("Device setup completed") - # _ = self.get_status() - def set_gradient_offsets(self, offsets: Dimensions, high_impedance: list[bool] = [True, True, True]) -> None: + def set_gradient_offsets(self, offsets: Dimensions, is_50ohms: bool = False) -> None: """Set offset values of the gradient output channels. Parameters ---------- offsets - Offset values as Dimension datatype as defined in acquisition parameter + Offset values given by Dimensions interface with integers in mV + is_50ohms + Boolean flag indicating if the gradient outputs are terminated into 50 ohms or into high impedance. Returns ------- @@ -214,18 +215,14 @@ def set_gradient_offsets(self, offsets: Dimensions, high_impedance: list[bool] = if abs(offsets.z) > self.max_amplitude[3]: self.log.error("Gradient offset of channel z exceeds maximum amplitude.") - # Extract per channel flag for high impedance termination - try: - x_high_imp, y_high_imp, z_high_imp = high_impedance - except ValueError as err: - # Raise error if list does not have 3 values to unpack - self.log.exception(err, exc_info=True) - raise err + # If the outputs are terminated with high impedance, only half of the offsets need to be set, + # as the value at the card output automatically doubles. + z_scaling = 1. if is_50ohms else 0.5 # Set offset values, scale offset by 0.5 if channel is terminated into high impedance - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS1, int(offsets.x / 2) if x_high_imp else int(offsets.x)) - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS2, int(offsets.y / 2) if y_high_imp else int(offsets.y)) - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS3, int(offsets.z / 2) if z_high_imp else int(offsets.z)) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS1, int(offsets.x * z_scaling)) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS2, int(offsets.y * z_scaling)) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS3, int(offsets.z * z_scaling)) # Write setup spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_CARD_WRITESETUP) diff --git a/src/console/utilities/plot.py b/src/console/utilities/plot.py index 7146bdda..3db74f33 100644 --- a/src/console/utilities/plot.py +++ b/src/console/utilities/plot.py @@ -1,9 +1,81 @@ """Plotting methods.""" +import matplotlib as mpl import matplotlib.pyplot as plt import numpy as np +from console.interfaces.unrolled_sequence import UnrolledSequence -def plot_slices(img: np.ndarray, vmin: float | None = None, vmax: float | None = None): + +def plot_unrolled_sequence( + sequence: UnrolledSequence, + time_range: tuple[float, float] = (0, -1), +) -> tuple[mpl.figure.Figure, np.ndarray]: + """Plot unrolled waveforms for replay. + + Parameters + ---------- + sequence + The unrolled/calculated sequence to be plotted. + time_range + Specify the time range of the plot in seconds. + If the second value is smaller then the first or -1, the whole sequence is plotted. + + Returns + ------- + Matplotlib figure and axis + """ + fig, axis = plt.subplots(5, 1, figsize=(16, 9)) + spcm_freq = 1 / sequence.dwell_time + + if (seq := sequence.seq).size == 0: + msg = "No unrolled sequence. Execute `unroll_sequence(...)` first" + raise RuntimeError(msg) + + seq_start = int(time_range[0] * spcm_freq) + seq_end = int(time_range[1] * spcm_freq) if time_range[1] > time_range[0] else -1 + samples = np.arange(len(seq) // 4, dtype=float)[seq_start:seq_end] * sequence.dwell_time * 1e3 + + rf_signal = seq[0::4][seq_start:seq_end] + gx_signal = seq[1::4][seq_start:seq_end] + gy_signal = seq[2::4][seq_start:seq_end] + gz_signal = seq[3::4][seq_start:seq_end] + + # Get digital signals + adc_gate = gx_signal.astype(np.uint16) >> 15 + unblanking = gz_signal.astype(np.uint16) >> 15 + + # Get gradient waveforms + rf_signal = rf_signal / np.abs(np.iinfo(np.int16).min) + gx_signal = np.array((np.uint16(gx_signal) << 1).astype(np.int16) / 2**15) + gy_signal = np.array((np.uint16(gy_signal) << 1).astype(np.int16) / 2**15) + gz_signal = np.array((np.uint16(gz_signal) << 1).astype(np.int16) / 2**15) + + axis[0].plot(samples, sequence.rf_output_limit * rf_signal) + axis[1].plot(samples, sequence.gradient_output_limits[0] * gx_signal) + axis[2].plot(samples, sequence.gradient_output_limits[1] * gy_signal) + axis[3].plot(samples, sequence.gradient_output_limits[2] * gz_signal) + axis[4].plot(samples, adc_gate, label="ADC gate") + axis[4].plot(samples, unblanking, label="RF unblanking") + + axis[0].set_ylabel("RF [mV]") + axis[1].set_ylabel("Gx [mV]") + axis[2].set_ylabel("Gy [mV]") + axis[3].set_ylabel("Gz [mV]") + axis[4].set_ylabel("Digital") + axis[4].legend(loc="upper right") + + _ = [ax.grid(axis="x") for ax in axis] + + axis[4].set_xlabel("Time [ms]") + + return fig, axis + + +def plot_slices( + img: np.ndarray, + vmin: float | None = None, + vmax: float | None = None, +) -> tuple[mpl.figure.Figure, np.ndarray]: """Return sliced plot of 3D image data.""" num_slices = img.shape[0] num_cols = int(np.ceil(np.sqrt(num_slices))) diff --git a/tests/conftest.py b/tests/conftest.py index fa626b3c..1c39f822 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,12 +21,14 @@ def seq_provider() -> SequenceProvider: """Construct default sequence provider as fixture for testing.""" system_limits: SystemLimits = load_system_limits(Path("examples/example_device_config.yaml")) return SequenceProvider( - gradient_efficiency=[0.4, 0.4, 0.4], - gpa_gain=[1.0, 1.0, 1.0], - output_limits=[200, 6000, 6000, 6000], - spcm_dwell_time=5e-8, + gradient_efficiency=(0.4, 0.4, 0.4), + gpa_gain=(1.0, 1.0, 1.0), + gradient_output_limits=(6000, 6000, 6000), + gradients_50ohms=False, + rf_output_limit=200, + rf_50ohms=True, rf_to_mvolt=5e-3, - high_impedance=[False, True, True, True], + spcm_dwell_time=5e-8, system_limits=system_limits, ) diff --git a/tests/sequence_tests/test_sequence_provider.py b/tests/sequence_tests/test_sequence_provider.py index 7f665a69..78455265 100644 --- a/tests/sequence_tests/test_sequence_provider.py +++ b/tests/sequence_tests/test_sequence_provider.py @@ -67,7 +67,7 @@ def test_dict_contains_basic_config(seq_provider: SequenceProvider): # Values agree with constructor np.testing.assert_approx_equal(d["spcm_freq"], 1 / seq_provider.spcm_dwell_time) assert d["rf_to_mvolt"] == seq_provider.rf_to_mvolt - assert d["output_limits"] == seq_provider.output_limits + assert d["output_limits"] == seq_provider.gradient_out_limits def test_from_pypulseq_system_limit_violation(seq_provider: SequenceProvider, test_sequence): @@ -174,7 +174,7 @@ def test_calculate_arbitrary_gradient_block(seq_provider: SequenceProvider): # Test exceptions with pytest.raises(ValueError): _ = seq_provider._calculate_gradient( - block=block, fov_scaling=1., offset=seq_provider.output_limits[1], output_channel=1, + block=block, fov_scaling=1., offset=seq_provider.gradient_out_limits[1], output_channel=1, ) @@ -200,7 +200,7 @@ def test_calculate_trapezoid_gradient_block(seq_provider: SequenceProvider): # Test exceptions with pytest.raises(ValueError): _ = seq_provider._calculate_gradient( - block=block, fov_scaling=1., offset=seq_provider.output_limits[1], output_channel=1, + block=block, fov_scaling=1., offset=seq_provider.gradient_out_limits[1], output_channel=1, ) diff --git a/tests/utilities_tests/test_sequence_plot.py b/tests/utilities_tests/test_sequence_plot.py new file mode 100644 index 00000000..a8ce99a6 --- /dev/null +++ b/tests/utilities_tests/test_sequence_plot.py @@ -0,0 +1,27 @@ +"""Testing plot function for unrolled sequences.""" +import matplotlib as mpl +import numpy as np +from pypulseq import Sequence + +from console.interfaces.acquisition_parameter import AcquisitionParameter +from console.interfaces.unrolled_sequence import UnrolledSequence +from console.pulseq_interpreter.sequence_provider import SequenceProvider +from console.utilities.plot import plot_unrolled_sequence + +NUM_SUBPLOTS = 5 + +def test_unrolled_sequence_plot( + seq_provider: SequenceProvider, + test_sequence: Sequence, + acquisition_parameter: AcquisitionParameter, +) -> None: + """Test unrolled sequence plot.""" + assert test_sequence.check_timing()[0] + + seq_provider.from_pypulseq(test_sequence) + unrolled_seq: UnrolledSequence = seq_provider.unroll_sequence(acquisition_parameter) + + fig, ax = plot_unrolled_sequence(unrolled_seq) + assert isinstance(fig, mpl.figure.Figure) + assert isinstance(ax, np.ndarray) + assert len(ax) == NUM_SUBPLOTS