From 5d58e47d8d4530a2c8d0cbae14e2fcc97d2603b5 Mon Sep 17 00:00:00 2001
From: Miles Wells
Date: Mon, 27 Oct 2025 15:55:32 +0200
Subject: [PATCH 1/3] Transfer files directly to S3 with Globus Patcher

---
 ibllib/oneibl/patcher.py | 63 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 60 insertions(+), 3 deletions(-)

diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py
index 514563636..21ce34e64 100644
--- a/ibllib/oneibl/patcher.py
+++ b/ibllib/oneibl/patcher.py
@@ -25,6 +25,7 @@
 from pathlib import Path, PurePosixPath, WindowsPath
 from collections import defaultdict
 from itertools import starmap
+from functools import lru_cache
 from subprocess import Popen, PIPE, STDOUT
 import subprocess
 import logging
@@ -32,6 +33,7 @@
 import shutil
 
 import globus_sdk
+import boto3
 import iblutil.io.params as iopar
 from iblutil.util import ensure_list
 from one.alf.path import get_session_path, add_uuid_string, full_path_parts
@@ -40,7 +42,7 @@
 from one.webclient import AlyxClient
 from one.converters import path_from_dataset
 from one.remote import globus
-from one.remote.aws import url2uri, get_s3_from_alyx
+from one.remote.aws import url2uri, get_s3_from_alyx, get_aws_access_keys
 
 from ibllib.oneibl.registration import register_dataset
 
@@ -96,6 +98,29 @@ def globus_path_from_dataset(dset, repository=None, uuid=False):
     return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)
 
 
+@lru_cache(maxsize=32)
+def get_s3_bucket_from_alyx(alyx, repository_name):
+    """
+    Retrieves the S3 bucket and region associated with a given Alyx repository.
+
+    Parameters
+    ----------
+    alyx : AlyxClient
+        An instance of the Alyx client.
+    repository_name : str
+        The name of the repository to look up.
+
+    Returns
+    -------
+    str
+        The S3 bucket name.
+    str
+        The S3 region name.
+    """
+    session_keys, bucket_name = get_aws_access_keys(alyx, repository_name)
+    return bucket_name, session_keys['region_name']
+
+
 class Patcher(abc.ABC):
     def __init__(self, one=None):
         assert one
@@ -248,6 +273,7 @@ def __init__(self, client_name='default', one=None, label='ibllib patch'):
         # transfers/delete from flatiron to optional third parties to synchronize / delete
         self.globus_transfers_locals = {}
         self.globus_deletes_locals = {}
+        self.aws_transfers = {}  # dictionary region_name -> bucket_name -> list of (src, dst)
         super().__init__(one=one)
 
     def _scp(self, local_path, remote_path, dry=True):
@@ -284,7 +310,7 @@ def patch_datasets(self, file_list, **kwargs):
         :return:
         """
         responses = super().patch_datasets(file_list, **kwargs)
-        for dset in responses:
+        for dset, file in (responses, file_list):
             # get the flatiron path
             fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
             relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
@@ -294,6 +320,13 @@ def patch_datasets(self, file_list, **kwargs):
             for fr in dset['file_records']:
                 if fr['data_repository'] == DMZ_REPOSITORY:
                     continue
+                if fr['data_repository'].startswith('aws_'):
+                    bucket_name, region_name = get_s3_bucket_from_alyx(self.one.alyx, fr['data_repository'])
+                    if region_name not in self.aws_transfers:
+                        self.aws_transfers[region_name] = {}
+                    if bucket_name not in self.aws_transfers[region_name]:
+                        self.aws_transfers[region_name][bucket_name] = []
+                    self.aws_transfers[region_name][bucket_name].append((str(file), relative_path))
                 if fr['data_repository'] not in self.endpoints:
                     continue
                 repo_gid = self.endpoints[fr['data_repository']]['id']
@@ -313,7 +346,7 @@ def patch_datasets(self, file_list, **kwargs):
     def launch_transfers(self, local_servers=False):
         """
        patcher.launch_transfers()
-        Launches the globus transfer and delete from the local patch computer to the flat-rion
+        Launches the globus transfer and delete from the local patch computer to the flatiron
        :param: local_servers (False): if True, sync the local servers after the main transfer
        :return: None
        """
@@ -377,6 +410,30 @@ def launch_transfers_secondary(self):
             if len(transfer['DATA']) > 0:
                 self.client.submit_delete(delete)
 
+    def launch_aws_transfers(self, aws_profile='ibladmin', dry=False):
+        """Launches the AWS transfers from local path to AWS S3 bucket.
+
+        Parameters
+        ----------
+        aws_profile : str
+            The AWS profile name to use.
+        dry : bool
+            If true, the transfer is not actually executed.
+
+        """
+        session = boto3.Session(profile_name=aws_profile)
+        for region, buckets in self.aws_transfers.items():
+            s3_client = session.client('s3', region_name=region)
+            for bucket, files in buckets.items():
+                try:
+                    for source_file, destination_key in files:
+                        if not dry:
+                            s3_client.upload_file(source_file, bucket, destination_key)
+                        _logger.info(f'Copied {source_file} to s3://{bucket}/{destination_key}')
+                except Exception as e:
+                    _logger.error(f'Failed to copy {source_file} to S3: {e}')
+                    raise
+
 
 class IBLGlobusPatcher(Patcher, globus.Globus):
     """This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class.
From 4d7efb78a70c48488c223f96600e8b33fcbd2733 Mon Sep 17 00:00:00 2001
From: oliche
Date: Tue, 4 Nov 2025 13:14:42 +0000
Subject: [PATCH 2/3] bugfix

---
 ibllib/oneibl/patcher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py
index 21ce34e64..61f02bf90 100644
--- a/ibllib/oneibl/patcher.py
+++ b/ibllib/oneibl/patcher.py
@@ -310,7 +310,7 @@ def patch_datasets(self, file_list, **kwargs):
         :return:
         """
         responses = super().patch_datasets(file_list, **kwargs)
-        for dset, file in (responses, file_list):
+        for dset, file in zip(responses, file_list):
             # get the flatiron path
             fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
             relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()

From 6f8d532eb2df59373dfd4155f0b88eab6081024f Mon Sep 17 00:00:00 2001
From: oliche
Date: Tue, 4 Nov 2025 13:49:00 +0000
Subject: [PATCH 3/3] correct aws destination key

---
 ibllib/oneibl/patcher.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py
index 61f02bf90..3291ca4c9 100644
--- a/ibllib/oneibl/patcher.py
+++ b/ibllib/oneibl/patcher.py
@@ -315,6 +315,7 @@ def patch_datasets(self, file_list, **kwargs):
             fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
             relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
             flatiron_path = self.to_address(relative_path, fr['data_repository'])
+            aws_destination_key = 'data' + flatiron_path
             # loop over the remaining repositories (local servers) and create a transfer
             # from flatiron to the local server
             for fr in dset['file_records']:
@@ -326,7 +327,7 @@ def patch_datasets(self, file_list, **kwargs):
                     self.aws_transfers[region_name] = {}
                 if bucket_name not in self.aws_transfers[region_name]:
                     self.aws_transfers[region_name][bucket_name] = []
-                self.aws_transfers[region_name][bucket_name].append((str(file), relative_path))
+                self.aws_transfers[region_name][bucket_name].append((str(file), aws_destination_key))
                 if fr['data_repository'] not in self.endpoints:
                     continue
                 repo_gid = self.endpoints[fr['data_repository']]['id']
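For reference, an editorial illustration of what PATCH 3/3 changes about the S3 destination key (not part of the series). The lab, session, and UUID values are invented, and the flatiron_path assignment is a stand-in for whatever self.to_address() would actually return.

    # Illustration of the destination-key fix; all concrete values are hypothetical.
    from uuid import uuid4
    from one.alf.path import add_uuid_string

    relative_path = add_uuid_string(
        'ZM_1150/2019-05-07/001/alf/_ibl_wheel.position.npy', uuid4()).as_posix()
    # Stand-in for self.to_address(relative_path, fr['data_repository']):
    flatiron_path = f'/mainenlab/Subjects/{relative_path}'
    # Before PATCH 3/3 the bare relative_path was used as the S3 key; afterwards
    # the key mirrors the flatiron layout under the bucket's 'data' prefix:
    aws_destination_key = 'data' + flatiron_path
    print(aws_destination_key)  # data/mainenlab/Subjects/ZM_1150/.../_ibl_wheel.position.<uuid>.npy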