Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/console/interfaces/unrolled_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class UnrolledSequence:
`unroll_sequence` function.
"""

seq: np.ndarray
seq: list[np.ndarray]
"""Replay data as int16 values in a list of numpy arrays. The sequence data already
contains the digital adc and unblanking signals in the channels gx and gy."""

Expand Down
138 changes: 134 additions & 4 deletions src/console/pulseq_interpreter/sequence_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(
self.spcm_freq = 1 / spcm_dwell_time
self.spcm_dwell_time = spcm_dwell_time
self.system_limits = system_limits
self.tx_notify_size: float | None = None

# Set impedance scaling factor, 0.5 if impedance is high, 1 if impedance is 50 ohms
# Halve RF scaling factor if impedance is high, because the card output doubles for high impedance
Expand Down Expand Up @@ -484,7 +485,6 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:

All the gradient channels contain a digital signal encoded by the 15th bit.
- `gx`: ADC gate signal
- `gy`: Reference signal for phase correction
- `gz`: RF unblanking signal
The following example shows, how to extract the digital signals

Expand Down Expand Up @@ -522,7 +522,7 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:
rf_events = self.get_rf_events()

# Calculate rf pulse and unblanking waveforms from RF event
# Should probably be moved inside of get_rf_events()
# TODO: Should probably be moved inside of get_rf_events()
rf_pulses = {}
for rf_event in rf_events:
rf_pulses[rf_event[0]] = self.calculate_rf(
Expand All @@ -532,15 +532,40 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:

seq_duration, _, _ = self.duration()
seq_samples = int(round(seq_duration * self.spcm_freq))
seq_bytes = seq_samples * 4 * 2 # 4 channels, 2 bytes per channel

if self.tx_notify_size is None or not isinstance(self.tx_notify_size, int):
# Check if its set and passed as int, other checks done within tx_card class
raise RuntimeError("Tx notify size is not set properly, it should be an integer")

tx_notify_samples = self.tx_notify_size // 2 # Two bytes per sample

if tx_notify_samples % 4 != 0:
raise ValueError("Notify cannot be devided in to integer number of sample sized chunks")

# Calculate the total number of notify_size chunks that the sequence has to be subdivided in to
num_notifies = int(np.ceil(seq_bytes / self.tx_notify_size))

# create list of notify_sized arrays to store sequence in
_seq_notified = [np.zeros(tx_notify_samples, dtype=np.int16) for idx in range(num_notifies)]

self.log.debug("Number of samples in sequence: %d" % (seq_samples))
self.log.debug("Number of samples per notify: %d, number of notifies in sequence: %d" % (tx_notify_samples, num_notifies))

# Calculate the start time (and sample position) and duration of each block
block_durations = np.array(
[self.get_block(block_idx).block_duration for block_idx in list(events_list.keys())]
)
block_durations = np.round(block_durations * self.spcm_freq).astype(int)
# Calculate the start position of each block in number of samples from the start of the sequence
block_pos = np.cumsum(block_durations, dtype=np.int64)
block_pos = np.insert(block_pos, 0, 0)

# Calculate which notify number and offset for the start of each block
block_pos_sam = 4 * block_pos
block_notify = 4 * block_pos // tx_notify_samples
block_offset = (4 * block_pos) % tx_notify_samples

if seq_samples != block_pos[-1]:
raise IndexError(
"Number of sequence samples does not match total number of block samples"
Expand All @@ -565,8 +590,27 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:
)
delay = block.gx.delay
delay_samples = round(delay * self.spcm_freq)
waveform_start_gx = waveform_start + 4 * delay_samples
delay_offset = block_pos_sam[event_idx] + 4 * delay_samples
waveform_samples = np.size(waveform)
remaining_samples = waveform_samples
copied_samples = 0
while remaining_samples > 0:
current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples
current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples
if remaining_samples * 4 > (tx_notify_samples - current_offset):
# Can fill the remainder of the notify
samples_available = (tx_notify_samples - current_offset) // 4
_seq_notified[current_notify][current_offset + 1::4] = \
waveform[copied_samples:copied_samples + samples_available]
remaining_samples -= samples_available
copied_samples += samples_available
else:
# Can only fill part of the notify
_seq_notified[current_notify][current_offset + 1:current_offset + remaining_samples * 4 + 1: 4] = \
waveform[copied_samples:]
remaining_samples = 0

waveform_start_gx = waveform_start + 4 * delay_samples
_seq[waveform_start_gx + 1:waveform_start_gx + 4 * waveform_samples + 1:4] = waveform

if block.gy is not None: # Gy event
Expand All @@ -575,6 +619,26 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:
)
delay = block.gy.delay
delay_samples = round(delay * self.spcm_freq)
delay_offset = block_pos_sam[event_idx] + 4 * delay_samples
waveform_samples = np.size(waveform)
remaining_samples = waveform_samples
copied_samples = 0
while remaining_samples > 0:
current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples
current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples
if remaining_samples * 4 > (tx_notify_samples - current_offset):
# Can fill the remainder of the notify
samples_available = (tx_notify_samples - current_offset) // 4
_seq_notified[current_notify][current_offset + 2::4] = \
waveform[copied_samples:copied_samples + samples_available]
remaining_samples -= samples_available
copied_samples += samples_available
else:
# Can only fill part of the notify
_seq_notified[current_notify][current_offset + 2:current_offset + remaining_samples * 4 + 2: 4] = \
waveform[copied_samples:]
remaining_samples = 0

waveform_start_gy = waveform_start + 4 * delay_samples
waveform_samples = np.size(waveform)
_seq[waveform_start_gy + 2:waveform_start_gy + 4 * waveform_samples + 2:4] = waveform
Expand All @@ -585,6 +649,26 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:
)
delay = block.gz.delay
delay_samples = round(delay * self.spcm_freq)
delay_offset = block_pos_sam[event_idx] + 4 * delay_samples
waveform_samples = np.size(waveform)
remaining_samples = waveform_samples
copied_samples = 0
while remaining_samples > 0:
current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples
current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples
if remaining_samples * 4 > (tx_notify_samples - current_offset):
# Can fill the remainder of the notify
samples_available = (tx_notify_samples - current_offset) // 4
_seq_notified[current_notify][current_offset + 3::4] = \
waveform[copied_samples:copied_samples + samples_available]
remaining_samples -= samples_available
copied_samples += samples_available
else:
# Can only fill part of the notify
_seq_notified[current_notify][current_offset + 3:current_offset + remaining_samples * 4 + 3: 4] = \
waveform[copied_samples:]
remaining_samples = 0

waveform_start_gz = waveform_start + 4 * delay_samples
waveform_samples = np.size(waveform)
_seq[waveform_start_gz + 3:waveform_start_gz + 4 * waveform_samples + 3:4] = waveform
Expand All @@ -604,6 +688,30 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:
rf_start = block_pos[event_idx] * 4
rf_end = (block_pos[event_idx] + rf_size) * 4

delay_offset = block_pos_sam[event_idx]
waveform_samples = np.size(rf_waveform)
remaining_samples = waveform_samples
copied_samples = 0
while remaining_samples > 0:
current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples
current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples
if remaining_samples * 4 > (tx_notify_samples - current_offset):
# Can fill the remainder of the notify
samples_available = (tx_notify_samples - current_offset) // 4
_seq_notified[current_notify][current_offset::4] = \
rf_waveform[copied_samples:copied_samples + samples_available]
_seq_notified[current_notify][current_offset + 3::4] = \
_seq_notified[current_notify][current_offset + 3::4] | rf_unblanking[copied_samples:copied_samples + samples_available]
remaining_samples -= samples_available
copied_samples += samples_available
else:
# Can only fill part of the notify
_seq_notified[current_notify][current_offset:current_offset+remaining_samples * 4:4] = \
rf_waveform[copied_samples:]
_seq_notified[current_notify][current_offset + 3:current_offset+remaining_samples * 4 + 3:4] = \
_seq_notified[current_notify][current_offset + 3:current_offset+remaining_samples * 4 + 3:4] | rf_unblanking[copied_samples:]
remaining_samples = 0

# Add RF waveform
_seq[rf_start:rf_end:4] = rf_waveform
# Add deblanking signal to Z gradient
Expand All @@ -624,6 +732,27 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:
adc_start = block_pos[event_idx] * 4
adc_end = (block_pos[event_idx] + np.size(adc_waveform)) * 4

delay_offset = block_pos_sam[event_idx]
waveform_samples = np.size(adc_waveform)
remaining_samples = waveform_samples
copied_samples = 0
while remaining_samples > 0:
current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples
current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples
if remaining_samples * 4 > (tx_notify_samples - current_offset):
# Can fill the remainder of the notify
samples_available = (tx_notify_samples - current_offset) // 4
_seq_notified[current_notify][current_offset + 1::4] = \
_seq_notified[current_notify][current_offset + 1::4] | \
adc_waveform[copied_samples:copied_samples + samples_available]
remaining_samples -= samples_available
copied_samples += samples_available
else:
# Can only fill part of the notify
_seq_notified[current_notify][current_offset + 1:current_offset+remaining_samples * 4 + 1:4] = \
_seq_notified[current_notify][current_offset + 1:current_offset+remaining_samples * 4 + 1:4] | adc_waveform[copied_samples:]
remaining_samples = 0

# Add ADC gate to X gradient
_seq[adc_start + 1:adc_end + 1:4] = _seq[adc_start + 1:adc_end + 1:4] | adc_waveform

Expand All @@ -650,9 +779,10 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence:

# Save unrolled sequence in class
self._sqnc_cache = _seq
self._sqnc_cache_notified = _seq_notified

return UnrolledSequence(
seq=_seq,
seq=_seq_notified,
sample_count=seq_samples,
gpa_gain=self.gpa_gain,
gradient_efficiency=self.grad_eff,
Expand Down
3 changes: 3 additions & 0 deletions src/console/spcm_control/acquisition_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ def __init__(
impedance_50_ohms=self.config.rx.channel_terminated_50ohm,
)

# Set notify rate of the Tx card in the sequence provider
self.seq_provider.tx_notify_size = self.tx_card.notify_size

# Setup the cards
self.is_setup: bool = False
try:
Expand Down
Loading
Loading