diff --git a/src/console/interfaces/unrolled_sequence.py b/src/console/interfaces/unrolled_sequence.py index c13bab21..04e92b60 100644 --- a/src/console/interfaces/unrolled_sequence.py +++ b/src/console/interfaces/unrolled_sequence.py @@ -18,7 +18,7 @@ class UnrolledSequence: `unroll_sequence` function. """ - seq: np.ndarray + seq: list[np.ndarray] """Replay data as int16 values in a list of numpy arrays. The sequence data already contains the digital adc and unblanking signals in the channels gx and gy.""" diff --git a/src/console/pulseq_interpreter/sequence_provider.py b/src/console/pulseq_interpreter/sequence_provider.py index 593a53b0..316ca3f0 100644 --- a/src/console/pulseq_interpreter/sequence_provider.py +++ b/src/console/pulseq_interpreter/sequence_provider.py @@ -108,6 +108,7 @@ def __init__( self.spcm_freq = 1 / spcm_dwell_time self.spcm_dwell_time = spcm_dwell_time self.system_limits = system_limits + self.tx_notify_size: float | None = None # Set impedance scaling factor, 0.5 if impedance is high, 1 if impedance is 50 ohms # Halve RF scaling factor if impedance is high, because the card output doubles for high impedance @@ -484,7 +485,6 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: All the gradient channels contain a digital signal encoded by the 15th bit. - `gx`: ADC gate signal - - `gy`: Reference signal for phase correction - `gz`: RF unblanking signal The following example shows, how to extract the digital signals @@ -522,7 +522,7 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: rf_events = self.get_rf_events() # Calculate rf pulse and unblanking waveforms from RF event - # Should probably be moved inside of get_rf_events() + # TODO: Should probably be moved inside of get_rf_events() rf_pulses = {} for rf_event in rf_events: rf_pulses[rf_event[0]] = self.calculate_rf( @@ -532,15 +532,40 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: seq_duration, _, _ = self.duration() seq_samples = int(round(seq_duration * self.spcm_freq)) + seq_bytes = seq_samples * 4 * 2 # 4 channels, 2 bytes per channel + + if self.tx_notify_size is None or not isinstance(self.tx_notify_size, int): + # Check if its set and passed as int, other checks done within tx_card class + raise RuntimeError("Tx notify size is not set properly, it should be an integer") + + tx_notify_samples = self.tx_notify_size // 2 # Two bytes per sample + + if tx_notify_samples % 4 != 0: + raise ValueError("Notify cannot be devided in to integer number of sample sized chunks") + + # Calculate the total number of notify_size chunks that the sequence has to be subdivided in to + num_notifies = int(np.ceil(seq_bytes / self.tx_notify_size)) + + # create list of notify_sized arrays to store sequence in + _seq_notified = [np.zeros(tx_notify_samples, dtype=np.int16) for idx in range(num_notifies)] + + self.log.debug("Number of samples in sequence: %d" % (seq_samples)) + self.log.debug("Number of samples per notify: %d, number of notifies in sequence: %d" % (tx_notify_samples, num_notifies)) # Calculate the start time (and sample position) and duration of each block block_durations = np.array( [self.get_block(block_idx).block_duration for block_idx in list(events_list.keys())] ) block_durations = np.round(block_durations * self.spcm_freq).astype(int) + # Calculate the start position of each block in number of samples from the start of the sequence block_pos = np.cumsum(block_durations, dtype=np.int64) block_pos = np.insert(block_pos, 0, 0) + # Calculate which notify number and offset for the start of each block + block_pos_sam = 4 * block_pos + block_notify = 4 * block_pos // tx_notify_samples + block_offset = (4 * block_pos) % tx_notify_samples + if seq_samples != block_pos[-1]: raise IndexError( "Number of sequence samples does not match total number of block samples" @@ -565,8 +590,27 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: ) delay = block.gx.delay delay_samples = round(delay * self.spcm_freq) - waveform_start_gx = waveform_start + 4 * delay_samples + delay_offset = block_pos_sam[event_idx] + 4 * delay_samples waveform_samples = np.size(waveform) + remaining_samples = waveform_samples + copied_samples = 0 + while remaining_samples > 0: + current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples + current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples + if remaining_samples * 4 > (tx_notify_samples - current_offset): + # Can fill the remainder of the notify + samples_available = (tx_notify_samples - current_offset) // 4 + _seq_notified[current_notify][current_offset + 1::4] = \ + waveform[copied_samples:copied_samples + samples_available] + remaining_samples -= samples_available + copied_samples += samples_available + else: + # Can only fill part of the notify + _seq_notified[current_notify][current_offset + 1:current_offset + remaining_samples * 4 + 1: 4] = \ + waveform[copied_samples:] + remaining_samples = 0 + + waveform_start_gx = waveform_start + 4 * delay_samples _seq[waveform_start_gx + 1:waveform_start_gx + 4 * waveform_samples + 1:4] = waveform if block.gy is not None: # Gy event @@ -575,6 +619,26 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: ) delay = block.gy.delay delay_samples = round(delay * self.spcm_freq) + delay_offset = block_pos_sam[event_idx] + 4 * delay_samples + waveform_samples = np.size(waveform) + remaining_samples = waveform_samples + copied_samples = 0 + while remaining_samples > 0: + current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples + current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples + if remaining_samples * 4 > (tx_notify_samples - current_offset): + # Can fill the remainder of the notify + samples_available = (tx_notify_samples - current_offset) // 4 + _seq_notified[current_notify][current_offset + 2::4] = \ + waveform[copied_samples:copied_samples + samples_available] + remaining_samples -= samples_available + copied_samples += samples_available + else: + # Can only fill part of the notify + _seq_notified[current_notify][current_offset + 2:current_offset + remaining_samples * 4 + 2: 4] = \ + waveform[copied_samples:] + remaining_samples = 0 + waveform_start_gy = waveform_start + 4 * delay_samples waveform_samples = np.size(waveform) _seq[waveform_start_gy + 2:waveform_start_gy + 4 * waveform_samples + 2:4] = waveform @@ -585,6 +649,26 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: ) delay = block.gz.delay delay_samples = round(delay * self.spcm_freq) + delay_offset = block_pos_sam[event_idx] + 4 * delay_samples + waveform_samples = np.size(waveform) + remaining_samples = waveform_samples + copied_samples = 0 + while remaining_samples > 0: + current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples + current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples + if remaining_samples * 4 > (tx_notify_samples - current_offset): + # Can fill the remainder of the notify + samples_available = (tx_notify_samples - current_offset) // 4 + _seq_notified[current_notify][current_offset + 3::4] = \ + waveform[copied_samples:copied_samples + samples_available] + remaining_samples -= samples_available + copied_samples += samples_available + else: + # Can only fill part of the notify + _seq_notified[current_notify][current_offset + 3:current_offset + remaining_samples * 4 + 3: 4] = \ + waveform[copied_samples:] + remaining_samples = 0 + waveform_start_gz = waveform_start + 4 * delay_samples waveform_samples = np.size(waveform) _seq[waveform_start_gz + 3:waveform_start_gz + 4 * waveform_samples + 3:4] = waveform @@ -604,6 +688,30 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: rf_start = block_pos[event_idx] * 4 rf_end = (block_pos[event_idx] + rf_size) * 4 + delay_offset = block_pos_sam[event_idx] + waveform_samples = np.size(rf_waveform) + remaining_samples = waveform_samples + copied_samples = 0 + while remaining_samples > 0: + current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples + current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples + if remaining_samples * 4 > (tx_notify_samples - current_offset): + # Can fill the remainder of the notify + samples_available = (tx_notify_samples - current_offset) // 4 + _seq_notified[current_notify][current_offset::4] = \ + rf_waveform[copied_samples:copied_samples + samples_available] + _seq_notified[current_notify][current_offset + 3::4] = \ + _seq_notified[current_notify][current_offset + 3::4] | rf_unblanking[copied_samples:copied_samples + samples_available] + remaining_samples -= samples_available + copied_samples += samples_available + else: + # Can only fill part of the notify + _seq_notified[current_notify][current_offset:current_offset+remaining_samples * 4:4] = \ + rf_waveform[copied_samples:] + _seq_notified[current_notify][current_offset + 3:current_offset+remaining_samples * 4 + 3:4] = \ + _seq_notified[current_notify][current_offset + 3:current_offset+remaining_samples * 4 + 3:4] | rf_unblanking[copied_samples:] + remaining_samples = 0 + # Add RF waveform _seq[rf_start:rf_end:4] = rf_waveform # Add deblanking signal to Z gradient @@ -624,6 +732,27 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: adc_start = block_pos[event_idx] * 4 adc_end = (block_pos[event_idx] + np.size(adc_waveform)) * 4 + delay_offset = block_pos_sam[event_idx] + waveform_samples = np.size(adc_waveform) + remaining_samples = waveform_samples + copied_samples = 0 + while remaining_samples > 0: + current_notify = (delay_offset + copied_samples * 4) // tx_notify_samples + current_offset = (delay_offset + copied_samples * 4) % tx_notify_samples + if remaining_samples * 4 > (tx_notify_samples - current_offset): + # Can fill the remainder of the notify + samples_available = (tx_notify_samples - current_offset) // 4 + _seq_notified[current_notify][current_offset + 1::4] = \ + _seq_notified[current_notify][current_offset + 1::4] | \ + adc_waveform[copied_samples:copied_samples + samples_available] + remaining_samples -= samples_available + copied_samples += samples_available + else: + # Can only fill part of the notify + _seq_notified[current_notify][current_offset + 1:current_offset+remaining_samples * 4 + 1:4] = \ + _seq_notified[current_notify][current_offset + 1:current_offset+remaining_samples * 4 + 1:4] | adc_waveform[copied_samples:] + remaining_samples = 0 + # Add ADC gate to X gradient _seq[adc_start + 1:adc_end + 1:4] = _seq[adc_start + 1:adc_end + 1:4] | adc_waveform @@ -650,9 +779,10 @@ def unroll_sequence(self, parameter: AcquisitionParameter) -> UnrolledSequence: # Save unrolled sequence in class self._sqnc_cache = _seq + self._sqnc_cache_notified = _seq_notified return UnrolledSequence( - seq=_seq, + seq=_seq_notified, sample_count=seq_samples, gpa_gain=self.gpa_gain, gradient_efficiency=self.grad_eff, diff --git a/src/console/spcm_control/acquisition_control.py b/src/console/spcm_control/acquisition_control.py index 7fff46e3..d5fa21ed 100644 --- a/src/console/spcm_control/acquisition_control.py +++ b/src/console/spcm_control/acquisition_control.py @@ -99,6 +99,9 @@ def __init__( impedance_50_ohms=self.config.rx.channel_terminated_50ohm, ) + # Set notify rate of the Tx card in the sequence provider + self.seq_provider.tx_notify_size = self.tx_card.notify_size + # Setup the cards self.is_setup: bool = False try: diff --git a/src/console/spcm_control/tx_device.py b/src/console/spcm_control/tx_device.py index edc5f2d9..395ef52d 100644 --- a/src/console/spcm_control/tx_device.py +++ b/src/console/spcm_control/tx_device.py @@ -54,10 +54,25 @@ def __init__( # Define maximum ring buffer size, 512 MSamples * 2 Bytes = 1024 MB self.max_ring_buffer_size: spcm.uint64 = spcm.uint64(1024**3) + # Set notify size independent of sequence + if self.max_ring_buffer_size.value % TX_NOTIFY_RATE != 0: + error_string = "Ring buffer size (%d) is not an integer multiple of notify rate (%d)"%( + self.max_ring_buffer_size.value, TX_NOTIFY_RATE + ) + self.log.debug(error_string) + raise ValueError(error_string) + + self._notify_size = self.max_ring_buffer_size.value // TX_NOTIFY_RATE + # Threading class attributes self.worker: threading.Thread | None = None self.is_running = threading.Event() + @property + def notify_size(self) -> int: + """Getter for notify size.""" + return self._notify_size + def setup_card(self) -> None: """Set up spectrum card in transmit (TX) mode. @@ -274,11 +289,21 @@ def start_operation(self, data: UnrolledSequence | None = None) -> None: # Data must have a default value as start_operation is an abstract method and data is optional if not data: raise ValueError("No unrolled sequence data provided.") + sqnc = data.seq - # Check if sequence datatype is valid - if sqnc.dtype != np.int16: - raise ValueError("Sequence replay data is not int16, please unroll sequence to int16.") + # Check if sequence is list + if not isinstance(sqnc, list): + raise ValueError("Sequence replay data is not a list.") + + # Check if sequence length is greater than 0 + if len(sqnc) == 0: + raise RuntimeError("Sequence has length 0") + + # Check if sequence elements are all numpy arrays with dtype int16 + if any((not isinstance(seq_element, np.ndarray)) and + (seq_element.dtype == np.int16) for seq_element in sqnc): + raise TypeError("Sequence datatype is not np.int16") # Check if card connection is established if not self.card: @@ -316,7 +341,7 @@ def stop_operation(self) -> None: else: print("No active replay thread found...") - def _fifo_stream_worker(self, data: np.ndarray) -> None: + def _fifo_stream_worker(self, data: list[np.ndarray]) -> None: """Continuous FIFO mode examples. Parameters @@ -327,25 +352,18 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None: >>> [c0_0, c1_0, c2_0, c3_0, c0_1, c1_1, c2_1, c3_1, ..., cX_N] Here, X denotes the channel and the subsequent index N the sample index. """ - # Get total size of data buffer to be played out - self.data_buffer_size = data.nbytes - self.log.debug("Replay data buffer: %s bytes", self.data_buffer_size) - - # Calculate notify size is set to 1/16 of the replay buffer size - # Ensure that minimum notify size is 4096 bytes - notify_size = spcm.int32(max(4096, min( - int(((self.data_buffer_size / TX_NOTIFY_RATE) // 4096) * 4096), - int(((self.max_ring_buffer_size.value / TX_NOTIFY_RATE) // 4096) * 4096), - ))) + # Get sequence length (in units of notify size) + seq_length = len(data) + self.log.debug("Sequence length: %d notifies"%(seq_length)) + self.log.debug("Chunk size: %d"%(np.size(data[0]))) # >> Define software buffer - # Setup replay data buffer - data_buffer = data.ctypes.data_as(ctypes.POINTER(ctypes.c_int16)) - # Allocate continuous ring buffer with minimimum necessary amount of memory, ensure multiple of notify size - min_ring_buffer_size = int(np.ceil(self.data_buffer_size / notify_size.value) * notify_size.value) - # Create page-aligned ring buffer - ring_buffer = create_dma_buffer(min(self.max_ring_buffer_size.value, min_ring_buffer_size)) + # Allocate continuous ring buffer wih tminimimum necessary amount of memory, ensure multiple of notify size + ring_buffer = create_dma_buffer(np.min((seq_length, TX_NOTIFY_RATE))*self._notify_size) + + # Check size of ring buffer and get its address in memory ring_buffer_size = spcm.uint64(len(ring_buffer)) + ring_buffer_base_addr = ctypes.cast(ring_buffer, ctypes.c_void_p).value try: # Check if ring buffer size is multiple of 2*num_ch (2 bytes per sample per channel) @@ -354,11 +372,7 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None: "Ring buffer size is not a multiple of channel sample product \ (number of enables channels times 2 byte per sample)" ) - # Check size of data buffer - if self.data_buffer_size % (self.num_ch * 2) != 0: - raise MemoryError( - "Replay data size is not a multiple of enabled channels times 2 (bytes per sample)..." - ) + except MemoryError as err: self.log.exception(err, exc_info=True) raise err @@ -366,37 +380,45 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None: self.log.debug( "Ring buffer size: %s; Notify size: %s", ring_buffer_size.value, - notify_size.value, + self.notify_size, ) - try: - # Perform initial memory transfer: Fill the whole ring buffer - if (_ring_buffer_pos := ctypes.cast(ring_buffer, ctypes.c_void_p).value) and \ - (_data_buffer_pos := ctypes.cast(data_buffer, ctypes.c_void_p).value): - if self.data_buffer_size < ring_buffer_size.value: - ctypes.memmove(_ring_buffer_pos, _data_buffer_pos, self.data_buffer_size) - transferred_bytes = self.data_buffer_size - else: - ctypes.memmove(_ring_buffer_pos, _data_buffer_pos, ring_buffer_size.value) - transferred_bytes = ring_buffer_size.value - else: - raise RuntimeError("Could not get ring or data buffer position.") - except RuntimeError as err: - self.log.exception(err, exc_info=True) - raise err + # Perform initial memory transfe + current_element = 0 + current_position = 0 + if ring_buffer_base_addr: + # Grab the total number of notifies to transfer initially + initial_elements = min((seq_length, TX_NOTIFY_RATE)) + # Transfer those sequence elements to the ring buffer + for idx in range(initial_elements): + seq_element = data[idx] + seq_element_ptr = seq_element.ctypes.data_as(ctypes.POINTER(ctypes.c_int16)) + ctypes.memmove( + ring_buffer_base_addr + current_position, + ctypes.cast(seq_element_ptr, ctypes.c_void_p).value, + seq_element.nbytes + ) + current_position += seq_element.nbytes + current_element += 1 + # Remove that sequence element from memory + data[idx] = None + initial_transfer_size = current_position + else: + raise RuntimeError("Could not get ring buffer address") - # Perform initial data transfer to completely fill continuous buffer + # Perform initial data transfer to completely fill continuous buffer (on card) spcm.spcm_dwDefTransfer_i64( self.card, spcm.SPCM_BUF_DATA, spcm.SPCM_DIR_PCTOCARD, - notify_size, + spcm.int32(self._notify_size), ring_buffer, spcm.uint64(0), ring_buffer_size, ) - self.handle_error(spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, ring_buffer_size)) + # Tell the card how much data is available + self.handle_error(spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, initial_transfer_size)) self.log.debug("Starting card memory transfer") self.handle_error(spcm.spcm_dwSetParam_i32( @@ -415,53 +437,54 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None: avail_bytes = spcm.int32(0) usr_position = spcm.int32(0) - transfer_count = 0 - while (transferred_bytes < self.data_buffer_size) and not self.is_running.is_set(): + while (current_element < len(data)) and not self.is_running.is_set(): # Read available bytes and user position spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_LEN, ctypes.byref(avail_bytes)) spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_POS, ctypes.byref(usr_position)) # Calculate new data for the transfer, when notify_size is available on continous buffer - if avail_bytes.value >= notify_size.value: - transfer_count += 1 - - ring_buffer_position = ctypes.cast(( - ctypes.c_char * (self.max_ring_buffer_size.value - usr_position.value)).from_buffer( - ring_buffer, usr_position.value), ctypes.c_void_p - ).value - - current_data_buffer = ctypes.cast(data_buffer, ctypes.c_void_p).value - - # Get new buffer positions - if ring_buffer_position and current_data_buffer: - data_buffer_position = current_data_buffer + transferred_bytes - - if (bytes_remaining := self.data_buffer_size - transferred_bytes) >= notify_size.value: - # Enough data available -> copy notify size + if avail_bytes.value >= self._notify_size: + + # Get memory position of where to transfer data to + ring_buffer_position = ring_buffer_base_addr + usr_position.value + if ring_buffer_position: + # Get next chunk + seq_element = data[current_element] + seq_element_ptr = seq_element.ctypes.data_as(ctypes.POINTER(ctypes.c_int16)) + + # Check if this is the last chunk and might be partial + if current_element == len(data) - 1 and seq_element.nbytes < self._notify_size: + # Copy partial chunk and pad with zeros ctypes.memmove( ring_buffer_position, - data_buffer_position, - notify_size.value, + ctypes.cast(seq_element_ptr, ctypes.c_void_p).value, + seq_element.nbytes + ) + ctypes.memset( + ring_buffer_position + seq_element.nbytes, + 0, + self._notify_size - seq_element.nbytes ) else: - # Not enough data availabe -> set remaining bytes to zero + # Copy full chunk ctypes.memmove( ring_buffer_position, - data_buffer_position, - bytes_remaining, - ) - ctypes.memset( - ring_buffer_position + bytes_remaining, - 0, - notify_size.value - bytes_remaining, + ctypes.cast(seq_element_ptr, ctypes.c_void_p).value, + self._notify_size ) + # Remove sequence element from memory + data[current_element] = None + # Notify card of new data self.handle_error( - spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, notify_size) + spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, spcm.int32(self._notify_size)) ) - transferred_bytes += notify_size.value + current_element += 1 + self.log.debug("Transferred chunk %d/%d", current_element, len(data)) + + # Wait for transfer self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA)) self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA))