64 changes: 61 additions & 3 deletions ibllib/oneibl/patcher.py
@@ -25,13 +25,15 @@
from pathlib import Path, PurePosixPath, WindowsPath
from collections import defaultdict
from itertools import starmap
from functools import lru_cache
from subprocess import Popen, PIPE, STDOUT
import subprocess
import logging
from getpass import getpass
import shutil

import globus_sdk
import boto3
import iblutil.io.params as iopar
from iblutil.util import ensure_list
from one.alf.path import get_session_path, add_uuid_string, full_path_parts
@@ -40,7 +42,7 @@
from one.webclient import AlyxClient
from one.converters import path_from_dataset
from one.remote import globus
from one.remote.aws import url2uri, get_s3_from_alyx
from one.remote.aws import url2uri, get_s3_from_alyx, get_aws_access_keys

from ibllib.oneibl.registration import register_dataset

@@ -96,6 +98,29 @@ def globus_path_from_dataset(dset, repository=None, uuid=False):
return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)


@lru_cache(maxsize=32)
def get_s3_bucket_from_alyx(alyx, repository_name):
"""
Retrieves the S3 bucket and region associated with a given Alyx repository.

Parameters
----------
alyx : AlyxClient
An instance of the Alyx client.
repository_name : str
The name of the repository to look up.

Returns
-------
str
The S3 bucket name.
str
The S3 region name.
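
Examples
--------
>>> # illustrative usage; 'aws_cortexlab' is a hypothetical repository name
>>> from one.api import ONE
>>> one = ONE()
>>> bucket, region = get_s3_bucket_from_alyx(one.alyx, 'aws_cortexlab')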
"""
session_keys, bucket_name = get_aws_access_keys(alyx, repository_name)
return bucket_name, session_keys['region_name']


class Patcher(abc.ABC):
def __init__(self, one=None):
assert one
@@ -248,6 +273,7 @@ def __init__(self, client_name='default', one=None, label='ibllib patch'):
# transfers/delete from flatiron to optional third parties to synchronize / delete
self.globus_transfers_locals = {}
self.globus_deletes_locals = {}
self.aws_transfers = {} # dictionary region_name -> bucket_name -> list of (src, dst)
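# illustrative layout with hypothetical values:
# {'us-east-1': {'my-bucket': [('/local/spikes.times.npy', 'data/path/to/spikes.times.uuid.npy')]}}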
super().__init__(one=one)

def _scp(self, local_path, remote_path, dry=True):
@@ -284,16 +310,24 @@ def patch_datasets(self, file_list, **kwargs):
:return:
"""
responses = super().patch_datasets(file_list, **kwargs)
for dset in responses:
for dset, file in zip(responses, file_list):
# get the flatiron path
fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
flatiron_path = self.to_address(relative_path, fr['data_repository'])
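# the S3 destination key mirrors the flatiron path under a top-level 'data' prefix
# (assumes to_address returns a path starting with '/')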
aws_destination_key = 'data' + flatiron_path
# loop over the remaining repositories (local servers) and create a transfer
# from flatiron to the local server
for fr in dset['file_records']:
if fr['data_repository'] == DMZ_REPOSITORY:
continue
if fr['data_repository'].startswith('aws_'):
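# cached lookup (lru_cache above), so repeated datasets on the same repository do not re-query Alyx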
bucket_name, region_name = get_s3_bucket_from_alyx(self.one.alyx, fr['data_repository'])
if region_name not in self.aws_transfers:
self.aws_transfers[region_name] = {}
if bucket_name not in self.aws_transfers[region_name]:
self.aws_transfers[region_name][bucket_name] = []
self.aws_transfers[region_name][bucket_name].append((str(file), aws_destination_key))
if fr['data_repository'] not in self.endpoints:
continue
repo_gid = self.endpoints[fr['data_repository']]['id']
@@ -313,7 +347,7 @@
def launch_transfers(self, local_servers=False):
"""
patcher.launch_transfers()
Launches the globus transfer and delete from the local patch computer to the flat-rion
Launches the globus transfer and delete from the local patch computer to the flatiron
:param: local_servers (False): if True, sync the local servers after the main transfer
:return: None
"""
@@ -377,6 +411,30 @@ def launch_transfers_secondary(self):
if len(transfer['DATA']) > 0:
self.client.submit_delete(delete)

def launch_aws_transfers(self, aws_profile='ibladmin', dry=False):
"""Launches the AWS transfers from local path to AWS S3 bucket.

Parameters
----------
aws_profile : str
The AWS profile name to use.
dry : bool
If True, the transfers are not actually executed.
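
Examples
--------
>>> # illustrative usage: patch_datasets populates self.aws_transfers beforehand
>>> patcher.patch_datasets(file_list)
>>> patcher.launch_aws_transfers(dry=True)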

"""
session = boto3.Session(profile_name=aws_profile)
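# one S3 client per region; transfers were grouped by region, then bucket, in patch_datasets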
for region, buckets in self.aws_transfers.items():
s3_client = session.client('s3', region_name=region)
for bucket, files in buckets.items():
try:
for source_file, destination_key in files:
if dry:
_logger.info(f'(dry run) would copy {source_file} to s3://{bucket}/{destination_key}')
continue
s3_client.upload_file(source_file, bucket, destination_key)
_logger.info(f'Copied {source_file} to s3://{bucket}/{destination_key}')
except Exception as e:
_logger.error(f'Failed to copy {source_file} to S3: {e}')
raise


class IBLGlobusPatcher(Patcher, globus.Globus):
"""This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class.