diff --git a/brainbox/io/one.py b/brainbox/io/one.py
index c1c86726e..326b240f2 100644
--- a/brainbox/io/one.py
+++ b/brainbox/io/one.py
@@ -1050,8 +1050,8 @@ def raster(self, spikes, channels, save_dir=None, br=None, label='raster', time_
         :param **kwargs: kwargs passed to `driftmap()` (optional)
         :return:
         """
-        br = br or BrainRegions()
-        time_series = time_series or {}
+        br = BrainRegions() if br is None else br
+        time_series = {} if time_series is None else time_series
         fig, axs = plt.subplots(2, 2, gridspec_kw={
             'width_ratios': [.95, .05], 'height_ratios': [.1, .9]}, figsize=(16, 9), sharex='col')
         axs[0, 1].set_axis_off()
@@ -1094,13 +1094,20 @@ def plot_rawdata_snippet(self, sr, spikes, clusters, t0,
                              save_dir=None,
                              label='raster',
                              gain=-93,
-                             title=None):
+                             title=None,
+                             alpha=0.3,
+                             processing='destripe'):
 
         # compute the raw data offset and destripe, we take 400ms around t0
         first_sample, last_sample = (int((t0 - 0.2) * sr.fs), int((t0 + 0.2) * sr.fs))
         raw = sr[first_sample:last_sample, :-sr.nsync].T
         channel_labels = channels['labels'] if (channels is not None) and ('labels' in channels) else True
-        destriped = ibldsp.voltage.destripe(raw, sr.fs, channel_labels=channel_labels)
+        if processing == 'destripe':
+            samples = ibldsp.voltage.destripe(raw, sr.fs, channel_labels=channel_labels)
+        else:
+            import scipy.signal
+            sos = scipy.signal.butter(N=3, Wn=300 / sr.fs * 2, btype='highpass', output='sos')
+            samples = scipy.signal.sosfiltfilt(sos, raw)
         # filter out the spikes according to good/bad clusters and to the time slice
         spike_sel = slice(*np.searchsorted(spikes['samples'], [first_sample, last_sample]))
         ss = spikes['samples'][spike_sel]
@@ -1110,9 +1117,9 @@ def plot_rawdata_snippet(self, sr, spikes, clusters, t0,
             title = self._default_plot_title(spikes)
         # display the raw data snippet with spikes overlaid
         fig, axs = plt.subplots(1, 2, gridspec_kw={'width_ratios': [.95, .05]}, figsize=(16, 9), sharex='col')
-        Density(destriped, fs=sr.fs, taxis=1, gain=gain, ax=axs[0], t0=t0 - 0.2, unit='s')
-        axs[0].scatter(ss[sok] / sr.fs, sc[sok], color="green", alpha=0.5)
-        axs[0].scatter(ss[~sok] / sr.fs, sc[~sok], color="red", alpha=0.5)
+        Density(samples, fs=sr.fs, taxis=1, gain=gain, ax=axs[0], t0=t0 - 0.2, unit='s')
+        axs[0].scatter(ss[sok] / sr.fs, sc[sok], color="green", alpha=alpha)
+        axs[0].scatter(ss[~sok] / sr.fs, sc[~sok], color="red", alpha=alpha)
         axs[0].set(title=title, xlim=[t0 - 0.035, t0 + 0.035])
         # adds the channel locations if available
         if (channels is not None) and ('atlas_id' in channels):
@@ -1314,7 +1321,7 @@ def _find_behaviour_collection(self, obj):
                             f'e.g sl.load_{obj}(collection="{collections[0]}")')
             raise ALFMultipleCollectionsFound
 
-    def load_trials(self, collection=None):
+    def load_trials(self, collection=None, revision=None):
        """
        Function to load trials data into SessionLoader.trials

@@ -1323,13 +1330,13 @@ def load_trials(self, collection=None):
         collection: str
             Alf collection of trials data
         """
-
+        revision = self.revision if revision is None else revision
         if not collection:
             collection = self._find_behaviour_collection('trials')
         # itiDuration frequently has a mismatched dimension, and we don't need it, exclude using regex
         self.one.wildcards = False
         self.trials = self.one.load_object(
-            self.eid, 'trials', collection=collection, attribute=r'(?!itiDuration).*', revision=self.revision or None).to_df()
+            self.eid, 'trials', collection=collection, attribute=r'(?!itiDuration).*', revision=revision or None).to_df()
         self.one.wildcards = True
         self.data_info.loc[self.data_info['name'] == 'trials', 'is_loaded'] = True
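Note on the new `processing` option in `plot_rawdata_snippet` above: when `processing` is not 'destripe', the snippet falls back to a zero-phase Butterworth high-pass instead of the full destriping. A minimal self-contained sketch of that fallback path, assuming only numpy/scipy and a made-up sampling rate; the variable names mirror the hunk, the data is synthetic.

import numpy as np
import scipy.signal

fs = 30_000  # assumed AP-band sampling rate in Hz
raw = np.random.randn(384, int(0.4 * fs))  # synthetic (channels, samples) snippet

# 3rd-order Butterworth high-pass at 300 Hz, returned as second-order sections;
# Wn = 300 / fs * 2 is the cutoff normalised to the Nyquist frequency (fs / 2)
sos = scipy.signal.butter(N=3, Wn=300 / fs * 2, btype='highpass', output='sos')
# zero-phase filtering along the last (samples) axis
samples = scipy.signal.sosfiltfilt(sos, raw)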
diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py
index e9d152963..4511c5e29 100644
--- a/ibllib/pipes/ephys_tasks.py
+++ b/ibllib/pipes/ephys_tasks.py
@@ -1,8 +1,10 @@
+import importlib.util
 import logging
 from pathlib import Path
 import re
 import shutil
 import subprocess
+import sys
 import traceback
 
 import packaging.version
@@ -124,7 +126,7 @@ class EphysCompressNP1(base_tasks.EphysTask):
     priority = 90
     cpu = 2
     io_charge = 100  # this jobs reads raw ap files
-    job_size = 'small'
+    job_size = 'large'
 
     @property
     def signature(self):
@@ -592,7 +594,6 @@ class SpikeSorting(base_tasks.EphysTask, CellQCMixin):
     SHELL_SCRIPT = Path.home().joinpath(
         f"Documents/PYTHON/iblscripts/deploy/serverpc/{_sortername}/sort_recording.sh"
     )
-    SPIKE_SORTER_NAME = 'iblsorter'
     SORTER_REPOSITORY = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter')
 
     @property
@@ -608,11 +609,12 @@ def signature(self):
             # ./raw_ephys_data/{self.pname}/
             ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}/', True),
             ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}/', True),
-            ('_iblqc_ephysSaturation.samples.npy', f'{self.device_collection}/{self.pname}/', True),
+            ('_iblqc_ephysSaturation.samples.pqt', f'{self.device_collection}/{self.pname}/', True),
             # ./spike_sorters/iblsorter/{self.pname}
             ('_kilosort_raw.output.tar', f'spike_sorters/{self._sortername}/{self.pname}/', True),
             # ./alf/{self.pname}/iblsorter
-            (f'_ibl_log.info_{self.SPIKE_SORTER_NAME}.log', f'alf/{self.pname}/{self._sortername}', True),
+            (f'{self._sortername}_parameters.yaml', f'alf/{self.pname}/{self._sortername}', True),
+            (f'_ibl_log.info_{self._sortername}.log', f'alf/{self.pname}/{self._sortername}', True),
             ('_kilosort_whitening.matrix.npy', f'alf/{self.pname}/{self._sortername}/', True),
             ('_phy_spikes_subset.channels.npy', f'alf/{self.pname}/{self._sortername}/', True),
             ('_phy_spikes_subset.spikes.npy', f'alf/{self.pname}/{self._sortername}/', True),
@@ -657,15 +659,7 @@ def scratch_folder_run(self):
         For a scratch drive at /mnt/h0 we would have the following temp dir:
         /mnt/h0/iblsorter_1.8.0_CSHL071_2020-10-04_001_probe01/
         """
-        # get the scratch drive from the shell script
-        if self.scratch_folder is None:
-            with open(self.SHELL_SCRIPT) as fid:
-                lines = fid.readlines()
-            line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
-            m = re.search(r"\=(.*?)(\#|\n)", line)[0]
-            scratch_drive = Path(m[1:-1].strip())
-        else:
-            scratch_drive = self.scratch_folder
+        scratch_drive = self.scratch_folder if self.scratch_folder else Path('/scratch')
         assert scratch_drive.exists(), f"Scratch drive {scratch_drive} not found"
         # get the version of the sorter
         self.version = self._fetch_iblsorter_version(self.SORTER_REPOSITORY)
@@ -718,15 +712,15 @@ def _fetch_iblsorter_run_version(log_file):
     def _run_iblsort(self, ap_file):
         """
         Runs the ks2 matlab spike sorting for one probe dataset
-        the raw spike sorting output is in session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
+        the raw spike sorting output is in session_path/spike_sorters/{self._sortername}/probeXX folder
         (discontinued support for old spike sortings in the probe folder <1.5.5)
         :return: path of the folder containing ks2 spike sorting output
         """
         iblutil.util.setup_logger('iblsorter', level='INFO')
-        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, self.pname)
+        sorter_dir = self.session_path.joinpath("spike_sorters", self._sortername, self.pname)
         self.FORCE_RERUN = False
         if not self.FORCE_RERUN:
-            log_file = sorter_dir.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")
+            log_file = sorter_dir.joinpath(f"_ibl_log.info_{self._sortername}.log")
             if log_file.exists():
                 run_version = self._fetch_iblsorter_run_version(log_file)
                 if packaging.version.parse(run_version) >= packaging.version.parse('1.7.0'):
@@ -737,11 +731,11 @@ def _run_iblsort(self, ap_file):
                     self.FORCE_RERUN = True
         self.scratch_folder_run.mkdir(parents=True, exist_ok=True)
         check_nvidia_driver()
-        try:
-            # if pykilosort is in the environment, use the installed version within the task
+        # if iblsorter is already imported and provides iblsorter.ibl, run in-process; avoids a try/except around the import
+        if 'iblsorter' in sys.modules and importlib.util.find_spec('iblsorter.ibl') is not None:
             import iblsorter.ibl  # noqa
             iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=self.scratch_folder_run, delete=False)
-        except ImportError:
+        else:
             command2run = f"{self.SHELL_SCRIPT} {ap_file} {self.scratch_folder_run}"
             _logger.info(command2run)
             process = subprocess.Popen(
@@ -762,7 +756,7 @@ def _run_iblsort(self, ap_file):
                     log = fid.read()
                     _logger.error(log)
                     break
-            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")
+            raise RuntimeError(f"{self._sortername} {info_str}, {error_str}")
         shutil.copytree(self.scratch_folder_run.joinpath('output'), sorter_dir, dirs_exist_ok=True)
         return sorter_dir
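Note on the try/except-to-explicit-check change in `_run_iblsort` above: the new condition only runs the sorter in-process when `iblsorter` is already present in `sys.modules` and the `iblsorter.ibl` submodule can be located. A minimal sketch of that availability probe, assuming only the standard library; the helper name is made up for illustration.

import importlib.util
import sys

def in_process_available(module, submodule=None):
    # mirror the hunk above: require the package to be imported already,
    # then locate the needed submodule without importing it
    if module not in sys.modules:
        return False
    target = f'{module}.{submodule}' if submodule else module
    return importlib.util.find_spec(target) is not None

# dispatch between an in-process run and the shell-script fallback
if in_process_available('iblsorter', 'ibl'):
    print('run spike sorting in-process')
else:
    print('delegate to the sort_recording.sh shell script')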
self.session_path.joinpath("spike_sorters", self._sortername, self.pname) self.FORCE_RERUN = False if not self.FORCE_RERUN: - log_file = sorter_dir.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log") + log_file = sorter_dir.joinpath(f"_ibl_log.info_{self._sortername}.log") if log_file.exists(): run_version = self._fetch_iblsorter_run_version(log_file) if packaging.version.parse(run_version) >= packaging.version.parse('1.7.0'): @@ -737,11 +731,11 @@ def _run_iblsort(self, ap_file): self.FORCE_RERUN = True self.scratch_folder_run.mkdir(parents=True, exist_ok=True) check_nvidia_driver() - try: - # if pykilosort is in the environment, use the installed version within the task + # this is the best way I found to check if iblsorter is installed and available without a try block + if 'iblsorter' in sys.modules and importlib.util.find_spec('iblsorter.ibl') is not None: import iblsorter.ibl # noqa iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=self.scratch_folder_run, delete=False) - except ImportError: + else: command2run = f"{self.SHELL_SCRIPT} {ap_file} {self.scratch_folder_run}" _logger.info(command2run) process = subprocess.Popen( @@ -762,7 +756,7 @@ def _run_iblsort(self, ap_file): log = fid.read() _logger.error(log) break - raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") + raise RuntimeError(f"{self._sortername} {info_str}, {error_str}") shutil.copytree(self.scratch_folder_run.joinpath('output'), sorter_dir, dirs_exist_ok=True) return sorter_dir @@ -783,7 +777,7 @@ def _run(self): out_files = [] sorter_dir = self._run_iblsort(ap_file) # runs the sorter, skips if it already ran # convert the data to ALF in the ./alf/probeXX/SPIKE_SORTER_NAME folder - probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME) + probe_out_path = self.session_path.joinpath("alf", label, self._sortername) shutil.rmtree(probe_out_path, ignore_errors=True) probe_out_path.mkdir(parents=True, exist_ok=True) ibllib.ephys.spikes.ks2_to_alf( @@ -793,9 +787,9 @@ def _run(self): bin_file=ap_file, ampfactor=self._sample2v(ap_file), ) - logfile = sorter_dir.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log") + logfile = sorter_dir.joinpath(f"_ibl_log.info_{self._sortername}.log") if logfile.exists(): - shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) + shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self._sortername}.log")) # recover the QC files from the spike sorting output and copy them for file_qc in sorter_dir.glob('_iblqc_*.npy'): shutil.move(file_qc, file_qc_out := ap_file.parent.joinpath(file_qc.name)) @@ -809,7 +803,7 @@ def _run(self): # convert ks2_output into tar file and also register # Make this in case spike sorting is in old raw_ephys_data folders, for new # sessions it should already exist - tar_dir = self.session_path.joinpath('spike_sorters', self.SPIKE_SORTER_NAME, label) + tar_dir = self.session_path.joinpath('spike_sorters', self._sortername, label) tar_dir.mkdir(parents=True, exist_ok=True) out = ibllib.ephys.spikes.ks2_to_tar(sorter_dir, tar_dir, force=self.FORCE_RERUN) out_files.extend(out) diff --git a/ibllib/pipes/video_tasks.py b/ibllib/pipes/video_tasks.py index e0ced2695..5afe80796 100644 --- a/ibllib/pipes/video_tasks.py +++ b/ibllib/pipes/video_tasks.py @@ -328,7 +328,7 @@ def _run(self, update=True, **kwargs): class DLC(base_tasks.VideoTask): """ This task relies on a correctly installed dlc environment as per - 
https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit# + https://github.com/int-brain-lab/iblvideo#installing-dlc-locally-on-an-ibl-server---tensorflow-2120 If your environment is set up otherwise, make sure that you set the respective attributes: t = EphysDLC(session_path) @@ -341,6 +341,7 @@ class DLC(base_tasks.VideoTask): level = 2 force = True job_size = 'large' + env = 'dlc' dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate') scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc') @@ -357,25 +358,41 @@ def signature(self): return signature def _check_dlcenv(self): - """Check that scripts are present, dlcenv can be activated and get iblvideo version""" - assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \ - f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}' - assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \ - f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}' - assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}' - command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" - process = subprocess.Popen( - command2run, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - executable='/bin/bash' - ) - info, error = process.communicate() - if process.returncode != 0: - raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}") - version = info.decode('utf-8').strip().split('\n')[-1] - return version + """ + Check DLC environment and return iblvideo version. + + Attempts to import iblvideo directly. If unsuccessful, checks for necessary + scripts and environment, then retrieves version via subprocess. 
@@ -386,6 +403,74 @@ def _video_intact(file_mp4):
         cap.release()
         return intact
 
+    def _run_dlc(self, file_mp4, cam, overwrite, flag_subprocess=True):
+        try:
+            if flag_subprocess:
+                _logger.info(f'iblvideo version {self.version}')
+                command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
+                _logger.info(command2run)
+                process = subprocess.Popen(
+                    command2run,
+                    shell=True,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    executable='/bin/bash',
+                )
+                info, error = process.communicate()
+                # info_str = info.decode("utf-8").strip()
+                # _logger.info(info_str)
+                if process.returncode != 0:
+                    error_str = error.decode('utf-8').strip()
+                    _logger.error(f'DLC failed for {cam}Camera.\n\n'
+                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
+                                  f'{error_str}\n'
+                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
+                return process.returncode
+            else:
+                from iblvideo import download_weights
+                from iblvideo.pose_dlc import dlc
+                path_dlc = download_weights()
+                dlc_result, _ = dlc(file_mp4, path_dlc=path_dlc, force=overwrite)
+                return 0
+        except Exception as e:
+            _logger.error(f'An error occurred while running DLC for {cam}Camera: {e}')
+            _logger.error(traceback.format_exc())
+            return -1
+
+    def _run_motion_energy(self, file_mp4, dlc_result, flag_subprocess=True):
+        if flag_subprocess:
+            command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
+            _logger.info(command2run)
+            process = subprocess.Popen(
+                command2run,
+                shell=True,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                executable='/bin/bash',
+            )
+            info, error = process.communicate()
+            # info_str = info.decode('utf-8').strip()
+            # _logger.info(info_str)
+            if process.returncode != 0:
+                error_str = error.decode('utf-8').strip()
+                _logger.error(f'Motion energy failed for {file_mp4}.\n\n'
+                              f'++++++++ Output of subprocess for debugging ++++++++\n\n'
+                              f'{error_str}\n'
+                              f'++++++++++++++++++++++++++++++++++++++++++++\n')
+            return_code = process.returncode
+        else:  # runs the motion energy calculation in the current environment
+            try:
+                from iblvideo.motion_energy import motion_energy
+                _ = motion_energy(file_mp4, dlc_result)
+                return_code = 0
+            except Exception:
+                _logger.error(f'Motion energy failed for {file_mp4}.\n\n'
+                              f'++++++++ Output of subprocess for debugging ++++++++\n\n'
+                              f'{traceback.format_exc()}\n'
+                              f'++++++++++++++++++++++++++++++++++++++++++++\n')
+                return_code = -1
+        return return_code
+
     def _run(self, cams=None, overwrite=False):
         # Check that the cams are valid for DLC, remove the ones that aren't
         candidate_cams = cams or self.cameras
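Aside on the two helpers above, not part of the diff: the repeated `Popen`/`communicate` boilerplate could be written more compactly with `subprocess.run`; a sketch assuming a bash shell and an arbitrary command string.

import logging
import subprocess

_logger = logging.getLogger(__name__)

def run_shell(command2run):
    # capture_output collects stdout/stderr; text=True decodes them to str,
    # replacing the manual .decode('utf-8') calls used with Popen/communicate
    result = subprocess.run(command2run, shell=True, executable='/bin/bash',
                            capture_output=True, text=True)
    if result.returncode != 0:
        _logger.error(f'command failed:\n{result.stderr.strip()}')
    return result.returncode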
@@ -419,55 +505,24 @@ def _run(self, cams=None, overwrite=False):
                 _logger.error(f'Corrupt raw video file {file_mp4}')
                 self.status = -1
                 continue
+
             # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
-            self.version = self._check_dlcenv()
-            _logger.info(f'iblvideo version {self.version}')
             check_nvidia_driver()
+            self.version, flag_subprocess = self._check_dlcenv()
+
+            # Step 1: Run DLC for this camera
             _logger.info(f'Running DLC on {cam}Camera.')
-            command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
-            _logger.info(command2run)
-            process = subprocess.Popen(
-                command2run,
-                shell=True,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-                executable='/bin/bash',
-            )
-            info, error = process.communicate()
-            # info_str = info.decode("utf-8").strip()
-            # _logger.info(info_str)
-            if process.returncode != 0:
-                error_str = error.decode('utf-8').strip()
-                _logger.error(f'DLC failed for {cam}Camera.\n\n'
-                              f'++++++++ Output of subprocess for debugging ++++++++\n\n'
-                              f'{error_str}\n'
-                              f'++++++++++++++++++++++++++++++++++++++++++++\n')
+            return_code = self._run_dlc(file_mp4, cam, overwrite, flag_subprocess=flag_subprocess)
+            if return_code != 0:
                 self.status = -1
-                # We dont' run motion energy, or add any files if dlc failed to run
                 continue
             dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
             actual_outputs.append(dlc_result)
+
+            # Step 2: Compute Motion Energy for this camera
             _logger.info(f'Computing motion energy for {cam}Camera')
-            command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
-            _logger.info(command2run)
-            process = subprocess.Popen(
-                command2run,
-                shell=True,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-                executable='/bin/bash',
-            )
-            info, error = process.communicate()
-            # info_str = info.decode('utf-8').strip()
-            # _logger.info(info_str)
-            if process.returncode != 0:
-                error_str = error.decode('utf-8').strip()
-                _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
-                              f'++++++++ Output of subprocess for debugging ++++++++\n\n'
-                              f'{error_str}\n'
-                              f'++++++++++++++++++++++++++++++++++++++++++++\n')
+            return_code = self._run_motion_energy(file_mp4, dlc_result, flag_subprocess=flag_subprocess)
+            if return_code != 0:
                 self.status = -1
                 continue
             actual_outputs.append(next(self.session_path.joinpath('alf').glob(
diff --git a/requirements.txt b/requirements.txt
index b673ee203..adb5c1a1f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,35 +1,35 @@
 boto3
 click>=7.0.0
 colorlog>=4.0.2
 flake8>=3.7.8
 globus-sdk
 graphviz
+imagecodecs # used to convert tif snapshots to png when registering mesoscope snapshots (also requires skimage)
 matplotlib>=3.0.3
-numba>=0.56
-numpy>=1.18,<=2.2 # numpy 2.3 is not compatible with numba - ETA end of June 2025
 nptdms
+numba>=0.56
+numpy>=1.18
 opencv-python-headless
 pandas
 pyarrow
 pynrrd>=0.4.0
+pyqt5
 pytest
 requests>=2.22.0
+scikit-image # this is a widefield requirement missing as of July 2023, we may remove it once wfield has this figured out
 scikit-learn>=0.22.1
 scipy>=1.7.0
-scikit-image # this is a widefield requirement missing as of July 2023, we may remove it once wfield has this figured out
-imagecodecs # used to convert tif snapshots to png when registering mesoscope snapshots (also requires skimage)
-sparse
 seaborn>=0.9.0
+sparse
 tqdm>=4.32.1
 # ibl libraries
+ONE-api>=3.2.0
+ibl-neuropixel>=1.9.0
+ibl-style
 iblatlas>=0.5.3
-ibl-neuropixel>=1.6.2
-iblutil>=1.13.0
 iblqt>=0.8.2
+iblutil>=1.13.0
 mtscomp>=1.0.1
-ONE-api>=3.2.0
-phylib>=2.6.0
+phylib>=2.6.3
 psychofit
 slidingRP>=1.1.1 # steinmetz lab refractory period metrics
-pyqt5
-ibl-style
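Note on the requirements reshuffle: the submitted hunk added `ONE-api` twice with conflicting pins (`>=3.0.0` at the top of the file and `>=3.2.0` under # ibl libraries); the duplicate top entry is dropped in the hunk above and the hunk header adjusted accordingly. A quick sanity check one might run against any pip-style requirements file; the helper name is made up.

import collections
import re

def duplicate_requirements(path='requirements.txt'):
    # strip comments and blank lines, then reduce each spec to its package name
    names = []
    with open(path) as fid:
        for line in fid:
            spec = line.split('#')[0].strip()
            if spec:
                names.append(re.split(r'[<>=~!\s\[]', spec, maxsplit=1)[0].lower())
    return [name for name, count in collections.Counter(names).items() if count > 1]

print(duplicate_requirements())  # expect [] after the fix above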