From 089ae2ffac7e0a28e28aa2bf361dd04a6a36514d Mon Sep 17 00:00:00 2001
From: Georg Raiser
Date: Tue, 15 Jul 2025 11:34:39 +0100
Subject: [PATCH 1/4] bugfix for job creator: when root_path is a session path
 (as the docstring indicates is possible), no flag files were found

---
 ibllib/pipes/local_server.py | 45 +++++++++++++++++++-----------------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/ibllib/pipes/local_server.py b/ibllib/pipes/local_server.py
index 92f1cf39a..78acede85 100644
--- a/ibllib/pipes/local_server.py
+++ b/ibllib/pipes/local_server.py
@@ -3,6 +3,7 @@
 This is the module called by the job services on the lab servers. See
 iblscripts/deploy/serverpc/crons for the service scripts that employ this module.
 """
+
 import logging
 import time
 from datetime import datetime
@@ -29,14 +30,11 @@ from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session
 
 _logger = logging.getLogger(__name__)
 
-LARGE_TASKS = [
-    'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'
-]
+LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess']
 
 
 def _run_command(cmd):
-    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE)
+    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     info, error = process.communicate()
     if process.returncode != 0:
         return None
@@ -49,22 +47,26 @@ def _get_volume_usage(vol, label=''):
     res = _run_command(cmd)
     # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
     size_list = re.split(' +', res.split('\n')[-1])
-    fac = 1024 ** 2
-    d = {'total': int(size_list[1]) / fac,
-         'used': int(size_list[2]) / fac,
-         'available': int(size_list[3]) / fac,
-         'volume': size_list[5]}
-    return {f"{label}_{k}": d[k] for k in d}
+    fac = 1024**2
+    d = {
+        'total': int(size_list[1]) / fac,
+        'used': int(size_list[2]) / fac,
+        'available': int(size_list[3]) / fac,
+        'volume': size_list[5],
+    }
+    return {f'{label}_{k}': d[k] for k in d}
 
 
 def report_health(alyx):
     """
     Get a few indicators and label the json field of the corresponding lab with them.
     """
-    status = {'python_version': sys.version,
-              'ibllib_version': ibllib_version,
-              'phylib_version': importlib.metadata.version('phylib'),
-              'local_time': date2isostr(datetime.now())}
+    status = {
+        'python_version': sys.version,
+        'ibllib_version': ibllib_version,
+        'phylib_version': importlib.metadata.version('phylib'),
+        'local_time': date2isostr(datetime.now()),
+    }
 
     status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
     status.update(_get_volume_usage('/', 'system'))
@@ -106,7 +108,7 @@ def job_creator(root_path, one=None, dry=False, rerun=False):
     if not one:
         one = ONE(cache_rest=None)
     rc = IBLRegistrationClient(one=one)
-    flag_files = Path(root_path).glob('*/????-??-??/*/raw_session.flag')
+    flag_files = Path(root_path).glob('raw_session.flag')
     flag_files = filter(lambda x: is_session_path(x.parent), flag_files)
     pipes = []
     all_datasets = []
@@ -162,6 +164,7 @@ def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
     list of dict
         A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
""" + def predicate(task): classe = tasks.str2class(task['executable']) return (mode == 'all' or classe.job_size == mode) and classe.env in env @@ -175,8 +178,9 @@ def predicate(task): return # if the lab is none, this will return empty tasks each time data_repo = get_local_data_repository(alyx) # Filter for tasks - waiting_tasks = alyx.rest('tasks', 'list', status='Waiting', - django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True) + waiting_tasks = alyx.rest( + 'tasks', 'list', status='Waiting', django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True + ) # Filter tasks by size filtered_tasks = filter(predicate, waiting_tasks) # Order tasks by priority @@ -225,9 +229,8 @@ def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_o # reconstruct the session local path. As many jobs belong to the same session # cache the result if last_session != tdict['session']: - ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0] - session_path = Path(subjects_path).joinpath( - Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) + ses = one.alyx.rest('sessions', 'list', django=f'pk,{tdict["session"]}')[0] + session_path = Path(subjects_path).joinpath(Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) last_session = tdict['session'] if dry: print(session_path, tdict['name']) From c7ca93e254096fdc7954d6c04a27f2a9aff48fac Mon Sep 17 00:00:00 2001 From: Georg Raiser Date: Tue, 15 Jul 2025 11:36:02 +0100 Subject: [PATCH 2/4] unruff --- ibllib/pipes/local_server.py | 43 +++++++++++++++++------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/ibllib/pipes/local_server.py b/ibllib/pipes/local_server.py index 78acede85..85d60da6d 100644 --- a/ibllib/pipes/local_server.py +++ b/ibllib/pipes/local_server.py @@ -3,7 +3,6 @@ This is the module called by the job services on the lab servers. See iblscripts/deploy/serverpc/crons for the service scripts that employ this module. """ - import logging import time from datetime import datetime @@ -30,11 +29,14 @@ from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session _logger = logging.getLogger(__name__) -LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'] +LARGE_TASKS = [ + 'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess' +] def _run_command(cmd): - process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) info, error = process.communicate() if process.returncode != 0: return None @@ -47,26 +49,22 @@ def _get_volume_usage(vol, label=''): res = _run_command(cmd) # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk'] size_list = re.split(' +', res.split('\n')[-1]) - fac = 1024**2 - d = { - 'total': int(size_list[1]) / fac, - 'used': int(size_list[2]) / fac, - 'available': int(size_list[3]) / fac, - 'volume': size_list[5], - } - return {f'{label}_{k}': d[k] for k in d} + fac = 1024 ** 2 + d = {'total': int(size_list[1]) / fac, + 'used': int(size_list[2]) / fac, + 'available': int(size_list[3]) / fac, + 'volume': size_list[5]} + return {f"{label}_{k}": d[k] for k in d} def report_health(alyx): """ Get a few indicators and label the json field of the corresponding lab with them. 
""" - status = { - 'python_version': sys.version, - 'ibllib_version': ibllib_version, - 'phylib_version': importlib.metadata.version('phylib'), - 'local_time': date2isostr(datetime.now()), - } + status = {'python_version': sys.version, + 'ibllib_version': ibllib_version, + 'phylib_version': importlib.metadata.version('phylib'), + 'local_time': date2isostr(datetime.now())} status.update(_get_volume_usage('/mnt/s0/Data', 'raid')) status.update(_get_volume_usage('/', 'system')) @@ -164,7 +162,6 @@ def task_queue(mode='all', lab=None, alyx=None, env=(None,)): list of dict A list of Alyx tasks associated with `lab` that have a 'Waiting' status. """ - def predicate(task): classe = tasks.str2class(task['executable']) return (mode == 'all' or classe.job_size == mode) and classe.env in env @@ -178,9 +175,8 @@ def predicate(task): return # if the lab is none, this will return empty tasks each time data_repo = get_local_data_repository(alyx) # Filter for tasks - waiting_tasks = alyx.rest( - 'tasks', 'list', status='Waiting', django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True - ) + waiting_tasks = alyx.rest('tasks', 'list', status='Waiting', + django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True) # Filter tasks by size filtered_tasks = filter(predicate, waiting_tasks) # Order tasks by priority @@ -229,8 +225,9 @@ def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_o # reconstruct the session local path. As many jobs belong to the same session # cache the result if last_session != tdict['session']: - ses = one.alyx.rest('sessions', 'list', django=f'pk,{tdict["session"]}')[0] - session_path = Path(subjects_path).joinpath(Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) + ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0] + session_path = Path(subjects_path).joinpath( + Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) last_session = tdict['session'] if dry: print(session_path, tdict['name']) From 8a6302cee7a4a3a94b68c71764a576e5236e2931 Mon Sep 17 00:00:00 2001 From: Georg Raiser Date: Tue, 26 Aug 2025 14:39:17 +0100 Subject: [PATCH 3/4] added validation by regexp after blind globbing --- ibllib/pipes/local_server.py | 46 ++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/ibllib/pipes/local_server.py b/ibllib/pipes/local_server.py index 85d60da6d..29a583c9e 100644 --- a/ibllib/pipes/local_server.py +++ b/ibllib/pipes/local_server.py @@ -3,6 +3,7 @@ This is the module called by the job services on the lab servers. See iblscripts/deploy/serverpc/crons for the service scripts that employ this module. 
""" + import logging import time from datetime import datetime @@ -29,14 +30,11 @@ from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session _logger = logging.getLogger(__name__) -LARGE_TASKS = [ - 'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess' -] +LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'] def _run_command(cmd): - process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) info, error = process.communicate() if process.returncode != 0: return None @@ -49,22 +47,26 @@ def _get_volume_usage(vol, label=''): res = _run_command(cmd) # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk'] size_list = re.split(' +', res.split('\n')[-1]) - fac = 1024 ** 2 - d = {'total': int(size_list[1]) / fac, - 'used': int(size_list[2]) / fac, - 'available': int(size_list[3]) / fac, - 'volume': size_list[5]} - return {f"{label}_{k}": d[k] for k in d} + fac = 1024**2 + d = { + 'total': int(size_list[1]) / fac, + 'used': int(size_list[2]) / fac, + 'available': int(size_list[3]) / fac, + 'volume': size_list[5], + } + return {f'{label}_{k}': d[k] for k in d} def report_health(alyx): """ Get a few indicators and label the json field of the corresponding lab with them. """ - status = {'python_version': sys.version, - 'ibllib_version': ibllib_version, - 'phylib_version': importlib.metadata.version('phylib'), - 'local_time': date2isostr(datetime.now())} + status = { + 'python_version': sys.version, + 'ibllib_version': ibllib_version, + 'phylib_version': importlib.metadata.version('phylib'), + 'local_time': date2isostr(datetime.now()), + } status.update(_get_volume_usage('/mnt/s0/Data', 'raid')) status.update(_get_volume_usage('/', 'system')) @@ -106,7 +108,8 @@ def job_creator(root_path, one=None, dry=False, rerun=False): if not one: one = ONE(cache_rest=None) rc = IBLRegistrationClient(one=one) - flag_files = Path(root_path).glob('raw_session.flag') + flag_files = Path(root_path).glob('**/raw_session.flag') + flag_files = [file for file in flag_files if re.search(r'/\d{4}-\d{2}-\d{2}/\d{3}/raw_session\.flag$', str(file))] flag_files = filter(lambda x: is_session_path(x.parent), flag_files) pipes = [] all_datasets = [] @@ -162,6 +165,7 @@ def task_queue(mode='all', lab=None, alyx=None, env=(None,)): list of dict A list of Alyx tasks associated with `lab` that have a 'Waiting' status. """ + def predicate(task): classe = tasks.str2class(task['executable']) return (mode == 'all' or classe.job_size == mode) and classe.env in env @@ -175,8 +179,9 @@ def predicate(task): return # if the lab is none, this will return empty tasks each time data_repo = get_local_data_repository(alyx) # Filter for tasks - waiting_tasks = alyx.rest('tasks', 'list', status='Waiting', - django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True) + waiting_tasks = alyx.rest( + 'tasks', 'list', status='Waiting', django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True + ) # Filter tasks by size filtered_tasks = filter(predicate, waiting_tasks) # Order tasks by priority @@ -225,9 +230,8 @@ def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_o # reconstruct the session local path. 
         # cache the result
         if last_session != tdict['session']:
-            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
-            session_path = Path(subjects_path).joinpath(
-                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
+            ses = one.alyx.rest('sessions', 'list', django=f'pk,{tdict["session"]}')[0]
+            session_path = Path(subjects_path).joinpath(Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
             last_session = tdict['session']
         if dry:
             print(session_path, tdict['name'])

From f964a948e54ec258073ecb27861d0e00b804d19a Mon Sep 17 00:00:00 2001
From: Miles Wells
Date: Mon, 1 Sep 2025 16:25:11 +0300
Subject: [PATCH 4/4] Added test and updated docstring

---
 ibllib/pipes/local_server.py | 61 +++++++++++++++++-------------------
 ibllib/tests/test_pipes.py   | 18 +++++++++++
 2 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/ibllib/pipes/local_server.py b/ibllib/pipes/local_server.py
index 29a583c9e..b692fa42b 100644
--- a/ibllib/pipes/local_server.py
+++ b/ibllib/pipes/local_server.py
@@ -3,7 +3,6 @@
 This is the module called by the job services on the lab servers. See
 iblscripts/deploy/serverpc/crons for the service scripts that employ this module.
 """
-
 import logging
 import time
 from datetime import datetime
@@ -18,8 +17,7 @@
 from one.api import ONE
 from one.webclient import AlyxClient
 from one.remote.globus import get_lab_from_endpoint_id, get_local_endpoint_id
-from one.alf.spec import is_session_path
-from one.alf.path import session_path_parts
+from one.alf.path import ALFPath
 
 from ibllib import __version__ as ibllib_version
 from ibllib.pipes import tasks
@@ -30,11 +28,14 @@ from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session
 
 _logger = logging.getLogger(__name__)
 
-LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess']
+LARGE_TASKS = [
+    'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'
+]
 
 
 def _run_command(cmd):
-    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE)
     info, error = process.communicate()
     if process.returncode != 0:
         return None
@@ -47,26 +48,22 @@ def _get_volume_usage(vol, label=''):
     res = _run_command(cmd)
     # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
     size_list = re.split(' +', res.split('\n')[-1])
-    fac = 1024**2
-    d = {
-        'total': int(size_list[1]) / fac,
-        'used': int(size_list[2]) / fac,
-        'available': int(size_list[3]) / fac,
-        'volume': size_list[5],
-    }
-    return {f'{label}_{k}': d[k] for k in d}
+    fac = 1024 ** 2
+    d = {'total': int(size_list[1]) / fac,
+         'used': int(size_list[2]) / fac,
+         'available': int(size_list[3]) / fac,
+         'volume': size_list[5]}
+    return {f"{label}_{k}": d[k] for k in d}
 
 
 def report_health(alyx):
     """
     Get a few indicators and label the json field of the corresponding lab with them.
""" - status = { - 'python_version': sys.version, - 'ibllib_version': ibllib_version, - 'phylib_version': importlib.metadata.version('phylib'), - 'local_time': date2isostr(datetime.now()), - } + status = {'python_version': sys.version, + 'ibllib_version': ibllib_version, + 'phylib_version': importlib.metadata.version('phylib'), + 'local_time': date2isostr(datetime.now())} status.update(_get_volume_usage('/mnt/s0/Data', 'raid')) status.update(_get_volume_usage('/', 'system')) @@ -84,12 +81,11 @@ def job_creator(root_path, one=None, dry=False, rerun=False): 1) create the session on Alyx 2) create the tasks to be run on Alyx - For legacy sessions the raw data are registered separately, instead of within a pipeline task. - Parameters ---------- root_path : str, pathlib.Path - Main path containing sessions or a session path. + Main path containing sessions or a session path. NB: If a session path is passed, + a raw_session.flag file needn't be present. one : one.api.OneAlyx An ONE instance for registering the session(s). dry : bool @@ -108,14 +104,16 @@ def job_creator(root_path, one=None, dry=False, rerun=False): if not one: one = ONE(cache_rest=None) rc = IBLRegistrationClient(one=one) - flag_files = Path(root_path).glob('**/raw_session.flag') - flag_files = [file for file in flag_files if re.search(r'/\d{4}-\d{2}-\d{2}/\d{3}/raw_session\.flag$', str(file))] - flag_files = filter(lambda x: is_session_path(x.parent), flag_files) + if (root_path := ALFPath(root_path)).is_session_path(): + flag_files = [root_path.joinpath('raw_session.flag')] + else: + flag_files = root_path.glob('*/????-??-??/*/raw_session.flag') + flag_files = filter(lambda f: f.parent.is_session_path(), flag_files) pipes = [] all_datasets = [] for flag_file in flag_files: session_path = flag_file.parent - if session_path_parts(session_path)[1] in ('test', 'test_subject'): + if session_path.subject in ('test', 'test_subject'): _logger.debug('skipping test session %s', session_path) continue _logger.info(f'creating session for {session_path}') @@ -165,7 +163,6 @@ def task_queue(mode='all', lab=None, alyx=None, env=(None,)): list of dict A list of Alyx tasks associated with `lab` that have a 'Waiting' status. """ - def predicate(task): classe = tasks.str2class(task['executable']) return (mode == 'all' or classe.job_size == mode) and classe.env in env @@ -179,9 +176,8 @@ def predicate(task): return # if the lab is none, this will return empty tasks each time data_repo = get_local_data_repository(alyx) # Filter for tasks - waiting_tasks = alyx.rest( - 'tasks', 'list', status='Waiting', django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True - ) + waiting_tasks = alyx.rest('tasks', 'list', status='Waiting', + django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True) # Filter tasks by size filtered_tasks = filter(predicate, waiting_tasks) # Order tasks by priority @@ -230,8 +226,9 @@ def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_o # reconstruct the session local path. 
         # cache the result
         if last_session != tdict['session']:
-            ses = one.alyx.rest('sessions', 'list', django=f'pk,{tdict["session"]}')[0]
-            session_path = Path(subjects_path).joinpath(Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
+            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
+            session_path = Path(subjects_path).joinpath(
+                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
             last_session = tdict['session']
         if dry:
             print(session_path, tdict['name'])
diff --git a/ibllib/tests/test_pipes.py b/ibllib/tests/test_pipes.py
index 6f35d164d..de42fa9be 100644
--- a/ibllib/tests/test_pipes.py
+++ b/ibllib/tests/test_pipes.py
@@ -60,6 +60,24 @@ def test_task_queue(self, lab_repo_mock):
         queue = local_server.task_queue(mode='small', lab='foolab', alyx=alyx)
         self.assertEqual([tasks[2]], queue)
 
+    def test_job_creator(self):
+        """Test ibllib.pipes.local_server.job_creator function.
+
+        This test simply checks that a specific session path can be passed and that a raw_session.flag
+        file is not necessary. For a full test of the job creator, see ci.tests.iblscripts.test_report_create_jobs
+        in iblscripts.
+        """
+        session_path = self.tmpdir / 'foo' / '2020-01-01' / '001'
+        assert not session_path.joinpath('raw_session.flag').exists()
+        with self.assertLogs('ibllib.pipes.local_server', level='INFO') as log:
+            local_server.job_creator(session_path, dry=True)
+        self.assertIn('creating session for', log.output[-1])
+        # Check skip when test subject
+        session_path = self.tmpdir / 'test' / '2020-01-01' / '001'
+        with self.assertLogs('ibllib.pipes.local_server', level='DEBUG') as log:
+            local_server.job_creator(session_path, dry=True)
+        self.assertIn('skipping test session', log.output[-1])
+
 
 class TestPipesMisc(unittest.TestCase):
     """"""
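
Note on the behaviour the series converges on: job_creator now accepts either a
root folder of subjects or a single session path. The sketch below isolates that
flag-file discovery logic so it can be read and run outside the pipeline. It is
a minimal illustration under stated assumptions, not the library API: the regex
stand-in only approximates the <subject>/<YYYY-MM-DD>/<NNN> layout that the real
one.alf.path.ALFPath.is_session_path() validator enforces, and the example paths
are hypothetical.

import re
from pathlib import Path

# Rough stand-in for one.alf.path.ALFPath.is_session_path(); the real
# validator in the ONE library is stricter about each path part.
SESSION_PATH_RE = re.compile(r'.+/\d{4}-\d{2}-\d{2}/\d{3}$')


def is_session_path(path):
    """Return True if `path` ends in <subject>/<YYYY-MM-DD>/<NNN>."""
    return bool(SESSION_PATH_RE.match(Path(path).as_posix()))


def find_flag_files(root_path):
    """Yield raw_session.flag files, mirroring the final job_creator logic.

    If `root_path` is itself a session path, the flag file needn't exist on
    disk; otherwise glob one level of subject/date/number folders and keep
    only those that validate as session paths.
    """
    root_path = Path(root_path)
    if is_session_path(root_path):
        yield root_path.joinpath('raw_session.flag')
    else:
        for f in root_path.glob('*/????-??-??/*/raw_session.flag'):
            if is_session_path(f.parent):
                yield f

For example, list(find_flag_files('/mnt/s0/Data/Subjects')) scans a whole
subjects folder for existing flag files, while
list(find_flag_files('/mnt/s0/Data/Subjects/foo/2020-01-01/001')) returns the
single, possibly not-yet-existing, flag path; the latter is the behaviour
exercised by the new test_job_creator test.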