From 46345d044d6906945ca2373f22c41778f51e334b Mon Sep 17 00:00:00 2001 From: David Schote Date: Tue, 3 Feb 2026 12:11:24 +0100 Subject: [PATCH 01/12] Extended attributes of unrolled sequence --- src/console/interfaces/unrolled_sequence.py | 6 ++ .../pulseq_interpreter/sequence_provider.py | 68 +++---------------- 2 files changed, 14 insertions(+), 60 deletions(-) diff --git a/src/console/interfaces/unrolled_sequence.py b/src/console/interfaces/unrolled_sequence.py index c13bab21..fed89dc4 100644 --- a/src/console/interfaces/unrolled_sequence.py +++ b/src/console/interfaces/unrolled_sequence.py @@ -44,6 +44,12 @@ class UnrolledSequence: """If sequence values are given as float values, they can be interpreted as output voltage [mV] directly. This conversion factor represents the scaling from original pulseq RF values [Hz] to card output voltage.""" + output_limits: list[int] + """Limit of each output channel in mV.""" + + impedance_scaling: list[float] + """Waveform scaling based on impedance, the channel is terminated in.""" + dwell_time: float """Dwell time of the spectrum card replay data (unrolled sequence). Defines the distance in time between to sample points. diff --git a/src/console/pulseq_interpreter/sequence_provider.py b/src/console/pulseq_interpreter/sequence_provider.py index de14e90f..0ae9e5e2 100644 --- a/src/console/pulseq_interpreter/sequence_provider.py +++ b/src/console/pulseq_interpreter/sequence_provider.py @@ -68,12 +68,16 @@ def __init__( Parameters ---------- - output_limits - Output limit per channel in mV, includes both, RF and gradients, e.g. [200, 6000, 6000, 6000] gradient_efficiency Efficiency of the gradient coils in mT/m/A, e.g. [0.4e-3, 0.4e-3, 0.4e-3] gpa_gain Gain factor of the GPA per gradient channel, e.g. [4.7, 4.7, 4.7] + high_impedance + Flag which indicates if the output is terminated into high impedance or not. + If terminated into high impedance, the card output doubles, + what needs to be considered when calculating the sequence. + output_limits + Output limit per channel in mV, includes both, RF and gradients, e.g. [200, 6000, 6000, 6000] system_limits Maximum system limits spcm_dwell_time, optional @@ -453,70 +457,14 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: gradient_efficiency=self.grad_eff, rf_to_mvolt=self.rf_to_mvolt, dwell_time=self.spcm_dwell_time, + output_limits=self.output_limits, + impedance_scaling=self.imp_scaling, duration=self.duration()[0], adc_count=adc_count, parameter=parameter, rx_data=_rx_data, ) - def plot_unrolled(self, time_range: tuple[float, float] = (0, -1)) -> tuple[mpl.figure.Figure, np.ndarray]: - """Plot unrolled waveforms for replay. - - Parameters - ---------- - time_range, default = (0, -1) - Specify the time range of the plot in seconds. - If the second value is smaller then the first or -1, the whole sequence is plotted. - - Returns - ------- - Matplotlib figure and axis - """ - fig, axis = plt.subplots(5, 1, figsize=(16, 9)) - - if (sqnc := self._sqnc_cache) is None: - msg = "No unrolled sequence. Execute `unroll_sequence(...)` first" - raise RuntimeError(msg) - - seq_start = int(time_range[0] * self.spcm_freq) - seq_end = int(time_range[1] * self.spcm_freq) if time_range[1] > time_range[0] else -1 - samples = np.arange(len(sqnc) // 4, dtype=float)[seq_start:seq_end] * self.spcm_dwell_time * 1e3 - - rf_signal = sqnc[0::4][seq_start:seq_end] - gx_signal = sqnc[1::4][seq_start:seq_end] - gy_signal = sqnc[2::4][seq_start:seq_end] - gz_signal = sqnc[3::4][seq_start:seq_end] - - # Get digital signals - adc_gate = gx_signal.astype(np.uint16) >> 15 - unblanking = gz_signal.astype(np.uint16) >> 15 - - # Get gradient waveforms - rf_signal = rf_signal / np.abs(np.iinfo(np.int16).min) - gx_signal = np.array((np.uint16(gx_signal) << 1).astype(np.int16) / 2**15) - gy_signal = np.array((np.uint16(gy_signal) << 1).astype(np.int16) / 2**15) - gz_signal = np.array((np.uint16(gz_signal) << 1).astype(np.int16) / 2**15) - - axis[0].plot(samples, self.output_limits[0] * rf_signal / self.imp_scaling[0]) - axis[1].plot(samples, self.output_limits[1] * gx_signal / self.imp_scaling[1]) - axis[2].plot(samples, self.output_limits[2] * gy_signal / self.imp_scaling[2]) - axis[3].plot(samples, self.output_limits[3] * gz_signal / self.imp_scaling[3]) - axis[4].plot(samples, adc_gate, label="ADC gate") - axis[4].plot(samples, unblanking, label="RF unblanking") - - axis[0].set_ylabel("RF [mV]") - axis[1].set_ylabel("Gx [mV]") - axis[2].set_ylabel("Gy [mV]") - axis[3].set_ylabel("Gz [mV]") - axis[4].set_ylabel("Digital") - axis[4].legend(loc="upper right") - - _ = [ax.grid(axis="x") for ax in axis] - - axis[4].set_xlabel("Time [ms]") - - return fig, axis - # -------- Private waveform calculation functions -------- # @profile From 28e5d16c0837f57aa7a3609b248e5c9bb85c1527 Mon Sep 17 00:00:00 2001 From: David Schote Date: Tue, 3 Feb 2026 12:11:39 +0100 Subject: [PATCH 02/12] Moved sequence plot function to utilities/plot --- src/console/utilities/plot.py | 62 +++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/src/console/utilities/plot.py b/src/console/utilities/plot.py index 7146bdda..5681e833 100644 --- a/src/console/utilities/plot.py +++ b/src/console/utilities/plot.py @@ -1,6 +1,68 @@ """Plotting methods.""" +import matplotlib as mpl import matplotlib.pyplot as plt import numpy as np +from console.interfaces.unrolled_sequence import UnrolledSequence + + +def plot_unrolled(sequence: UnrolledSequence, time_range: tuple[float, float] = (0, -1)) -> tuple[mpl.figure.Figure, np.ndarray]: + """Plot unrolled waveforms for replay. + + Parameters + ---------- + time_range, default = (0, -1) + Specify the time range of the plot in seconds. + If the second value is smaller then the first or -1, the whole sequence is plotted. + + Returns + ------- + Matplotlib figure and axis + """ + fig, axis = plt.subplots(5, 1, figsize=(16, 9)) + spcm_freq = 1 / sequence.dwell_time + + if (seq := sequence.seq).size == 0: + msg = "No unrolled sequence. Execute `unroll_sequence(...)` first" + raise RuntimeError(msg) + + seq_start = int(time_range[0] * spcm_freq) + seq_end = int(time_range[1] * spcm_freq) if time_range[1] > time_range[0] else -1 + samples = np.arange(len(seq) // 4, dtype=float)[seq_start:seq_end] * sequence.dwell_time * 1e3 + + rf_signal = seq[0::4][seq_start:seq_end] + gx_signal = seq[1::4][seq_start:seq_end] + gy_signal = seq[2::4][seq_start:seq_end] + gz_signal = seq[3::4][seq_start:seq_end] + + # Get digital signals + adc_gate = gx_signal.astype(np.uint16) >> 15 + unblanking = gz_signal.astype(np.uint16) >> 15 + + # Get gradient waveforms + rf_signal = rf_signal / np.abs(np.iinfo(np.int16).min) + gx_signal = np.array((np.uint16(gx_signal) << 1).astype(np.int16) / 2**15) + gy_signal = np.array((np.uint16(gy_signal) << 1).astype(np.int16) / 2**15) + gz_signal = np.array((np.uint16(gz_signal) << 1).astype(np.int16) / 2**15) + + axis[0].plot(samples, sequence.output_limits[0] * rf_signal / sequence.imp_scaling[0]) + axis[1].plot(samples, sequence.output_limits[1] * gx_signal / sequence.imp_scaling[1]) + axis[2].plot(samples, sequence.output_limits[2] * gy_signal / sequence.imp_scaling[2]) + axis[3].plot(samples, sequence.output_limits[3] * gz_signal / sequence.imp_scaling[3]) + axis[4].plot(samples, adc_gate, label="ADC gate") + axis[4].plot(samples, unblanking, label="RF unblanking") + + axis[0].set_ylabel("RF [mV]") + axis[1].set_ylabel("Gx [mV]") + axis[2].set_ylabel("Gy [mV]") + axis[3].set_ylabel("Gz [mV]") + axis[4].set_ylabel("Digital") + axis[4].legend(loc="upper right") + + _ = [ax.grid(axis="x") for ax in axis] + + axis[4].set_xlabel("Time [ms]") + + return fig, axis def plot_slices(img: np.ndarray, vmin: float | None = None, vmax: float | None = None): From b14b4528301ef3f63333263d030ad0fa9812e724 Mon Sep 17 00:00:00 2001 From: David Schote Date: Tue, 3 Feb 2026 12:13:11 +0100 Subject: [PATCH 03/12] Updated docs, added return type --- src/console/utilities/plot.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/console/utilities/plot.py b/src/console/utilities/plot.py index 5681e833..1bb3e200 100644 --- a/src/console/utilities/plot.py +++ b/src/console/utilities/plot.py @@ -2,14 +2,20 @@ import matplotlib as mpl import matplotlib.pyplot as plt import numpy as np + from console.interfaces.unrolled_sequence import UnrolledSequence -def plot_unrolled(sequence: UnrolledSequence, time_range: tuple[float, float] = (0, -1)) -> tuple[mpl.figure.Figure, np.ndarray]: +def plot_unrolled( + sequence: UnrolledSequence, + time_range: tuple[float, float] = (0, -1), +) -> tuple[mpl.figure.Figure, np.ndarray]: """Plot unrolled waveforms for replay. Parameters ---------- + sequence + The unrolled/calculated sequence to be plotted. time_range, default = (0, -1) Specify the time range of the plot in seconds. If the second value is smaller then the first or -1, the whole sequence is plotted. @@ -65,7 +71,11 @@ def plot_unrolled(sequence: UnrolledSequence, time_range: tuple[float, float] = return fig, axis -def plot_slices(img: np.ndarray, vmin: float | None = None, vmax: float | None = None): +def plot_slices( + img: np.ndarray, + vmin: float | None = None, + vmax: float | None = None, +) -> tuple[mpl.figure.Figure, np.ndarray]: """Return sliced plot of 3D image data.""" num_slices = img.shape[0] num_cols = int(np.ceil(np.sqrt(num_slices))) From 2a31e03ae722195c84c16eeffb6b7ea7cd7e3db7 Mon Sep 17 00:00:00 2001 From: David Schote Date: Tue, 3 Feb 2026 12:16:02 +0100 Subject: [PATCH 04/12] Renamed function to plot sequence --- src/console/utilities/plot.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/console/utilities/plot.py b/src/console/utilities/plot.py index 1bb3e200..bdb36615 100644 --- a/src/console/utilities/plot.py +++ b/src/console/utilities/plot.py @@ -6,7 +6,7 @@ from console.interfaces.unrolled_sequence import UnrolledSequence -def plot_unrolled( +def plot_unrolled_sequence( sequence: UnrolledSequence, time_range: tuple[float, float] = (0, -1), ) -> tuple[mpl.figure.Figure, np.ndarray]: @@ -50,10 +50,10 @@ def plot_unrolled( gy_signal = np.array((np.uint16(gy_signal) << 1).astype(np.int16) / 2**15) gz_signal = np.array((np.uint16(gz_signal) << 1).astype(np.int16) / 2**15) - axis[0].plot(samples, sequence.output_limits[0] * rf_signal / sequence.imp_scaling[0]) - axis[1].plot(samples, sequence.output_limits[1] * gx_signal / sequence.imp_scaling[1]) - axis[2].plot(samples, sequence.output_limits[2] * gy_signal / sequence.imp_scaling[2]) - axis[3].plot(samples, sequence.output_limits[3] * gz_signal / sequence.imp_scaling[3]) + axis[0].plot(samples, sequence.output_limits[0] * rf_signal / sequence.impedance_scaling[0]) + axis[1].plot(samples, sequence.output_limits[1] * gx_signal / sequence.impedance_scaling[1]) + axis[2].plot(samples, sequence.output_limits[2] * gy_signal / sequence.impedance_scaling[2]) + axis[3].plot(samples, sequence.output_limits[3] * gz_signal / sequence.impedance_scaling[3]) axis[4].plot(samples, adc_gate, label="ADC gate") axis[4].plot(samples, unblanking, label="RF unblanking") From cb167f1b527d76e989095ed87f2ef3ddde6bcb7d Mon Sep 17 00:00:00 2001 From: David Schote Date: Tue, 3 Feb 2026 12:32:07 +0100 Subject: [PATCH 05/12] Removed cached sequence from sequence provider --- src/console/pulseq_interpreter/sequence_provider.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/console/pulseq_interpreter/sequence_provider.py b/src/console/pulseq_interpreter/sequence_provider.py index 0ae9e5e2..a121284d 100644 --- a/src/console/pulseq_interpreter/sequence_provider.py +++ b/src/console/pulseq_interpreter/sequence_provider.py @@ -109,9 +109,6 @@ def __init__( self.imp_scaling = [0.5 if z else 1 for z in high_impedance] self.output_limits: list[int] = output_limits if output_limits is not None else [] - # Initialize larmor frequency and sequence cache variables, to be set later - self._sqnc_cache: np.ndarray | None = None - # -------- PyPulseq interface -------- # def from_pypulseq(self, seq: Sequence) -> None: @@ -280,7 +277,6 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: self.log.exception("Checks not passed") raise - self._sqnc_cache = None gradient_index: Dimensions = parameter.channel_assignment # Get list of all events and list of unique RF and ADC events, since they are frequently reused @@ -447,9 +443,6 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: len(block_durations), ) - # Save unrolled sequence in class - self._sqnc_cache = _seq - return UnrolledSequence( seq=_seq, sample_count=seq_samples, From ac001ebc522581a77fb361aca8eaa6fe1c88727a Mon Sep 17 00:00:00 2001 From: David Schote Date: Tue, 3 Feb 2026 12:33:37 +0100 Subject: [PATCH 06/12] Fixed linter findings --- src/console/pulseq_interpreter/sequence_provider.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/console/pulseq_interpreter/sequence_provider.py b/src/console/pulseq_interpreter/sequence_provider.py index a121284d..f241b859 100644 --- a/src/console/pulseq_interpreter/sequence_provider.py +++ b/src/console/pulseq_interpreter/sequence_provider.py @@ -5,8 +5,6 @@ from types import SimpleNamespace from typing import Any -import matplotlib as mpl -import matplotlib.pyplot as plt import numpy as np from pypulseq.opts import Opts from pypulseq.Sequence.sequence import Sequence From 88af4e0b3d65abaedfda67bd6a36c853166ae897 Mon Sep 17 00:00:00 2001 From: David Schote Date: Wed, 4 Feb 2026 12:01:26 +0100 Subject: [PATCH 07/12] Updated sequence provider to use tuple typing, simplified impedance based scaling logic --- src/console/interfaces/unrolled_sequence.py | 16 ++- .../pulseq_interpreter/sequence_provider.py | 129 +++++++++++------- 2 files changed, 85 insertions(+), 60 deletions(-) diff --git a/src/console/interfaces/unrolled_sequence.py b/src/console/interfaces/unrolled_sequence.py index fed89dc4..15e180f6 100644 --- a/src/console/interfaces/unrolled_sequence.py +++ b/src/console/interfaces/unrolled_sequence.py @@ -28,27 +28,29 @@ class UnrolledSequence: rx_data: list[RxData] """List containing the data and metadata of all receive events""" - gpa_gain: list[float] + gpa_gain: tuple[float, float, float] """The gradient waveforms in pulseq are defined in Hz/m. The translation to mV is calculated by 1e3 / (gyro * gpa_gain * grad_efficiency). The gpa gain is given in V/A and accounts for the voltage required to generate an output of 1A. The gyromagnetic ratio defined by 42.58e6 MHz/T.""" - gradient_efficiency: list[float] + gradient_efficiency: tuple[float, float, float] """The gradient waveforms in pulseq are defined in Hz/m. The translation to mV is calculated by 1e3 / (gyro * gpa_gain * grad_efficiency). The gradient efficiency is given in mT/m/A and accounts for the gradient field which is generated per 1A. The gyromagnetic ratio defined by 42.58e6 MHz/T.""" + gradient_output_limits: tuple[int, int, int] + """Integer limits for each gradient output channel in mV. Gradient output limits already contain scaling + from channel termination into high impedance or 50 ohms respectively.""" + rf_to_mvolt: float """If sequence values are given as float values, they can be interpreted as output voltage [mV] directly. This conversion factor represents the scaling from original pulseq RF values [Hz] to card output voltage.""" - output_limits: list[int] - """Limit of each output channel in mV.""" - - impedance_scaling: list[float] - """Waveform scaling based on impedance, the channel is terminated in.""" + rf_output_limit: int + """Integer limit of rf output channel in mV. RF output limits already contains scaling + from channel termination into high impedance or 50 ohms respectively.""" dwell_time: float """Dwell time of the spectrum card replay data (unrolled sequence). diff --git a/src/console/pulseq_interpreter/sequence_provider.py b/src/console/pulseq_interpreter/sequence_provider.py index f241b859..fe10ae00 100644 --- a/src/console/pulseq_interpreter/sequence_provider.py +++ b/src/console/pulseq_interpreter/sequence_provider.py @@ -54,34 +54,41 @@ class SequenceProvider(Sequence): def __init__( self, - gradient_efficiency: list[float], - gpa_gain: list[float], - high_impedance: list[bool], - output_limits: list[int], + gradient_efficiency: tuple[float, float, float], + gpa_gain: tuple[float, float, float], + gradient_output_limits: tuple[int, int, int], + gradients_50ohms: bool, + rf_output_limit: int, + rf_50ohms: bool, + rf_to_mvolt: float, + spcm_dwell_time: float, system_limits: SystemLimits, - spcm_dwell_time: float = 1 / 20e6, - rf_to_mvolt: float = 1., ): """Initialize sequence provider class which is used to unroll a pulseq sequence. Parameters ---------- gradient_efficiency - Efficiency of the gradient coils in mT/m/A, e.g. [0.4e-3, 0.4e-3, 0.4e-3] + Efficiency of the gradient coils in mT/m/A, e.g. [0.4e-3, 0.4e-3, 0.4e-3]. gpa_gain - Gain factor of the GPA per gradient channel, e.g. [4.7, 4.7, 4.7] - high_impedance - Flag which indicates if the output is terminated into high impedance or not. + Gain factor of the GPA per gradient channel, e.g. [4.7, 4.7, 4.7]. + gradient_output_limits + Integer output limit per gradient channel in mV, e.g. [6000, 6000, 6000]. + gradients_50ohms + Boolean flag which indicates if the gradient output is terminated into 50 ohms or high impedance. If terminated into high impedance, the card output doubles, what needs to be considered when calculating the sequence. - output_limits - Output limit per channel in mV, includes both, RF and gradients, e.g. [200, 6000, 6000, 6000] - system_limits - Maximum system limits - spcm_dwell_time, optional - Sampling time raster of the output waveform (depends on spectrum card), by default 1/20e6 + rf_output_limit + Integer output limit of the RF channel in mV. + rf_50ohms + Boolean flag which indicates if the rf output is terminated into 50 ohms (see gradients_50ohms). rf_to_mvolt, optional - Translation of RF waveform from pulseq (Hz) to mV, by default 1 + Translation of RF waveform from pulseq (Hz) to mV. + spcm_dwell_time, optional + Sampling time raster of the output waveform (depends on spectrum card). + system_limits + Absolute maximum system limits defined in the device configuration. + Used to instantiate the pypulseq `Opts()` class. """ super().__init__( system=Opts( @@ -89,7 +96,7 @@ def __init__( B0=50e-3, grad_unit="Hz/m", # system limit is defined in this units slew_unit="Hz/m/s", # system limit is defined in this units - ) + ), ) self.log = logging.getLogger("SeqProv") @@ -98,14 +105,19 @@ def __init__( self.spcm_dwell_time = spcm_dwell_time self.spcm_freq = 1 / spcm_dwell_time self.system_limits = system_limits - self.high_impedance = high_impedance - self.gpa_gain: list[float] = gpa_gain - self.grad_eff: list[float] = gradient_efficiency - - # Set impedance scaling factor, 0.5 if impedance is high, 1 if impedance is 50 ohms - # Halve RF scaling factor if impedance is high, because the card output doubles for high impedance - self.imp_scaling = [0.5 if z else 1 for z in high_impedance] - self.output_limits: list[int] = output_limits if output_limits is not None else [] + self.gpa_gain = gpa_gain + self.grad_eff = gradient_efficiency + + # Scale output limit dependent on high impedance flags: + # If output is terminated into high impedance (flag is true), the channel output is doubled. + # Otherwise, if output is terminated into 50 ohms impedance, output limit remains unchanged. + self.rf_out_limit = rf_output_limit if rf_50ohms else int(2 * rf_output_limit) + # Ensure tuple[int, int, int] for mypy typing + self.gradient_out_limits = ( + gradient_output_limits[0] if gradients_50ohms else int(2 * gradient_output_limits[0]), + gradient_output_limits[1] if gradients_50ohms else int(2 * gradient_output_limits[1]), + gradient_output_limits[2] if gradients_50ohms else int(2 * gradient_output_limits[2]), + ) # -------- PyPulseq interface -------- # @@ -185,7 +197,7 @@ def dict(self) -> dict: "spcm_dwell_time": self.spcm_dwell_time, "gpa_gain": self.gpa_gain, "gradient_efficiency": self.grad_eff, - "output_limits": self.output_limits, + "output_limits": self.gradient_out_limits, } def get_adc_events(self) -> list: @@ -226,6 +238,8 @@ def get_rf_events(self) -> list: def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: """Unroll the pypulseq sequence description. + TODO: Reduce complexity. + Parameters ---------- parameter @@ -283,11 +297,13 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: rf_events = self.get_rf_events() # Calculate rf pulse and unblanking waveforms from RF event - # Should probably be moved inside of get_rf_events() + # TODO: Should probably be moved inside of get_rf_events() rf_pulses = {} for rf_event in rf_events: rf_pulses[rf_event[0]] = self._calculate_rf( - block=rf_event[1], b1_scaling=parameter.b1_scaling, larmor_frequency=parameter.larmor_frequency, + block=rf_event[1], + b1_scaling=parameter.b1_scaling, + larmor_frequency=parameter.larmor_frequency, ) seq_duration, _, _ = self.duration() @@ -327,7 +343,9 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # must be considered with respect to the target output channel! # Offsets mapping: x -> channel 1, y -> channel 2, z -> channel 3 offset=parameter.gradient_offset.to_list()[int(gradient_index.x-1)], - output_channel=int(gradient_index.x), + # Gradient indexing starts at 1 (RF is channel 0) + # -> correct indexing to match tuple index + output_channel=int(gradient_index.x-1), ) delay = block.gx.delay delay_samples = round(delay * self.spcm_freq) @@ -348,7 +366,9 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # must be considered with respect to the target output channel! # Offsets mapping: x -> channel 1, y -> channel 2, z -> channel 3 offset=parameter.gradient_offset.to_list()[int(gradient_index.y-1)], - output_channel=int(gradient_index.y), + # Gradient indexing starts at 1 (RF is channel 0) + # -> correct indexing to match tuple index + output_channel=int(gradient_index.x-1), ) delay = block.gy.delay delay_samples = round(delay * self.spcm_freq) @@ -369,7 +389,9 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # must be considered with respect to the target output channel! # Offsets mapping: x -> channel 1, y -> channel 2, z -> channel 3 offset=parameter.gradient_offset.to_list()[int(gradient_index.z-1)], - output_channel=int(gradient_index.z), + # Gradient indexing starts at 1 (RF is channel 0) + # -> correct indexing to match tuple index + output_channel=int(gradient_index.x-1), ) delay = block.gz.delay delay_samples = round(delay * self.spcm_freq) @@ -448,8 +470,8 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: gradient_efficiency=self.grad_eff, rf_to_mvolt=self.rf_to_mvolt, dwell_time=self.spcm_dwell_time, - output_limits=self.output_limits, - impedance_scaling=self.imp_scaling, + gradient_output_limits=self.gradient_out_limits, + rf_output_limit=self.rf_out_limit, duration=self.duration()[0], adc_count=adc_count, parameter=parameter, @@ -518,8 +540,8 @@ def _calculate_rf( # Perform this step here to save computation time, num. of envelope samples << num. of resampled signal try: # RF scaling according to B1 calibration and "device" (translation from pulseq to output voltage) - rf_scaling = b1_scaling * self.rf_to_mvolt * self.imp_scaling[0] / self.output_limits[0] - if np.abs(np.amax(envelope_scaled := block.signal * phase_offset * rf_scaling)) > 1: + rf_scaling = b1_scaling * self.rf_to_mvolt * phase_offset / self.rf_out_limit + if np.abs(np.amax(envelope_scaled := block.signal * rf_scaling)) > 1: raise ValueError("RF magnitude exceeds output limit.") except ValueError as err: self.log.exception(err, exc_info=True) @@ -573,32 +595,32 @@ def _calculate_gradient( gradient amplitude exceeds channel maximum output level """ try: - # Calculate gradient waveform scaling, subtract gain and efficiency index by 1, - # because these lists do not include the RF channel (i.e. gradient channel 1 corresponds to index 0) - scaling = fov_scaling * self.imp_scaling[output_channel] / ( - self.system.gamma * 1e-3 * self.gpa_gain[output_channel-1] * self.grad_eff[output_channel-1]) + # Calculate gradient waveform scaling + scaling = fov_scaling / ( + self.system.gamma * 1e-3 * self.gpa_gain[output_channel] * self.grad_eff[output_channel] + ) # Calculate the gradient waveform relative to max output (within the interval [0, 1]) if block.type == "grad": # Arbitrary gradient waveform, interpolate linearly # This function requires float input => cast to int16 afterwards - waveform = block.waveform * scaling - self._check_amplitude(output_channel, np.amax(waveform), self.output_limits[output_channel]) + waveform = block.waveform * scaling / self.gradient_out_limits[output_channel] + self._check_gradient_amplitude(output_channel, np.abs(np.amax(waveform))) # Transfer mV floating point waveform values to int16 if amplitude check passed - waveform *= INT16_MAX / self.output_limits[output_channel] + waveform_i16 = waveform * INT16_MAX # Interpolate waveform on spectrum card time raster gradient = np.interp( x=np.linspace(block.tt[0], block.tt[-1], round(block.shape_dur / self.spcm_dwell_time)), xp=block.tt, - fp=waveform, + fp=waveform_i16, ) elif block.type == "trap": # Construct trapezoidal gradient from rise, flat and fall sections - flat_amp = block.amplitude * scaling - self._check_amplitude(output_channel, np.amax(flat_amp), self.output_limits[output_channel]) - # Transfer mV floating point flat amplitude to int16 if amplitude check passed - flat_amp_i16 = flat_amp * INT16_MAX / self.output_limits[output_channel] + flat_amp = block.amplitude * scaling / self.gradient_out_limits[output_channel] + self._check_gradient_amplitude(output_channel, np.abs(np.amax(flat_amp))) + # Transfer relative floating point flat amplitude to int16 if amplitude check passed + flat_amp_i16 = flat_amp * INT16_MAX # Define rise, flat and fall sections of trapezoidal gradient on spectrum card time raster rise = np.linspace(0, flat_amp_i16, round(block.rise_time / self.spcm_dwell_time)) flat = np.full(round(block.flat_time / self.spcm_dwell_time), fill_value=flat_amp_i16) @@ -611,12 +633,12 @@ def _calculate_gradient( # Calculate gradient offset int16 value from mV # Gradient offset is used for calculating output limits but is not added to the waveform - offset_i16 = offset * INT16_MAX / self.output_limits[output_channel] + offset_i16 = offset * INT16_MAX / self.gradient_out_limits[output_channel] # This is the combined int16 gradient and offset waveform as float dtype combined_i16 = gradient + offset_i16 if (max_strength_i16 := np.amax(combined_i16)) > INT16_MAX: # Report maximum strength in mV - max_strength = max_strength_i16 * self.output_limits[output_channel] / INT16_MAX + max_strength = max_strength_i16 * self.gradient_out_limits[output_channel] / INT16_MAX msg = f"Amplitude of combined gradient and shim waveforms {max_strength} exceed max gradient amplitude" raise ValueError(msg) @@ -629,10 +651,11 @@ def _calculate_gradient( # -------- Private validation methods -------- # - def _check_amplitude(self, idx: int, value: float, limit: float) -> None: + def _check_gradient_amplitude(self, idx: int, rel_value: float) -> None: """Raise error if amplitude exceeds output limit.""" - if value > limit: - msg = f"Amplitude of channel {idx} ({value}) exceeded output limit ({limit}))" + limit = self.gradient_out_limits[idx] + if np.abs(rel_value) > 1.: + msg = f"Amplitude of gradient channel {idx+1} ({rel_value*limit}) exceeded output limit ({limit}))" raise ValueError(msg) def _check_parameter(self, parameter: AcquisitionParameter) -> None: From fbeb2f8ff623bbc138751366b0a22b29f0435222 Mon Sep 17 00:00:00 2001 From: David Schote Date: Wed, 4 Feb 2026 12:02:08 +0100 Subject: [PATCH 08/12] Adjusted rx/tx device and acquisition control to new tuple typing --- .../interfaces/device_configuration.py | 100 +++++++----------- .../spcm_control/acquisition_control.py | 7 +- src/console/spcm_control/rx_device.py | 6 +- src/console/spcm_control/tx_device.py | 4 +- 4 files changed, 49 insertions(+), 68 deletions(-) diff --git a/src/console/interfaces/device_configuration.py b/src/console/interfaces/device_configuration.py index f4de561c..1aa82b52 100644 --- a/src/console/interfaces/device_configuration.py +++ b/src/console/interfaces/device_configuration.py @@ -1,9 +1,15 @@ """Implementation of the device configuration models.""" -from pydantic import BaseModel, Field, field_validator, model_validator +from typing import Annotated -# Specify the number of gradient channels -NUM_GRADIENTS = 3 +from pydantic import BaseModel, Field, model_validator +# Ensure that max. amplitude is within 1 and 6000 mV. +# Limits may depend on specific configuration of spectrum cards. +AmplitudeInt = Annotated[int, Field(ge=1, le=6000)] +# Ensure valid filter type, channel filter type must be 0, 1, 2 or 3. +# See spectrum instrumentation manual for reference. +# This may depend on the specific card configuration. +FilterTypeInt = Annotated[int, Field(ge=0, le=3)] class TxConfiguration(BaseModel): """Transmit device configuration.""" @@ -12,50 +18,14 @@ class TxConfiguration(BaseModel): device_path: str = Field(..., strict=True) sampling_rate: int = Field(..., strict=True) - max_available_channels: int = Field(..., strict=True) - channel_max_amplitude: list[int] = Field(..., strict=True) - channel_filter_type: list[int] = Field(..., strict=True) - channel_terminated_50ohm: list[bool] = Field(..., strict=True) - gradient_efficiency: list[float] = Field(..., min_length=NUM_GRADIENTS, max_length=NUM_GRADIENTS, strict=True) - gpa_gain: list[float] = Field(..., min_length=NUM_GRADIENTS, max_length=NUM_GRADIENTS, strict=True) + channel_max_amplitude: tuple[AmplitudeInt, AmplitudeInt, AmplitudeInt, AmplitudeInt] = Field(...) + channel_filter_type: tuple[FilterTypeInt, FilterTypeInt, FilterTypeInt, FilterTypeInt] = Field(...) + rf_terminated_50ohm: bool = Field(..., strict=True) + gradients_terminated_50ohm: bool = Field(..., strict=True) + gradient_efficiency: tuple[float, float, float] = Field(...) + gpa_gain: tuple[float, float, float] = Field(...) rf_to_mvolt: float = Field(..., strict=True) - @model_validator(mode="after") - def validate_lists(self) -> "TxConfiguration": - """Validate the length of device-specific lists.""" - list_fields: dict[str, list[int] | list[bool]] = { - "channel_max_amplitude": self.channel_max_amplitude, - "channel_filter_type": self.channel_filter_type, - "channel_terminated_50ohm": self.channel_terminated_50ohm, - } - errors = [] - for name, val in list_fields.items(): - if len(val) < self.max_available_channels: - _err = f"Field '{name}': \ - length {len(val)} is less than max_available_channels {self.max_available_channels}" - errors.append(_err) - if errors: - raise ValueError("; ".join(errors)) - return self - - @field_validator("channel_max_amplitude") - @classmethod - def validate_max_amplitude(cls, values: list[int]) -> list[int]: - """Ensure that max. amplitude is within 1 and 6000 mV.""" - if not all(val in range(1, 6001) for val in values): - msg = "Channel max amplitude must be between 1 and 6000 mV." - raise ValueError(msg) - return values - - @field_validator("channel_filter_type") - @classmethod - def validate_filter_type(cls, values: list[int]) -> list[int]: - """Ensure that max. amplitude is within 1 and 6000 mV.""" - if not all(val in range(4) for val in values): - msg = "Channel filter type must be 0, 1, 2 or 3." - raise ValueError(msg) - return values - class RxConfiguration(BaseModel): """Receive device configuration.""" @@ -65,26 +35,34 @@ class RxConfiguration(BaseModel): device_path: str = Field(..., strict=True) sampling_rate: int = Field(..., strict=True) max_available_channels: int = Field(..., strict=True) - channel_enable: list[bool] = Field(..., strict=True) - channel_max_amplitude: list[int] = Field(..., strict=True) - channel_terminated_50ohm: list[bool] = Field(..., strict=True) + channel_enable: tuple[bool, ...] = Field(...) + channel_max_amplitude: tuple[AmplitudeInt, ...] = Field(...) + channel_terminated_50ohm: tuple[bool, ...] = Field(...) @model_validator(mode="after") - def validate_lists(self) -> "RxConfiguration": - """Validate the length of device-specific lists.""" - list_fields: dict[str, list[int] | list[bool]] = { - "channel_max_amplitude": self.channel_max_amplitude, - "channel_enable": self.channel_enable, - "channel_terminated_50ohm": self.channel_terminated_50ohm, - } + def validate_channel_lengths(self) -> "RxConfiguration": + """Validate channel lengths. + + This ensures that `max_available_channels` is length of `channel_enable`, + `channel_max_amplitude` and `channel_terminated_50ohm`. + """ + target_len = self.max_available_channels + dependent_fields = [ + "channel_enable", + "channel_max_amplitude", + "channel_terminated_50ohm", + ] errors = [] - for name, val in list_fields.items(): - if len(val) < self.max_available_channels: - _err = f"Field '{name}'\ - length {len(val)} is less than max_available_channels {self.max_available_channels}" - errors.append(_err) + for field_name in dependent_fields: + current_val = getattr(self, field_name) + if len(current_val) != target_len: + errors.append( + f"{field_name} length ({len(current_val)}) " + f"must be exactly {target_len}" + ) if errors: - raise ValueError("; ".join(errors)) + # Raising ValueError here is the standard Pydantic way + raise ValueError(" ; ".join(errors)) return self diff --git a/src/console/spcm_control/acquisition_control.py b/src/console/spcm_control/acquisition_control.py index 1dff8a79..36813cfe 100644 --- a/src/console/spcm_control/acquisition_control.py +++ b/src/console/spcm_control/acquisition_control.py @@ -72,13 +72,16 @@ def __init__( self.log.info("--- Acquisition control started\n") # Load device configuration and create instances + # TODO: Generalize Nexus configuration to allow different setups self.config: NexusConfiguration = load_nexus_config(configuration_file) # Create sequence provider instance self.seq_provider: SequenceProvider = SequenceProvider( gradient_efficiency=self.config.tx.gradient_efficiency, gpa_gain=self.config.tx.gpa_gain, - high_impedance=[not val for val in self.config.tx.channel_terminated_50ohm], - output_limits=self.config.tx.channel_max_amplitude, + gradients_50ohms=self.config.tx.gradients_terminated_50ohm, + rf_50ohms=self.config.tx.rf_terminated_50ohm, + gradient_output_limits=self.config.tx.channel_max_amplitude[1:], + rf_output_limit=self.config.tx.channel_max_amplitude[0], spcm_dwell_time=1 / (self.config.tx.sampling_rate * 1e6), rf_to_mvolt=self.config.tx.rf_to_mvolt, system_limits=self.config.system, diff --git a/src/console/spcm_control/rx_device.py b/src/console/spcm_control/rx_device.py index 0417cb7a..5ab44be5 100644 --- a/src/console/spcm_control/rx_device.py +++ b/src/console/spcm_control/rx_device.py @@ -56,9 +56,9 @@ def __init__( self, path: str, sample_rate: int, - channel_enable: list[bool], - max_amplitude: list[int], - impedance_50_ohms: list[bool], + channel_enable: tuple[bool, ...], + max_amplitude: tuple[int, ...], + impedance_50_ohms: tuple[bool, ...], ) -> None: """Execute after init function to do further class setup.""" self.log = logging.getLogger(self.__name__) diff --git a/src/console/spcm_control/tx_device.py b/src/console/spcm_control/tx_device.py index edc5f2d9..8c67c48f 100644 --- a/src/console/spcm_control/tx_device.py +++ b/src/console/spcm_control/tx_device.py @@ -36,8 +36,8 @@ class TxCard(SpectrumDevice): def __init__( self, path: str, - max_amplitude: list[int], - filter_type: list[int], + max_amplitude: tuple[int, int, int, int], + filter_type: tuple[int, int, int, int], sample_rate: int, ) -> None: self.log = logging.getLogger(self.__name__) From 57833661b5d18ff373dc13ae12a0841df1f3f5c5 Mon Sep 17 00:00:00 2001 From: David Schote Date: Wed, 4 Feb 2026 12:02:27 +0100 Subject: [PATCH 09/12] Adjustment of tests --- examples/example_device_config.yaml | 7 ++++--- tests/conftest.py | 12 +++++++----- tests/sequence_tests/test_sequence_provider.py | 6 +++--- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/examples/example_device_config.yaml b/examples/example_device_config.yaml index ad95cbe7..449cb785 100644 --- a/examples/example_device_config.yaml +++ b/examples/example_device_config.yaml @@ -17,8 +17,9 @@ TxConfiguration: channel_max_amplitude: [200, 6000, 6000, 6000] # Filter configuration of each transmit channel channel_filter_type: [0, 2, 2, 2] - # Configure which channels are terminated into 50 ohm impedance (true == 50 ohm), list length must at least match the number of enabled channels - channel_terminated_50ohm: [True, False, False, False] + # Configure if RF/gradients are terminated into 50 ohm impedance (true == 50 ohm) + rf_terminated_50ohm: True + gradients_terminated_50ohm: False # Calculate grad_to_volt per gradient channel: # Gradient efficiency in T/m/A gradient_efficiency: [0.37e-3, 0.451e-3, 0.4e-3] @@ -33,7 +34,7 @@ RxConfiguration: # Could be extended to a list device_path: "/dev/spcm0" # Number of channels - max_available_channels: 4 + max_available_channels: 8 # Device sampling rate in MHz sampling_rate: 20 # Enable the receive channels diff --git a/tests/conftest.py b/tests/conftest.py index fa626b3c..1c39f822 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,12 +21,14 @@ def seq_provider() -> SequenceProvider: """Construct default sequence provider as fixture for testing.""" system_limits: SystemLimits = load_system_limits(Path("examples/example_device_config.yaml")) return SequenceProvider( - gradient_efficiency=[0.4, 0.4, 0.4], - gpa_gain=[1.0, 1.0, 1.0], - output_limits=[200, 6000, 6000, 6000], - spcm_dwell_time=5e-8, + gradient_efficiency=(0.4, 0.4, 0.4), + gpa_gain=(1.0, 1.0, 1.0), + gradient_output_limits=(6000, 6000, 6000), + gradients_50ohms=False, + rf_output_limit=200, + rf_50ohms=True, rf_to_mvolt=5e-3, - high_impedance=[False, True, True, True], + spcm_dwell_time=5e-8, system_limits=system_limits, ) diff --git a/tests/sequence_tests/test_sequence_provider.py b/tests/sequence_tests/test_sequence_provider.py index 7f665a69..78455265 100644 --- a/tests/sequence_tests/test_sequence_provider.py +++ b/tests/sequence_tests/test_sequence_provider.py @@ -67,7 +67,7 @@ def test_dict_contains_basic_config(seq_provider: SequenceProvider): # Values agree with constructor np.testing.assert_approx_equal(d["spcm_freq"], 1 / seq_provider.spcm_dwell_time) assert d["rf_to_mvolt"] == seq_provider.rf_to_mvolt - assert d["output_limits"] == seq_provider.output_limits + assert d["output_limits"] == seq_provider.gradient_out_limits def test_from_pypulseq_system_limit_violation(seq_provider: SequenceProvider, test_sequence): @@ -174,7 +174,7 @@ def test_calculate_arbitrary_gradient_block(seq_provider: SequenceProvider): # Test exceptions with pytest.raises(ValueError): _ = seq_provider._calculate_gradient( - block=block, fov_scaling=1., offset=seq_provider.output_limits[1], output_channel=1, + block=block, fov_scaling=1., offset=seq_provider.gradient_out_limits[1], output_channel=1, ) @@ -200,7 +200,7 @@ def test_calculate_trapezoid_gradient_block(seq_provider: SequenceProvider): # Test exceptions with pytest.raises(ValueError): _ = seq_provider._calculate_gradient( - block=block, fov_scaling=1., offset=seq_provider.output_limits[1], output_channel=1, + block=block, fov_scaling=1., offset=seq_provider.gradient_out_limits[1], output_channel=1, ) From 591115c9ffbfce6ba10bd6a01adb312f5338b14a Mon Sep 17 00:00:00 2001 From: David Schote Date: Wed, 4 Feb 2026 12:02:46 +0100 Subject: [PATCH 10/12] Adjustment of plot function, added test for plotting --- src/console/utilities/plot.py | 10 ++++---- tests/utilities_tests/test_sequence_plot.py | 27 +++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 tests/utilities_tests/test_sequence_plot.py diff --git a/src/console/utilities/plot.py b/src/console/utilities/plot.py index bdb36615..3db74f33 100644 --- a/src/console/utilities/plot.py +++ b/src/console/utilities/plot.py @@ -16,7 +16,7 @@ def plot_unrolled_sequence( ---------- sequence The unrolled/calculated sequence to be plotted. - time_range, default = (0, -1) + time_range Specify the time range of the plot in seconds. If the second value is smaller then the first or -1, the whole sequence is plotted. @@ -50,10 +50,10 @@ def plot_unrolled_sequence( gy_signal = np.array((np.uint16(gy_signal) << 1).astype(np.int16) / 2**15) gz_signal = np.array((np.uint16(gz_signal) << 1).astype(np.int16) / 2**15) - axis[0].plot(samples, sequence.output_limits[0] * rf_signal / sequence.impedance_scaling[0]) - axis[1].plot(samples, sequence.output_limits[1] * gx_signal / sequence.impedance_scaling[1]) - axis[2].plot(samples, sequence.output_limits[2] * gy_signal / sequence.impedance_scaling[2]) - axis[3].plot(samples, sequence.output_limits[3] * gz_signal / sequence.impedance_scaling[3]) + axis[0].plot(samples, sequence.rf_output_limit * rf_signal) + axis[1].plot(samples, sequence.gradient_output_limits[0] * gx_signal) + axis[2].plot(samples, sequence.gradient_output_limits[1] * gy_signal) + axis[3].plot(samples, sequence.gradient_output_limits[2] * gz_signal) axis[4].plot(samples, adc_gate, label="ADC gate") axis[4].plot(samples, unblanking, label="RF unblanking") diff --git a/tests/utilities_tests/test_sequence_plot.py b/tests/utilities_tests/test_sequence_plot.py new file mode 100644 index 00000000..a8ce99a6 --- /dev/null +++ b/tests/utilities_tests/test_sequence_plot.py @@ -0,0 +1,27 @@ +"""Testing plot function for unrolled sequences.""" +import matplotlib as mpl +import numpy as np +from pypulseq import Sequence + +from console.interfaces.acquisition_parameter import AcquisitionParameter +from console.interfaces.unrolled_sequence import UnrolledSequence +from console.pulseq_interpreter.sequence_provider import SequenceProvider +from console.utilities.plot import plot_unrolled_sequence + +NUM_SUBPLOTS = 5 + +def test_unrolled_sequence_plot( + seq_provider: SequenceProvider, + test_sequence: Sequence, + acquisition_parameter: AcquisitionParameter, +) -> None: + """Test unrolled sequence plot.""" + assert test_sequence.check_timing()[0] + + seq_provider.from_pypulseq(test_sequence) + unrolled_seq: UnrolledSequence = seq_provider.unroll_sequence(acquisition_parameter) + + fig, ax = plot_unrolled_sequence(unrolled_seq) + assert isinstance(fig, mpl.figure.Figure) + assert isinstance(ax, np.ndarray) + assert len(ax) == NUM_SUBPLOTS From 46d74130778e118b707782013ab94045d034eff4 Mon Sep 17 00:00:00 2001 From: David Schote Date: Wed, 4 Feb 2026 17:48:43 +0100 Subject: [PATCH 11/12] Fixed function to set gradient offsets (aligned with new impedance convention) --- .../spcm_control/acquisition_control.py | 8 +++++-- src/console/spcm_control/tx_device.py | 23 ++++++++----------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/console/spcm_control/acquisition_control.py b/src/console/spcm_control/acquisition_control.py index 36813cfe..e7f5265b 100644 --- a/src/console/spcm_control/acquisition_control.py +++ b/src/console/spcm_control/acquisition_control.py @@ -242,7 +242,8 @@ def run(self, store_unprocessed: bool = False) -> AcquisitionData: # Set gradient offset values self.tx_card.set_gradient_offsets( - self.sequence.parameter.gradient_offset, self.seq_provider.high_impedance[1:] + offsets=self.sequence.parameter.gradient_offset, + is_50ohms=self.config.tx.gradients_terminated_50ohm, ) for k in range(self.sequence.parameter.num_averages): @@ -292,7 +293,10 @@ def run(self, store_unprocessed: bool = False) -> AcquisitionData: time.sleep(self.sequence.parameter.averaging_delay) # Reset gradient offset values - self.tx_card.set_gradient_offsets(Dimensions(x=0, y=0, z=0), self.seq_provider.high_impedance[1:]) + self.tx_card.set_gradient_offsets( + offsets=Dimensions(x=0, y=0, z=0), + is_50ohms=self.config.tx.gradients_terminated_50ohm, + ) if len(self.receive_data) > 0: self.log.debug(f"Total number of ADC events: {len(self.receive_data)}") diff --git a/src/console/spcm_control/tx_device.py b/src/console/spcm_control/tx_device.py index 8c67c48f..175548d2 100644 --- a/src/console/spcm_control/tx_device.py +++ b/src/console/spcm_control/tx_device.py @@ -184,15 +184,16 @@ class attribute is overwritten. )) self.log.debug("Device setup completed") - # _ = self.get_status() - def set_gradient_offsets(self, offsets: Dimensions, high_impedance: list[bool] = [True, True, True]) -> None: + def set_gradient_offsets(self, offsets: Dimensions, is_50ohms: bool = False) -> None: """Set offset values of the gradient output channels. Parameters ---------- offsets - Offset values as Dimension datatype as defined in acquisition parameter + Offset values given by Dimensions interface with integers in mV + is_50ohms + Boolean flag indicating if the gradient outputs are terminated into 50 ohms or into high impedance. Returns ------- @@ -214,18 +215,14 @@ def set_gradient_offsets(self, offsets: Dimensions, high_impedance: list[bool] = if abs(offsets.z) > self.max_amplitude[3]: self.log.error("Gradient offset of channel z exceeds maximum amplitude.") - # Extract per channel flag for high impedance termination - try: - x_high_imp, y_high_imp, z_high_imp = high_impedance - except ValueError as err: - # Raise error if list does not have 3 values to unpack - self.log.exception(err, exc_info=True) - raise err + # If the outputs are terminated with high impedance, only half of the offsets need to be set, + # as the value at the card output automatically doubles. + z_scaling = 1. if is_50ohms else 0.5 # Set offset values, scale offset by 0.5 if channel is terminated into high impedance - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS1, int(offsets.x / 2) if x_high_imp else int(offsets.x)) - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS2, int(offsets.y / 2) if y_high_imp else int(offsets.y)) - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS3, int(offsets.z / 2) if z_high_imp else int(offsets.z)) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS1, int(offsets.x * z_scaling)) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS2, int(offsets.y * z_scaling)) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS3, int(offsets.z * z_scaling)) # Write setup spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_CARD_WRITESETUP) From ee81f4336be9068d4cfd8ce097328ede2bd12adc Mon Sep 17 00:00:00 2001 From: David Schote Date: Thu, 5 Feb 2026 11:43:54 +0100 Subject: [PATCH 12/12] Added plot function to acquisition control --- src/console/spcm_control/acquisition_control.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/console/spcm_control/acquisition_control.py b/src/console/spcm_control/acquisition_control.py index e7f5265b..2e5fa06f 100644 --- a/src/console/spcm_control/acquisition_control.py +++ b/src/console/spcm_control/acquisition_control.py @@ -9,6 +9,7 @@ from datetime import datetime from pathlib import Path +import matplotlib as mpl import numpy as np from console.interfaces.acquisition_data import AcquisitionData @@ -20,6 +21,7 @@ from console.spcm_control.rx_device import RxCard from console.spcm_control.tx_device import TxCard from console.utilities.load_configuration import load_nexus_config +from console.utilities.plot import plot_unrolled_sequence LOG_LEVELS = [ logging.DEBUG, @@ -348,3 +350,13 @@ def post_processing(self, parameter: AcquisitionParameter) -> None: with ThreadPoolExecutor() as executor: executor.map(lambda rx_obj: rx_obj.process_data(store_unprocessed=self.store_unprocessed) , self.receive_data) + + def plot_waveforms( + self, + time_range: tuple[float, float], + ) -> tuple[mpl.figure.Figure, np.ndarray] | None: + """Plot internally stored waveforms.""" + if self.sequence is not None: + return plot_unrolled_sequence(self.sequence, time_range=time_range) + self.log.warning("No sequence to plot. Set sequence first.") + return None