From 5ca8417e9ce4dbf474bd4d09d4ab04fcc0555346 Mon Sep 17 00:00:00 2001 From: LuisSanchez25 Date: Thu, 26 Sep 2024 14:36:58 -0500 Subject: [PATCH 01/24] add S3 functionality to strax --- strax/__init__.py | 1 + strax/storage/s3.py | 525 ++++++++++++++++++++++++++++++++++++++++++++ strax/utils.py | 42 ++++ 3 files changed, 568 insertions(+) create mode 100644 strax/storage/s3.py diff --git a/strax/__init__.py b/strax/__init__.py index e356b0ced..fa87a8f61 100644 --- a/strax/__init__.py +++ b/strax/__init__.py @@ -14,6 +14,7 @@ from .storage.file_rechunker import * from .storage.mongo import * from .storage.zipfiles import * +from .storage.s3 import * from .config import * from .plugins import * diff --git a/strax/storage/s3.py b/strax/storage/s3.py new file mode 100644 index 000000000..cf06eea8b --- /dev/null +++ b/strax/storage/s3.py @@ -0,0 +1,525 @@ +import glob +import json +import os +import configparser +import os.path as osp +from typing import Optional +from bson import json_util +import shutil +import boto3 +from botocore.exceptions import ClientError +from botocore.client import Config + +import strax +from .common import StorageFrontend + +export, __all__ = strax.exporter() + +RUN_METADATA_PATTERN = "%s-metadata.json" +BUCKET_NAME = 'mlrice' + + +@export +class S3Frontend(StorageFrontend): + """Simplest registry: single directory with FileStore data + sitting in subdirectories. + + Run-level metadata is stored in loose json files in the directory. + """ + + can_define_runs = True + provide_run_metadata = False + provide_superruns = True + BUCKET = "mlrice" + + def __init__(self, + s3_access_key_id: str=None, + s3_secret_access_key: str=None, + endpoint_url=None, + path="", + deep_scan=False, + *args, + **kwargs): + """ + :param path: Path to folder with data subfolders. + :param deep_scan: Let scan_runs scan over folders, + so even data for which no run-level metadata is available + is reported. + + For other arguments, see DataRegistry base class. + """ + super().__init__(*args, **kwargs) + self.path = path + self.deep_scan = deep_scan + + # Might need to reimplement this at some later time + #if not self.readonly and not osp.exists(self.path): + # os.makedirs(self.path) + + self.config_path = os.getenv("XENON_CONFIG") + if self.config_path is None: + raise EnvironmentError("XENON_CONFIG file not found") + else: + self.config = configparser.ConfigParser() + + s3_access_key_id = self._get_config_value(s3_access_key_id, + "s3_access_key_id") + s3_secret_access_key = self._get_config_value(s3_secret_access_key, + "s3_secret_access_key") + endpoint_url = self._get_config_value(endpoint_url, + "endpoint_url") + + self.boto3_client_kwargs = { + 'aws_access_key_id': s3_access_key_id, + 'aws_secret_access_key': s3_secret_access_key, + 'endpoint_url': endpoint_url, + 'service_name': 's3', + 'config': Config(connect_timeout=5, + retries={'max_attempts': 10})} + + # Initialized connection to S3-protocol storage + self.s3 = boto3.client(**self.boto3_client_kwargs) + + self.backends = [S3Backend(**self.boto3_client_kwargs)] + + def _run_meta_path(self, run_id): + return osp.join(self.path, RUN_METADATA_PATTERN % run_id) + + def run_metadata(self, run_id, projection=None): + path = self._run_meta_path(run_id) + # Changed ops. 
to self.s3 implementation + if self.s3.list_objects_v2(Bucket=self.BUCKET, + Prefix=path)['KeyCount'] == 0: + raise strax.RunMetadataNotAvailable( + f"No file at {path}, cannot find run metadata for {run_id}" + ) + response = self.s3.get_object(Bucket=self.BUCKET, Key=path) + metadata_content = response['Body'].read().decode('utf-8') + md = json.loads(metadata_content, object_hook=json_util.object_hook) + #with open(path, mode="r") as f: + # md = json.loads(f.read(), object_hook=json_util.object_hook) + md = strax.flatten_run_metadata(md) + if projection is not None: + md = {k: v for k, v in md.items() if k in projection} + return md + + def write_run_metadata(self, run_id, metadata): + #response = self.s3.get_object(Bucket=self.BUCKET, Key=self._run_meta_path(run_id)) + #metadata_content = response['Body'].read().decode('utf-8') + if "name" not in metadata: + metadata["name"] = run_id + + self.s3.put_object(Bucket=self.BUCKET, + Key=self._run_meta_path(run_id), + Body=json.dumps(metadata, + sort_keys=True, + indent=4, + default=json_util.default)) + #with open(self._run_meta_path(run_id), mode="w") as f: + # if "name" not in metadata: + # metadata["name"] = run_id + # f.write(json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default)) + + def _scan_runs(self, store_fields): + """Iterable of run document dictionaries. + + These should be directly convertable to a pandas DataFrame. + + """ + found = set() + + # Yield metadata for runs for which we actually have it + for md_path in sorted( + self.s3.list_objects_v2(Bucket = self.BUCKET, + Prefix = osp.join(self.path, + RUN_METADATA_PATTERN.replace("%s", "*"))) + ): + # Parse the run metadata filename pattern. + # (different from the folder pattern) + run_id = osp.basename(md_path).split("-")[0] + found.add(run_id) + yield self.run_metadata(run_id, projection=store_fields) + + if self.deep_scan: + # Yield runs for which no metadata exists + # we'll make "metadata" that consist only of the run name + for fn in self._subfolders(): + run_id = self._parse_folder_name(fn)[0] + if run_id not in found: + found.add(run_id) + yield dict(name=run_id) + + def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): + self.raise_if_non_compatible_run_id(key.run_id) + dirname = osp.join(self.path, str(key)) + exists = self.s3_object_exists(self.BUCKET, dirname) + bk = self.backend_key(dirname) + + if write: + if exists and not self._can_overwrite(key): + raise strax.DataExistsError(at=dirname) + return bk + + if allow_incomplete and not exists: + # Check for incomplete data (only exact matching for now) + if fuzzy_for or fuzzy_for_options: + raise NotImplementedError( + "Mixing of fuzzy matching and allow_incomplete not supported by DataDirectory." + ) + tempdirname = dirname + "_temp" + bk = self.backend_key(tempdirname) + if self.s3.list_objects_v2(Bucket=self.BUCKET, + Prefix=tempdirname)['KeyCount'] >= 0: + return bk + + # Check exact match + if exists and self._folder_matches(dirname, key, None, None): + return bk + + # Check metadata of all potentially matching data dirs for + # matches. This only makes sense for fuzzy searches since + # otherwise we should have had an exact match already. (Also + # really slows down st.select runs otherwise because we doing an + # entire search over all the files in self._subfolders for all + # non-available keys). 
+ if fuzzy_for or fuzzy_for_options: + for fn in self._subfolders(): + if self._folder_matches(fn, key, fuzzy_for, fuzzy_for_options): + return self.backend_key(fn) + + raise strax.DataNotAvailable + + def s3_object_exists(self, bucket_name, key): + try: + self.s3.head_object(Bucket=bucket_name, Key=key) + return True # Object exists + except ClientError as e: + # If a 404 error is returned, the object does not exist + if e.response['Error']['Code'] == '404': + return False + else: + # For any other error, you can choose to raise the exception or handle it + raise e + + def _subfolders(self): + """Loop over subfolders of self.path that match our folder format.""" + # Trigger if statement if path doesnt exist + if self.s3.list_objects_v2(Bucket=self.BUCKET, + Prefix=self.path)['KeyCount'] == 0: + return + for dirname in os.listdir(self.path): + try: + self._parse_folder_name(dirname) + except InvalidFolderNameFormat: + continue + yield osp.join(self.path, dirname) + + @staticmethod + def _parse_folder_name(fn): + """Return (run_id, data_type, hash) if folder name matches DataDirectory convention, raise + InvalidFolderNameFormat otherwise.""" + stuff = osp.normpath(fn).split(os.sep)[-1].split("-") + if len(stuff) != 3: + # This is not a folder with strax data + raise InvalidFolderNameFormat(fn) + return stuff + + def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): + """Return the run_id of folder fn if it matches key, or False if it does not. + + :param name: Ignore the run name part of the key. Useful for listing availability. + + """ + # Parse the folder name + try: + _run_id, _data_type, _hash = self._parse_folder_name(fn) + except InvalidFolderNameFormat: + return False + + # Check exact match + if _data_type != key.data_type: + return False + if not ignore_name and _run_id != key._run_id: + return False + + # Check fuzzy match + if not (fuzzy_for or fuzzy_for_options): + if _hash == key.lineage_hash: + return _run_id + return False + metadata = self.backends[0].get_metadata(fn) + if self._matches(metadata["lineage"], key.lineage, fuzzy_for, fuzzy_for_options): + return _run_id + return False + + def backend_key(self, dirname): + return self.backends[0].__class__.__name__, dirname + + def remove(self, key): + # There is no database, so removing the folder from the filesystem + # (which FileStore should do) is sufficient. + pass + + @staticmethod + def raise_if_non_compatible_run_id(run_id): + if "-" in str(run_id): + raise ValueError( + "The filesystem frontend does not understand" + " run_id's with '-', please replace with '_'" + ) + + +@export +def dirname_to_prefix(dirname): + """Return filename prefix from dirname.""" + dirname = dirname.replace("_temp", "") + return os.path.basename(dirname.strip("/").rstrip("\\")).split("-", maxsplit=1)[1] + + +@export +class S3Backend(strax.StorageBackend): + """Store data locally in a directory of binary files. + + Files are named after the chunk number (without extension). Metadata is stored in a file called + metadata.json. + + """ + + BUCKET = 'mlrice' + + def __init__( + self, + *args, + set_target_chunk_mb: Optional[int] = None, + **kwargs, + ): + """Add set_chunk_size_mb to strax.StorageBackend to allow changing the chunk.target_size_mb + returned from the loader, any args or kwargs are passed to the strax.StorageBackend. 
+ + :param set_target_chunk_mb: Prior to returning the loaders' chunks, return the chunk with an + updated target size + + """ + super().__init__() + self.s3 = boto3.client(**kwargs) + self.set_chunk_size_mb = set_target_chunk_mb + + def _get_metadata(self, dirname): + prefix = dirname_to_prefix(dirname) + metadata_json = f"{prefix}-metadata.json" + md_path = osp.join(dirname, metadata_json) + + if self.s3.list_objects_v2(Bucket=self.BUCKET, + Prefix=md_path)['KeyCount'] == 0: + # Try to see if we are so fast that there exists a temp folder + # with the metadata we need. + md_path = osp.join(dirname + "_temp", metadata_json) + + if self.s3.list_objects_v2(Bucket = self.BUCKET, + Prefix=md_path)['KeyCount'] == 0: + # Try old-format metadata + # (if it's not there, just let it raise FileNotFound + # with the usual message in the next stage) + old_md_path = osp.join(dirname, "metadata.json") + if self.s3.list_objects_v2(Bucket=self.BUCKET, + Prefix=old_md_path)['KeyCount'] == 0: + raise strax.DataCorrupted(f"Data in {dirname} has no metadata") + md_path = old_md_path + + + response = self.s3.get_object(Bucket=self.BUCKET, Key=md_path) + metadata_content = response['Body'].read().decode('utf-8') + return json.loads(metadata_content) + #with open(md_path, mode="r") as f: + # return json.loads(f.read()) + + def _read_and_format_chunk(self, *args, **kwargs): + chunk = super()._read_and_format_chunk(*args, **kwargs) + if self.set_chunk_size_mb: + chunk.target_size_mb = self.set_chunk_size_mb + return chunk + + def _read_chunk(self, dirname, chunk_info, dtype, compressor): + fn = osp.join(dirname, chunk_info["filename"]) + return strax.load_file(fn, dtype=dtype, compressor=compressor) + + def _saver(self, dirname, metadata, **kwargs): + # Test if the parent directory is writeable. + # We need abspath since the dir itself may not exist, + # even though its parent-to-be does + parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) # This might need some work + + # In case the parent dir also doesn't exist, we have to create is + # otherwise the write permission check below will certainly fail + # I dont think this is needed for S3 so we can delete it + #try: + # os.makedirs(parent_dir, exist_ok=True) + #except OSError as e: + # raise strax.DataNotAvailable( + # f"Can't write data to {dirname}, " + # f"{parent_dir} does not exist and we could not create it." + # f"Original error: {e}" + # ) + + # Finally, check if we have permission to create the new subdirectory + # (which the Saver will do) + # Also dont think its needed + #if not os.access(parent_dir, os.W_OK): + # raise strax.DataNotAvailable( + # f"Can't write data to {dirname}, no write permissions in {parent_dir}." 
+ # ) + + return S3Saver(dirname, metadata=metadata, **kwargs) + + +@export +class S3Saver(strax.Saver): + """Saves data to compressed binary files.""" + + json_options = dict(sort_keys=True, indent=4) + # When writing chunks, rewrite the json file every time we write a chunk + _flush_md_for_every_chunk = True + + def __init__(self, dirname, metadata, **kwargs): + super().__init__(metadata = metadata) + self.dirname = dirname + self.tempdirname = dirname + "_temp" + self.prefix = dirname_to_prefix(dirname) + self.metadata_json = f"{self.prefix}-metadata.json" + self.s3 = boto3.client(**kwargs) + self.bucket_name = "mlrice" + + self.config = boto3.s3.transfer.TransferConfig( + max_concurrency=40, + num_download_attempts=30) + + if self.s3.list_objects_v2(Bucket=BUCKET_NAME, + Prefix=dirname)['KeyCount'] == 1: + print(f"Removing data in {dirname} to overwrite") + shutil.rmtree(dirname) + if self.s3.list_objects_v2(Bucket=BUCKET_NAME, + Prefix=self.tempdirname)['KeyCount'] == 0: + print(f"Removing old incomplete data in {self.tempdirname}") + shutil.rmtree(self.tempdirname) + #os.makedirs(self.tempdirname) + self._flush_metadata() + + def _flush_metadata(self): + # Convert the metadata dictionary to a JSON string + metadata_content = json.dumps(self.md, **self.json_options) + + # Define the S3 key for the metadata file (similar to a file path in a traditional file system) + metadata_key = f"{self.tempdirname}/{self.metadata_json}" + + # Upload the metadata to S3 + self.s3.put_object(Bucket=self.bucket_name, Key=metadata_key, Body=metadata_content) + + def _chunk_filename(self, chunk_info): + if "filename" in chunk_info: + return chunk_info["filename"] + ichunk = "%06d" % chunk_info["chunk_i"] + return f"{self.prefix}-{ichunk}" + + def _save_chunk(self, data, chunk_info, executor=None): + filename = self._chunk_filename(chunk_info) + + fn = os.path.join(self.tempdirname, filename) + kwargs = dict(data=data, compressor=self.md["compressor"]) + if executor is None: + filesize = strax.save_file(fn, **kwargs) + fn.seek(0) + # Giving just the filename wont work, needs to be the full path + self.s3.upload_fileobj(fn, + BUCKET_NAME, + filename, + Config=self.config,) + return dict(filename=filename, filesize=filesize), None + else: + # Might need to add some s3 stuff here + return dict(filename=filename), executor.submit(strax.save_file, fn, **kwargs) + + def _save_chunk_metadata(self, chunk_info): + is_first = chunk_info["chunk_i"] == 0 + if is_first: + self.md["start"] = chunk_info["start"] + + if self.is_forked: + # Do not write to the main metadata file to avoid race conditions + # Instead, write a separate metadata.json file for this chunk, + # to be collected later. + + # We might not have a filename yet: + # the chunk is not saved when it is empty + filename = self._chunk_filename(chunk_info) + + fn = f"{self.tempdirname}/metadata_{filename}.json" + metadata_content = json.dump(chunk_info, **self.json_options) + self.s3.put_object(Bucket=self.bucket_name, Key=fn, Body=metadata_content) + #with open(fn, mode="w") as f: + # f.write(json.dumps(chunk_info, **self.json_options)) + + # To ensure we have some metadata to load with allow_incomplete, + # modify the metadata immediately for the first chunk. + # If we are forked, modifying self.md is harmless since + # we're in a different process. + + if not self.is_forked or is_first: + # Just append and flush the metadata + # (maybe not super-efficient to write the json every time... 
+ # just don't use thousands of chunks) + self.md["chunks"].append(chunk_info) + if self._flush_md_for_every_chunk: + self._flush_metadata() + + def _close(self): + # Check if temp directory exists in the S3 bucket by listing objects with the tempdirname prefix + try: + response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname) + if 'Contents' not in response or len(response['Contents']) == 0: + raise RuntimeError( + f"{self.tempdirname} was already renamed to {self.dirname}. " + "Did you attempt to run two savers pointing to the same " + "directory? Otherwise, this could be a strange race " + "condition or bug." + ) + + # List the files in the temporary directory matching metadata_*.json + response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=f"{self.tempdirname}/metadata_") + for obj in response.get('Contents', []): + key = obj['Key'] + # Download each metadata file, process, and delete from tempdirname + metadata_object = self.s3.get_object(Bucket=self.bucket_name, Key=key) + metadata_content = metadata_object['Body'].read().decode('utf-8') + self.md["chunks"].append(json.loads(metadata_content)) + + # Optionally, delete the metadata file after processing + self.s3.delete_object(Bucket=self.bucket_name, Key=key) + + # Flush metadata (this would be another method to handle your metadata saving logic) + self._flush_metadata() + + # Rename the directory by copying all files from tempdirname to dirname and deleting from tempdirname + self._rename_s3_folder(self.tempdirname, self.dirname) + + except ClientError as e: + print(f"Error occurred: {e}") + raise + + def _rename_s3_folder(self, tempdirname, dirname): + # List the files in the temporary directory + response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname) + for obj in response.get('Contents', []): + key = obj['Key'] + # Copy each file from the temporary directory to the final directory + new_key = key.replace(tempdirname, dirname) + self.s3.copy_object(Bucket=self.bucket_name, CopySource={'Bucket': self.bucket_name, 'Key': key}, Key=new_key) + # Delete the file from the temporary directory + self.s3.delete_object(Bucket=self.bucket_name, Key=key) + + # Delete the temporary directory + self.s3.delete_object(Bucket=self.bucket_name, Key=tempdirname) + + +@export +class InvalidFolderNameFormat(Exception): + pass \ No newline at end of file diff --git a/strax/utils.py b/strax/utils.py index 3f3ea41b1..beb32bf09 100644 --- a/strax/utils.py +++ b/strax/utils.py @@ -804,3 +804,45 @@ def convert_tuple_to_list(init_func_input): else: # if not a container, return. i.e. int, float, bytes, str etc. 
return func_input + + +@export +def stx_file_parser(path: str): + """ + Strax assumes 2 main structures for directories it generates + and files, as such we will want to understand these file + """ + + chunk_i = None + run_id = None + #is_json = False + + assert type(path) is str + + try: + temp1, temp2, temp3 = re.split(r'-', path) + + if temp1.isdigit(): + file_data = {"run_id" : temp1, + "dtype" : temp2, + "lineage" : temp3, + "is_json": False} + # Covers chunks + else: + #dtype, lineage, x = re.split(r'-', path) + if temp3 == "metadata.json": + file_data = {"run_id" : temp1, + "dtype" : temp2, + "lineage" : temp3, + "is_json": True} + else: + file_data = {"chunk_i" : temp3, + "dtype" : temp1, + "lineage" : temp2, + "is_json": False} + + except ValueError: + print("An invalid string was given to this function") + + return file_data + \ No newline at end of file From d1cbeeb1d277ec18e98f9f95b9ec15aca9773b34 Mon Sep 17 00:00:00 2001 From: LuisSanchez25 Date: Thu, 26 Sep 2024 15:00:13 -0500 Subject: [PATCH 02/24] add _get_config_values --- strax/storage/s3.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index cf06eea8b..14a19f021 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -188,6 +188,14 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): return self.backend_key(fn) raise strax.DataNotAvailable + + def _get_config_value(self, variable, option_name): + if variable is None: + if "s3" not in self.config: + raise EnvironmentError("S3 access point not spesified") + if not self.config.has_option("s3", option_name): + raise EnvironmentError(f"S3 access point lacks a {option_name}") + return self.config.get('s3', 'aws_access_key_id') def s3_object_exists(self, bucket_name, key): try: From f66ff0a9f29823c09cdda2ea8691dc619f5c55be Mon Sep 17 00:00:00 2001 From: LuisSanchez25 Date: Mon, 30 Sep 2024 19:49:22 -0500 Subject: [PATCH 03/24] S3 code pases st.make test --- strax/io.py | 51 +++++++++++++++++++++++++++++---- strax/storage/s3.py | 66 ++++++++++++++++++++++++------------------- strax/utils.py | 4 --- tests/test_storage.py | 6 ++++ 4 files changed, 89 insertions(+), 38 deletions(-) diff --git a/strax/io.py b/strax/io.py index 1e73fa6a4..c26dee7ad 100644 --- a/strax/io.py +++ b/strax/io.py @@ -64,7 +64,7 @@ def _load_file(f, compressor, dtype): @export -def save_file(f, data, compressor="zstd"): +def save_file(f, data, compressor="zstd", is_s3_path = False): """Save data to file and return number of bytes written. 
:param f: file name or handle to save to @@ -72,13 +72,32 @@ def save_file(f, data, compressor="zstd"): :param compressor: compressor to use """ + if isinstance(f, str): final_fn = f temp_fn = f + "_temp" - with open(temp_fn, mode="wb") as write_file: - result = _save_file(write_file, data, compressor) - os.rename(temp_fn, final_fn) - return result + if is_s3_path is False: + with open(temp_fn, mode="wb") as write_file: + result = _save_file(write_file, data, compressor) + os.rename(temp_fn, final_fn) + return result + else: + s3_interface = strax.S3Frontend(s3_access_key_id=None, + s3_secret_access_key=None, + path="", + deep_scan=False, ) + # Copy temp file to final file + result = _save_file_to_s3(s3_interface, temp_fn, data, compressor) + s3_interface.s3.copy_object( + Bucket=s3_interface.BUCKET, + Key=final_fn, + CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn} + ) + + # Delete the temporary file + s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) + + return result else: return _save_file(f, data, compressor) @@ -90,6 +109,28 @@ def _save_file(f, data, compressor="zstd"): return len(d_comp) +def _save_file_to_s3(s3_client, key, data, compressor=None): + # Use this method to save file directly to S3 + # If compression is needed, handle it here + # Use `BytesIO` to handle binary data in-memory + assert isinstance(data, np.ndarray), "Please pass a numpy array" + + # Create a binary buffer to simulate writing to a file + buffer = BytesIO() + + # Simulate saving file content (you can compress or directly write data here) + if compressor: + data = COMPRESSORS[compressor]["compress"](data) + buffer.write(data) + buffer.seek(0) # Reset the buffer to the beginning + + # Upload buffer to S3 under the specified key + s3_client.s3.put_object(Bucket=s3_client.BUCKET, + Key=key, Body=buffer.getvalue()) + + return len(data) + + def _compress_blosc(data): if data.nbytes >= blosc.MAX_BUFFERSIZE: raise ValueError("Blosc's input buffer cannot exceed ~2 GB") diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 14a19f021..937ecbe69 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -1,7 +1,6 @@ import glob import json import os -import configparser import os.path as osp from typing import Optional from bson import json_util @@ -9,6 +8,7 @@ import boto3 from botocore.exceptions import ClientError from botocore.client import Config +import configparser import strax from .common import StorageFrontend @@ -33,9 +33,9 @@ class S3Frontend(StorageFrontend): BUCKET = "mlrice" def __init__(self, - s3_access_key_id: str=None, - s3_secret_access_key: str=None, - endpoint_url=None, + s3_access_key_id=None, + s3_secret_access_key=None, + endpoint_url='https://rice1.osn.mghpcc.org/', path="", deep_scan=False, *args, @@ -61,11 +61,12 @@ def __init__(self, raise EnvironmentError("XENON_CONFIG file not found") else: self.config = configparser.ConfigParser() + self.config.read(self.config_path) s3_access_key_id = self._get_config_value(s3_access_key_id, - "s3_access_key_id") + "aws_access_key_id") s3_secret_access_key = self._get_config_value(s3_secret_access_key, - "s3_secret_access_key") + "aws_secret_access_key") endpoint_url = self._get_config_value(endpoint_url, "endpoint_url") @@ -188,26 +189,32 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): return self.backend_key(fn) raise strax.DataNotAvailable - + def _get_config_value(self, variable, option_name): if variable is None: if "s3" not in self.config: raise EnvironmentError("S3 access 
point not spesified") if not self.config.has_option("s3", option_name): raise EnvironmentError(f"S3 access point lacks a {option_name}") - return self.config.get('s3', 'aws_access_key_id') + return self.config.get('s3', option_name) + + else: + return variable def s3_object_exists(self, bucket_name, key): try: - self.s3.head_object(Bucket=bucket_name, Key=key) - return True # Object exists + response = self.s3.list_objects_v2(Bucket=bucket_name, Prefix=key) + + # Check if any objects were returned, and if the key exactly matches + if 'Contents' in response: + for obj in response['Contents']: + if obj['Key'] == key: + return True # Object exists + + return False # Object does not exist except ClientError as e: - # If a 404 error is returned, the object does not exist - if e.response['Error']['Code'] == '404': - return False - else: - # For any other error, you can choose to raise the exception or handle it - raise e + # Handle any other error as needed + raise e def _subfolders(self): """Loop over subfolders of self.path that match our folder format.""" @@ -377,7 +384,7 @@ def _saver(self, dirname, metadata, **kwargs): # f"Can't write data to {dirname}, no write permissions in {parent_dir}." # ) - return S3Saver(dirname, metadata=metadata, **kwargs) + return S3Saver(dirname, self.s3, metadata=metadata, **kwargs) @export @@ -388,13 +395,14 @@ class S3Saver(strax.Saver): # When writing chunks, rewrite the json file every time we write a chunk _flush_md_for_every_chunk = True - def __init__(self, dirname, metadata, **kwargs): + def __init__(self, dirname, s3, metadata, **kwargs): super().__init__(metadata = metadata) self.dirname = dirname self.tempdirname = dirname + "_temp" self.prefix = dirname_to_prefix(dirname) self.metadata_json = f"{self.prefix}-metadata.json" - self.s3 = boto3.client(**kwargs) + + self.s3 = s3 self.bucket_name = "mlrice" self.config = boto3.s3.transfer.TransferConfig( @@ -404,11 +412,11 @@ def __init__(self, dirname, metadata, **kwargs): if self.s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=dirname)['KeyCount'] == 1: print(f"Removing data in {dirname} to overwrite") - shutil.rmtree(dirname) + self.s3.delete_object(Bucket=BUCKET_NAME, Key=dirname) if self.s3.list_objects_v2(Bucket=BUCKET_NAME, - Prefix=self.tempdirname)['KeyCount'] == 0: + Prefix=self.tempdirname)['KeyCount'] == 1: print(f"Removing old incomplete data in {self.tempdirname}") - shutil.rmtree(self.tempdirname) + self.s3.delete_object(Bucket=BUCKET_NAME, Key=dirname) #os.makedirs(self.tempdirname) self._flush_metadata() @@ -434,17 +442,17 @@ def _save_chunk(self, data, chunk_info, executor=None): fn = os.path.join(self.tempdirname, filename) kwargs = dict(data=data, compressor=self.md["compressor"]) if executor is None: - filesize = strax.save_file(fn, **kwargs) - fn.seek(0) + filesize = strax.save_file(fn, is_s3_path = True, **kwargs) + #fn.seek(0) # Giving just the filename wont work, needs to be the full path - self.s3.upload_fileobj(fn, - BUCKET_NAME, - filename, - Config=self.config,) + #self.s3.upload_fileobj(fn, + # BUCKET_NAME, + # filename, + # Config=self.config,) return dict(filename=filename, filesize=filesize), None else: # Might need to add some s3 stuff here - return dict(filename=filename), executor.submit(strax.save_file, fn, **kwargs) + return dict(filename=filename), executor.submit(strax.save_file, fn, is_s3_path = True, **kwargs) def _save_chunk_metadata(self, chunk_info): is_first = chunk_info["chunk_i"] == 0 diff --git a/strax/utils.py b/strax/utils.py index beb32bf09..7ad315057 
100644 --- a/strax/utils.py +++ b/strax/utils.py @@ -813,10 +813,6 @@ def stx_file_parser(path: str): and files, as such we will want to understand these file """ - chunk_i = None - run_id = None - #is_json = False - assert type(path) is str try: diff --git a/tests/test_storage.py b/tests/test_storage.py index 1d1875121..cd00f2903 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -32,6 +32,12 @@ def test_write_data_dir(self): self.st.make(run_id, self.target) assert self.st.is_stored(run_id, self.target) + def test_write_data_s3(self): + self.st.storage = [strax.S3Frontend(self.path)] + run_id = "0" + self.st.make(run_id, self.target) + assert self.st.is_stored(run_id, self.target) + def test_complain_run_id(self): self.st.storage = [strax.DataDirectory(self.path)] run_id = "run-0" From fb4e2cabe36e7b471c982bbcc5318110f78335df Mon Sep 17 00:00:00 2001 From: LuisSanchez25 Date: Tue, 1 Oct 2024 23:51:14 -0500 Subject: [PATCH 04/24] add types (incomplete) --- strax/storage/s3.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 937ecbe69..a18b1a72e 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -33,11 +33,11 @@ class S3Frontend(StorageFrontend): BUCKET = "mlrice" def __init__(self, - s3_access_key_id=None, - s3_secret_access_key=None, - endpoint_url='https://rice1.osn.mghpcc.org/', - path="", - deep_scan=False, + s3_access_key_id: str = None, + s3_secret_access_key: str = None, + endpoint_url: str ='https://rice1.osn.mghpcc.org/', + path: str = "", + deep_scan: bool = False, *args, **kwargs): """ @@ -101,7 +101,11 @@ def run_metadata(self, run_id, projection=None): # md = json.loads(f.read(), object_hook=json_util.object_hook) md = strax.flatten_run_metadata(md) if projection is not None: - md = {k: v for k, v in md.items() if k in projection} + md = { + key: value + for key, value in md.items() + if key in projection + } return md def write_run_metadata(self, run_id, metadata): From c6c71ddebb98db4fcb9a95b7b3d6f54a1fc5b56e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 2 Oct 2024 05:00:38 +0000 Subject: [PATCH 05/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/io.py | 31 ++++---- strax/storage/s3.py | 176 +++++++++++++++++++++----------------------- strax/utils.py | 32 +++----- 3 files changed, 111 insertions(+), 128 deletions(-) diff --git a/strax/io.py b/strax/io.py index fb24e0457..c0c74a565 100644 --- a/strax/io.py +++ b/strax/io.py @@ -65,7 +65,7 @@ def _load_file(f, compressor, dtype): @export -def save_file(f, data, compressor="zstd", is_s3_path = False): +def save_file(f, data, compressor="zstd", is_s3_path=False): """Save data to file and return number of bytes written. 
:param f: file name or handle to save to @@ -73,7 +73,7 @@ def save_file(f, data, compressor="zstd", is_s3_path = False): :param compressor: compressor to use """ - + if isinstance(f, str): final_fn = f temp_fn = f + "_temp" @@ -83,20 +83,22 @@ def save_file(f, data, compressor="zstd", is_s3_path = False): os.rename(temp_fn, final_fn) return result else: - s3_interface = strax.S3Frontend(s3_access_key_id=None, - s3_secret_access_key=None, - path="", - deep_scan=False, ) + s3_interface = strax.S3Frontend( + s3_access_key_id=None, + s3_secret_access_key=None, + path="", + deep_scan=False, + ) # Copy temp file to final file result = _save_file_to_s3(s3_interface, temp_fn, data, compressor) s3_interface.s3.copy_object( - Bucket=s3_interface.BUCKET, - Key=final_fn, - CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn} - ) - + Bucket=s3_interface.BUCKET, + Key=final_fn, + CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn}, + ) + # Delete the temporary file - s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) + s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) return result else: @@ -126,10 +128,9 @@ def _save_file_to_s3(s3_client, key, data, compressor=None): buffer.seek(0) # Reset the buffer to the beginning # Upload buffer to S3 under the specified key - s3_client.s3.put_object(Bucket=s3_client.BUCKET, - Key=key, Body=buffer.getvalue()) + s3_client.s3.put_object(Bucket=s3_client.BUCKET, Key=key, Body=buffer.getvalue()) - return len(data) + return len(data) def _compress_blosc(data): diff --git a/strax/storage/s3.py b/strax/storage/s3.py index a18b1a72e..189744c0a 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -16,7 +16,7 @@ export, __all__ = strax.exporter() RUN_METADATA_PATTERN = "%s-metadata.json" -BUCKET_NAME = 'mlrice' +BUCKET_NAME = "mlrice" @export @@ -32,14 +32,16 @@ class S3Frontend(StorageFrontend): provide_superruns = True BUCKET = "mlrice" - def __init__(self, - s3_access_key_id: str = None, - s3_secret_access_key: str = None, - endpoint_url: str ='https://rice1.osn.mghpcc.org/', - path: str = "", - deep_scan: bool = False, - *args, - **kwargs): + def __init__( + self, + s3_access_key_id: str = None, + s3_secret_access_key: str = None, + endpoint_url: str = "https://rice1.osn.mghpcc.org/", + path: str = "", + deep_scan: bool = False, + *args, + **kwargs, + ): """ :param path: Path to folder with data subfolders. 
:param deep_scan: Let scan_runs scan over folders, @@ -53,7 +55,7 @@ def __init__(self, self.deep_scan = deep_scan # Might need to reimplement this at some later time - #if not self.readonly and not osp.exists(self.path): + # if not self.readonly and not osp.exists(self.path): # os.makedirs(self.path) self.config_path = os.getenv("XENON_CONFIG") @@ -63,20 +65,19 @@ def __init__(self, self.config = configparser.ConfigParser() self.config.read(self.config_path) - s3_access_key_id = self._get_config_value(s3_access_key_id, - "aws_access_key_id") - s3_secret_access_key = self._get_config_value(s3_secret_access_key, - "aws_secret_access_key") - endpoint_url = self._get_config_value(endpoint_url, - "endpoint_url") + s3_access_key_id = self._get_config_value(s3_access_key_id, "aws_access_key_id") + s3_secret_access_key = self._get_config_value( + s3_secret_access_key, "aws_secret_access_key" + ) + endpoint_url = self._get_config_value(endpoint_url, "endpoint_url") self.boto3_client_kwargs = { - 'aws_access_key_id': s3_access_key_id, - 'aws_secret_access_key': s3_secret_access_key, - 'endpoint_url': endpoint_url, - 'service_name': 's3', - 'config': Config(connect_timeout=5, - retries={'max_attempts': 10})} + "aws_access_key_id": s3_access_key_id, + "aws_secret_access_key": s3_secret_access_key, + "endpoint_url": endpoint_url, + "service_name": "s3", + "config": Config(connect_timeout=5, retries={"max_attempts": 10}), + } # Initialized connection to S3-protocol storage self.s3 = boto3.client(**self.boto3_client_kwargs) @@ -89,38 +90,32 @@ def _run_meta_path(self, run_id): def run_metadata(self, run_id, projection=None): path = self._run_meta_path(run_id) # Changed ops. to self.s3 implementation - if self.s3.list_objects_v2(Bucket=self.BUCKET, - Prefix=path)['KeyCount'] == 0: + if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=path)["KeyCount"] == 0: raise strax.RunMetadataNotAvailable( f"No file at {path}, cannot find run metadata for {run_id}" ) response = self.s3.get_object(Bucket=self.BUCKET, Key=path) - metadata_content = response['Body'].read().decode('utf-8') + metadata_content = response["Body"].read().decode("utf-8") md = json.loads(metadata_content, object_hook=json_util.object_hook) - #with open(path, mode="r") as f: + # with open(path, mode="r") as f: # md = json.loads(f.read(), object_hook=json_util.object_hook) md = strax.flatten_run_metadata(md) if projection is not None: - md = { - key: value - for key, value in md.items() - if key in projection - } + md = {key: value for key, value in md.items() if key in projection} return md def write_run_metadata(self, run_id, metadata): - #response = self.s3.get_object(Bucket=self.BUCKET, Key=self._run_meta_path(run_id)) - #metadata_content = response['Body'].read().decode('utf-8') + # response = self.s3.get_object(Bucket=self.BUCKET, Key=self._run_meta_path(run_id)) + # metadata_content = response['Body'].read().decode('utf-8') if "name" not in metadata: metadata["name"] = run_id - - self.s3.put_object(Bucket=self.BUCKET, - Key=self._run_meta_path(run_id), - Body=json.dumps(metadata, - sort_keys=True, - indent=4, - default=json_util.default)) - #with open(self._run_meta_path(run_id), mode="w") as f: + + self.s3.put_object( + Bucket=self.BUCKET, + Key=self._run_meta_path(run_id), + Body=json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default), + ) + # with open(self._run_meta_path(run_id), mode="w") as f: # if "name" not in metadata: # metadata["name"] = run_id # f.write(json.dumps(metadata, sort_keys=True, indent=4, 
default=json_util.default)) @@ -135,9 +130,10 @@ def _scan_runs(self, store_fields): # Yield metadata for runs for which we actually have it for md_path in sorted( - self.s3.list_objects_v2(Bucket = self.BUCKET, - Prefix = osp.join(self.path, - RUN_METADATA_PATTERN.replace("%s", "*"))) + self.s3.list_objects_v2( + Bucket=self.BUCKET, + Prefix=osp.join(self.path, RUN_METADATA_PATTERN.replace("%s", "*")), + ) ): # Parse the run metadata filename pattern. # (different from the folder pattern) @@ -173,8 +169,7 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): ) tempdirname = dirname + "_temp" bk = self.backend_key(tempdirname) - if self.s3.list_objects_v2(Bucket=self.BUCKET, - Prefix=tempdirname)['KeyCount'] >= 0: + if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=tempdirname)["KeyCount"] >= 0: return bk # Check exact match @@ -200,7 +195,7 @@ def _get_config_value(self, variable, option_name): raise EnvironmentError("S3 access point not spesified") if not self.config.has_option("s3", option_name): raise EnvironmentError(f"S3 access point lacks a {option_name}") - return self.config.get('s3', option_name) + return self.config.get("s3", option_name) else: return variable @@ -210,9 +205,9 @@ def s3_object_exists(self, bucket_name, key): response = self.s3.list_objects_v2(Bucket=bucket_name, Prefix=key) # Check if any objects were returned, and if the key exactly matches - if 'Contents' in response: - for obj in response['Contents']: - if obj['Key'] == key: + if "Contents" in response: + for obj in response["Contents"]: + if obj["Key"] == key: return True # Object exists return False # Object does not exist @@ -223,8 +218,7 @@ def s3_object_exists(self, bucket_name, key): def _subfolders(self): """Loop over subfolders of self.path that match our folder format.""" # Trigger if statement if path doesnt exist - if self.s3.list_objects_v2(Bucket=self.BUCKET, - Prefix=self.path)['KeyCount'] == 0: + if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=self.path)["KeyCount"] == 0: return for dirname in os.listdir(self.path): try: @@ -304,7 +298,7 @@ class S3Backend(strax.StorageBackend): """ - BUCKET = 'mlrice' + BUCKET = "mlrice" def __init__( self, @@ -328,28 +322,24 @@ def _get_metadata(self, dirname): metadata_json = f"{prefix}-metadata.json" md_path = osp.join(dirname, metadata_json) - if self.s3.list_objects_v2(Bucket=self.BUCKET, - Prefix=md_path)['KeyCount'] == 0: + if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=md_path)["KeyCount"] == 0: # Try to see if we are so fast that there exists a temp folder # with the metadata we need. 
md_path = osp.join(dirname + "_temp", metadata_json) - if self.s3.list_objects_v2(Bucket = self.BUCKET, - Prefix=md_path)['KeyCount'] == 0: + if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=md_path)["KeyCount"] == 0: # Try old-format metadata # (if it's not there, just let it raise FileNotFound # with the usual message in the next stage) old_md_path = osp.join(dirname, "metadata.json") - if self.s3.list_objects_v2(Bucket=self.BUCKET, - Prefix=old_md_path)['KeyCount'] == 0: + if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=old_md_path)["KeyCount"] == 0: raise strax.DataCorrupted(f"Data in {dirname} has no metadata") md_path = old_md_path - response = self.s3.get_object(Bucket=self.BUCKET, Key=md_path) - metadata_content = response['Body'].read().decode('utf-8') + metadata_content = response["Body"].read().decode("utf-8") return json.loads(metadata_content) - #with open(md_path, mode="r") as f: + # with open(md_path, mode="r") as f: # return json.loads(f.read()) def _read_and_format_chunk(self, *args, **kwargs): @@ -366,14 +356,14 @@ def _saver(self, dirname, metadata, **kwargs): # Test if the parent directory is writeable. # We need abspath since the dir itself may not exist, # even though its parent-to-be does - parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) # This might need some work + parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) # This might need some work # In case the parent dir also doesn't exist, we have to create is # otherwise the write permission check below will certainly fail # I dont think this is needed for S3 so we can delete it - #try: + # try: # os.makedirs(parent_dir, exist_ok=True) - #except OSError as e: + # except OSError as e: # raise strax.DataNotAvailable( # f"Can't write data to {dirname}, " # f"{parent_dir} does not exist and we could not create it." @@ -383,7 +373,7 @@ def _saver(self, dirname, metadata, **kwargs): # Finally, check if we have permission to create the new subdirectory # (which the Saver will do) # Also dont think its needed - #if not os.access(parent_dir, os.W_OK): + # if not os.access(parent_dir, os.W_OK): # raise strax.DataNotAvailable( # f"Can't write data to {dirname}, no write permissions in {parent_dir}." 
# ) @@ -400,7 +390,7 @@ class S3Saver(strax.Saver): _flush_md_for_every_chunk = True def __init__(self, dirname, s3, metadata, **kwargs): - super().__init__(metadata = metadata) + super().__init__(metadata=metadata) self.dirname = dirname self.tempdirname = dirname + "_temp" self.prefix = dirname_to_prefix(dirname) @@ -409,28 +399,24 @@ def __init__(self, dirname, s3, metadata, **kwargs): self.s3 = s3 self.bucket_name = "mlrice" - self.config = boto3.s3.transfer.TransferConfig( - max_concurrency=40, - num_download_attempts=30) + self.config = boto3.s3.transfer.TransferConfig(max_concurrency=40, num_download_attempts=30) - if self.s3.list_objects_v2(Bucket=BUCKET_NAME, - Prefix=dirname)['KeyCount'] == 1: + if self.s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=dirname)["KeyCount"] == 1: print(f"Removing data in {dirname} to overwrite") self.s3.delete_object(Bucket=BUCKET_NAME, Key=dirname) - if self.s3.list_objects_v2(Bucket=BUCKET_NAME, - Prefix=self.tempdirname)['KeyCount'] == 1: + if self.s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=self.tempdirname)["KeyCount"] == 1: print(f"Removing old incomplete data in {self.tempdirname}") self.s3.delete_object(Bucket=BUCKET_NAME, Key=dirname) - #os.makedirs(self.tempdirname) + # os.makedirs(self.tempdirname) self._flush_metadata() def _flush_metadata(self): # Convert the metadata dictionary to a JSON string metadata_content = json.dumps(self.md, **self.json_options) - + # Define the S3 key for the metadata file (similar to a file path in a traditional file system) metadata_key = f"{self.tempdirname}/{self.metadata_json}" - + # Upload the metadata to S3 self.s3.put_object(Bucket=self.bucket_name, Key=metadata_key, Body=metadata_content) @@ -446,17 +432,19 @@ def _save_chunk(self, data, chunk_info, executor=None): fn = os.path.join(self.tempdirname, filename) kwargs = dict(data=data, compressor=self.md["compressor"]) if executor is None: - filesize = strax.save_file(fn, is_s3_path = True, **kwargs) - #fn.seek(0) + filesize = strax.save_file(fn, is_s3_path=True, **kwargs) + # fn.seek(0) # Giving just the filename wont work, needs to be the full path - #self.s3.upload_fileobj(fn, + # self.s3.upload_fileobj(fn, # BUCKET_NAME, # filename, # Config=self.config,) return dict(filename=filename, filesize=filesize), None else: # Might need to add some s3 stuff here - return dict(filename=filename), executor.submit(strax.save_file, fn, is_s3_path = True, **kwargs) + return dict(filename=filename), executor.submit( + strax.save_file, fn, is_s3_path=True, **kwargs + ) def _save_chunk_metadata(self, chunk_info): is_first = chunk_info["chunk_i"] == 0 @@ -475,7 +463,7 @@ def _save_chunk_metadata(self, chunk_info): fn = f"{self.tempdirname}/metadata_{filename}.json" metadata_content = json.dump(chunk_info, **self.json_options) self.s3.put_object(Bucket=self.bucket_name, Key=fn, Body=metadata_content) - #with open(fn, mode="w") as f: + # with open(fn, mode="w") as f: # f.write(json.dumps(chunk_info, **self.json_options)) # To ensure we have some metadata to load with allow_incomplete, @@ -495,7 +483,7 @@ def _close(self): # Check if temp directory exists in the S3 bucket by listing objects with the tempdirname prefix try: response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname) - if 'Contents' not in response or len(response['Contents']) == 0: + if "Contents" not in response or len(response["Contents"]) == 0: raise RuntimeError( f"{self.tempdirname} was already renamed to {self.dirname}. 
" "Did you attempt to run two savers pointing to the same " @@ -504,12 +492,14 @@ def _close(self): ) # List the files in the temporary directory matching metadata_*.json - response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=f"{self.tempdirname}/metadata_") - for obj in response.get('Contents', []): - key = obj['Key'] + response = self.s3.list_objects_v2( + Bucket=self.bucket_name, Prefix=f"{self.tempdirname}/metadata_" + ) + for obj in response.get("Contents", []): + key = obj["Key"] # Download each metadata file, process, and delete from tempdirname metadata_object = self.s3.get_object(Bucket=self.bucket_name, Key=key) - metadata_content = metadata_object['Body'].read().decode('utf-8') + metadata_content = metadata_object["Body"].read().decode("utf-8") self.md["chunks"].append(json.loads(metadata_content)) # Optionally, delete the metadata file after processing @@ -528,11 +518,15 @@ def _close(self): def _rename_s3_folder(self, tempdirname, dirname): # List the files in the temporary directory response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname) - for obj in response.get('Contents', []): - key = obj['Key'] + for obj in response.get("Contents", []): + key = obj["Key"] # Copy each file from the temporary directory to the final directory new_key = key.replace(tempdirname, dirname) - self.s3.copy_object(Bucket=self.bucket_name, CopySource={'Bucket': self.bucket_name, 'Key': key}, Key=new_key) + self.s3.copy_object( + Bucket=self.bucket_name, + CopySource={"Bucket": self.bucket_name, "Key": key}, + Key=new_key, + ) # Delete the file from the temporary directory self.s3.delete_object(Bucket=self.bucket_name, Key=key) @@ -542,4 +536,4 @@ def _rename_s3_folder(self, tempdirname, dirname): @export class InvalidFolderNameFormat(Exception): - pass \ No newline at end of file + pass diff --git a/strax/utils.py b/strax/utils.py index 5e87007c4..5907258ff 100644 --- a/strax/utils.py +++ b/strax/utils.py @@ -808,40 +808,29 @@ def convert_tuple_to_list(init_func_input): @export def stx_file_parser(path: str): - """ - Strax assumes 2 main structures for directories it generates - and files, as such we will want to understand these file - """ + """Strax assumes 2 main structures for directories it generates and files, as such we will want + to understand these file.""" - assert type(path) is str + assert type(path) is str try: - temp1, temp2, temp3 = re.split(r'-', path) + temp1, temp2, temp3 = re.split(r"-", path) if temp1.isdigit(): - file_data = {"run_id" : temp1, - "dtype" : temp2, - "lineage" : temp3, - "is_json": False} + file_data = {"run_id": temp1, "dtype": temp2, "lineage": temp3, "is_json": False} # Covers chunks else: - #dtype, lineage, x = re.split(r'-', path) + # dtype, lineage, x = re.split(r'-', path) if temp3 == "metadata.json": - file_data = {"run_id" : temp1, - "dtype" : temp2, - "lineage" : temp3, - "is_json": True} + file_data = {"run_id": temp1, "dtype": temp2, "lineage": temp3, "is_json": True} else: - file_data = {"chunk_i" : temp3, - "dtype" : temp1, - "lineage" : temp2, - "is_json": False} - + file_data = {"chunk_i": temp3, "dtype": temp1, "lineage": temp2, "is_json": False} + except ValueError: print("An invalid string was given to this function") return file_data - + @export def convert_structured_array_to_df(structured_array, log=None): @@ -878,4 +867,3 @@ def convert_structured_array_to_df(structured_array, log=None): ) return pd.DataFrame(data_dict) - From f2dbc7cf0cdf1dc57d0285d6fb87cfe1c4a9208f Mon Sep 17 00:00:00 2001 From: Luis 
Sanchez Date: Mon, 3 Mar 2025 16:50:14 -0600 Subject: [PATCH 06/24] update io --- pyproject.toml | 2 ++ strax/io.py | 81 +++++++++++++++++++++++++++++++-------------- strax/storage/s3.py | 17 +++++++--- 3 files changed, 71 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6d2b9b60b..f68004ef8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,8 @@ rechunker = "strax.scripts.rechunker:main" [tool.poetry.dependencies] python = ">=3.10,<3.13" blosc = "*" +boto3 = "*" +botocore = "*" click = "*" deepdiff = "*" dill = "*" diff --git a/strax/io.py b/strax/io.py index e03047abf..077395e76 100644 --- a/strax/io.py +++ b/strax/io.py @@ -10,9 +10,10 @@ import zstandard import lz4.frame as lz4 from ast import literal_eval +from io import BytesIO +from botocore.exceptions import ClientError import strax -from strax import RUN_METADATA_PATTERN export, __all__ = strax.exporter() __all__.extend(["DECOMPRESS_BUFFER_SIZE"]) @@ -80,22 +81,53 @@ def _lz4_decompress(f, buffer_size=DECOMPRESS_BUFFER_SIZE): @export -def load_file(f, compressor, dtype): - """Read and return data from file. +def load_file(f, compressor, dtype, + S3_client=None, + bucket_name = None, + is_s3_path=False): + """Read and return data from file or S3. :param f: file name or handle to read from :param compressor: compressor to use for decompressing. If not passed, will try to load it from json metadata file. :param dtype: numpy dtype of data to load - + :param S3_client: (optional) S3 client to find and store data + :param is_s3_path: Boolean indicating if the file is stored in S3. """ - if isinstance(f, str): - with open(f, mode="rb") as write_file: - return _load_file(write_file, compressor, dtype) + if is_s3_path: + # Read from S3 + return load_file_from_s3(f, compressor, dtype, + bucket_name) + elif isinstance(f, str): + # Read from local file + with open(f, mode="rb") as read_file: + return _load_file(read_file, compressor, dtype) else: + # If f is already a file-like object, just use it return _load_file(f, compressor, dtype) +def load_file_from_s3(key, compressor, dtype, bucket_name): + """Helper function to load data from S3.""" + s3 = strax.S3Frontend().s3 + + try: + data = COMPRESSORS[compressor]["_decompress"](f) + if not len(data): + return np.zeros(0, dtype=dtype) + + # Retrieve the file from S3 and load into a BytesIO buffer + response = s3.get_object(Bucket=bucket_name, Key=key) + file_data = response['Body'].read() # Read the content of the file from S3 + + # Create a file-like object from the binary data + file_buffer = BytesIO(file_data) + return _load_file(file_buffer, compressor, dtype) + + except ClientError as e: + raise RuntimeError(f"Failed to load {key} from bucket {bucket_name}: {e}") + + def _load_file(f, compressor, dtype): try: data = COMPRESSORS[compressor]["_decompress"](f) @@ -105,6 +137,8 @@ def _load_file(f, compressor, dtype): return np.frombuffer(data, dtype=dtype) except ValueError as e: raise ValueError(f"ValueError while loading data with dtype =\n\t{dtype}") from e + except Exception as e: + raise RuntimeError(f"Error loading file: {e}") except Exception: raise strax.DataCorrupted( @@ -113,7 +147,7 @@ def _load_file(f, compressor, dtype): @export -def save_file(f, data, compressor="zstd", is_s3_path=False): +def save_file(f, data, compressor="zstd", is_s3_path = False): """Save data to file and return number of bytes written. 
:param f: file name or handle to save to @@ -121,7 +155,7 @@ def save_file(f, data, compressor="zstd", is_s3_path=False): :param compressor: compressor to use """ - + if isinstance(f, str): final_fn = f temp_fn = f + "_temp" @@ -131,22 +165,20 @@ def save_file(f, data, compressor="zstd", is_s3_path=False): os.rename(temp_fn, final_fn) return result else: - s3_interface = strax.S3Frontend( - s3_access_key_id=None, - s3_secret_access_key=None, - path="", - deep_scan=False, - ) + s3_interface = strax.S3Frontend(s3_access_key_id=None, + s3_secret_access_key=None, + path="", + deep_scan=False, ) # Copy temp file to final file result = _save_file_to_s3(s3_interface, temp_fn, data, compressor) s3_interface.s3.copy_object( - Bucket=s3_interface.BUCKET, - Key=final_fn, - CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn}, - ) - + Bucket=s3_interface.BUCKET, + Key=final_fn, + CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn} + ) + # Delete the temporary file - s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) + s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) return result else: @@ -176,9 +208,10 @@ def _save_file_to_s3(s3_client, key, data, compressor=None): buffer.seek(0) # Reset the buffer to the beginning # Upload buffer to S3 under the specified key - s3_client.s3.put_object(Bucket=s3_client.BUCKET, Key=key, Body=buffer.getvalue()) + s3_client.s3.put_object(Bucket=s3_client.BUCKET, + Key=key, Body=buffer.getvalue()) - return len(data) + return len(data) def _compress_blosc(data): @@ -193,7 +226,7 @@ def _compress_blosc(data): @export def dry_load_files(dirname, chunk_numbers=None, disable=False, **kwargs): prefix = strax.storage.files.dirname_to_prefix(dirname) - metadata_json = RUN_METADATA_PATTERN % prefix + metadata_json = f"{prefix}-metadata.json" md_path = os.path.join(dirname, metadata_json) with open(md_path, mode="r") as f: diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 189744c0a..eca8843c5 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -202,12 +202,15 @@ def _get_config_value(self, variable, option_name): def s3_object_exists(self, bucket_name, key): try: - response = self.s3.list_objects_v2(Bucket=bucket_name, Prefix=key) + response = self.s3.list_objects_v2(Bucket=bucket_name, + Prefix=key, + Delimiter='/') # Check if any objects were returned, and if the key exactly matches - if "Contents" in response: - for obj in response["Contents"]: - if obj["Key"] == key: + if "CommonPrefixes" in response: + for prefix in response["CommonPrefixes"]: + new_key = key + "/" + if prefix["Prefix"].startswith(new_key): return True # Object exists return False # Object does not exist @@ -350,7 +353,11 @@ def _read_and_format_chunk(self, *args, **kwargs): def _read_chunk(self, dirname, chunk_info, dtype, compressor): fn = osp.join(dirname, chunk_info["filename"]) - return strax.load_file(fn, dtype=dtype, compressor=compressor) + return strax.load_file(fn, dtype=dtype, + compressor=compressor, + S3_client=self.s3, + bucket_name = self.BUCKET, + is_s3_path=True) def _saver(self, dirname, metadata, **kwargs): # Test if the parent directory is writeable. 
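
At this point in the series the S3 path is usable end to end: S3Frontend resolves keys, S3Saver uploads chunks through save_file(..., is_s3_path=True), and S3Backend reads them back through load_file(..., is_s3_path=True). The following is a minimal usage sketch, modelled on the test_write_data_s3 test added in PATCH 03 but with explicit keyword arguments; the [s3] ini-file layout read via the XENON_CONFIG environment variable is inferred from the option names _get_config_value looks up, and the bucket is the hard-coded "mlrice" default from the code above. This is an illustration of the intended workflow, not part of the patch itself.

    import strax
    from strax.testutils import Records  # toy plugin used throughout the strax test suite

    st = strax.Context(register=[Records])
    st.storage = [
        strax.S3Frontend(
            s3_access_key_id=None,      # None -> read aws_access_key_id from the [s3] section
            s3_secret_access_key=None,  # None -> read aws_secret_access_key from the [s3] section
            path="",                    # objects are keyed directly under the bucket root
        )
    ]

    st.make("0", "records")             # chunks and the run metadata are uploaded to the bucket
    assert st.is_stored("0", "records")
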
From 9409a2e788fa07778ce783fe778752c900c3f237 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Mon, 3 Mar 2025 16:27:33 -0600 Subject: [PATCH 07/24] update documentation + change bucket initialization --- strax/storage/s3.py | 438 ++++++++++++++++++++++++++------------------ 1 file changed, 260 insertions(+), 178 deletions(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index eca8843c5..546116725 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -21,10 +21,10 @@ @export class S3Frontend(StorageFrontend): - """Simplest registry: single directory with FileStore data - sitting in subdirectories. - - Run-level metadata is stored in loose json files in the directory. + """ + A storage frontend that interacts with an S3-compatible object storage. + This class handles run-level metadata storage and retrieval, as well as + scanning for available runs. """ can_define_runs = True @@ -38,15 +38,22 @@ def __init__( s3_secret_access_key: str = None, endpoint_url: str = "https://rice1.osn.mghpcc.org/", path: str = "", + bucket_name: str = "", deep_scan: bool = False, *args, **kwargs, ): """ - :param path: Path to folder with data subfolders. - :param deep_scan: Let scan_runs scan over folders, - so even data for which no run-level metadata is available - is reported. + Initialize S3Frontend with given storage parameters + + :param s3_access_key_id: AWS access key for authentication. + :param s3_secret_access_key: AWS secret key for authentication. + :param endpoint_url: URL of the S3-compatible object storage. + :param path: Base path for storing data. + :param bucket_name: Name of the S3 bucket to use. + :param deep_scan: If True, scans for runs even without explicit metadata. + :param args: Additional arguments passed to the superclass. + :param kwargs: Additional keyword arguments passed to the superclass. For other arguments, see DataRegistry base class. """ @@ -54,23 +61,7 @@ def __init__( self.path = path self.deep_scan = deep_scan - # Might need to reimplement this at some later time - # if not self.readonly and not osp.exists(self.path): - # os.makedirs(self.path) - - self.config_path = os.getenv("XENON_CONFIG") - if self.config_path is None: - raise EnvironmentError("XENON_CONFIG file not found") - else: - self.config = configparser.ConfigParser() - self.config.read(self.config_path) - - s3_access_key_id = self._get_config_value(s3_access_key_id, "aws_access_key_id") - s3_secret_access_key = self._get_config_value( - s3_secret_access_key, "aws_secret_access_key" - ) - endpoint_url = self._get_config_value(endpoint_url, "endpoint_url") - + # Configure S3 client self.boto3_client_kwargs = { "aws_access_key_id": s3_access_key_id, "aws_secret_access_key": s3_secret_access_key, @@ -79,71 +70,118 @@ def __init__( "config": Config(connect_timeout=5, retries={"max_attempts": 10}), } - # Initialized connection to S3-protocol storage + if bucket_name != "": + self.bucket_name = bucket_name + + # Initialized connection to S3 storage self.s3 = boto3.client(**self.boto3_client_kwargs) + self.backends = [S3Backend(self.bucket_name, **self.boto3_client_kwargs)] - self.backends = [S3Backend(**self.boto3_client_kwargs)] + def _run_meta_path(self, run_id: str) -> str: + """ + Generate the metadata file path for a given run ID. + + :param run_id: The identifier of the run. - def _run_meta_path(self, run_id): + + :return: The path where the metadata is stored. 
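+
+        Example (illustrative, assuming the default path=""): run_id
+        "012345" resolves to the object key "012345-metadata.json".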
+ """ return osp.join(self.path, RUN_METADATA_PATTERN % run_id) - def run_metadata(self, run_id, projection=None): + def run_metadata(self, run_id: str, projection=None) -> dict: + """ + Retrieve metadata for a given run from S3. + + Parameters + ---------- + + run_id : (str) + The identifier of the run. + projection : + Fields to extract from metadata (optional). + :return: Run metadata as a dictionary. + """ path = self._run_meta_path(run_id) - # Changed ops. to self.s3 implementation - if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=path)["KeyCount"] == 0: + + # Checks if metadata exists + if self.s3.list_objects_v2(Bucket=self.bucket_name, + Prefix=path)["KeyCount"] == 0: raise strax.RunMetadataNotAvailable( f"No file at {path}, cannot find run metadata for {run_id}" ) - response = self.s3.get_object(Bucket=self.BUCKET, Key=path) + + # Retrieve metadata + response = self.s3.get_object(Bucket=self.bucket_name, Key=path) metadata_content = response["Body"].read().decode("utf-8") md = json.loads(metadata_content, object_hook=json_util.object_hook) - # with open(path, mode="r") as f: - # md = json.loads(f.read(), object_hook=json_util.object_hook) md = strax.flatten_run_metadata(md) + if projection is not None: md = {key: value for key, value in md.items() if key in projection} return md - def write_run_metadata(self, run_id, metadata): - # response = self.s3.get_object(Bucket=self.BUCKET, Key=self._run_meta_path(run_id)) - # metadata_content = response['Body'].read().decode('utf-8') + def write_run_metadata(self, run_id: str, metadata: dict): + """ + Write metadata for a specific run to S3. + + :param run_id: The identifier of the run. + :param metadata: The metadata dictionary to store. + """ if "name" not in metadata: metadata["name"] = run_id self.s3.put_object( - Bucket=self.BUCKET, + Bucket=self.bucket_name, Key=self._run_meta_path(run_id), - Body=json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default), + Body=json.dumps(metadata, sort_keys=True, + indent=4, default=json_util.default), ) - # with open(self._run_meta_path(run_id), mode="w") as f: - # if "name" not in metadata: - # metadata["name"] = run_id - # f.write(json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default)) - def _scan_runs(self, store_fields): - """Iterable of run document dictionaries. + def s3_object_exists(self, key) -> bool: + """ + Check if a given object exists in the S3 bucket. - These should be directly convertable to a pandas DataFrame. + :param key: The object key to check. + :return: True if the object exists, otherwise False. + """ + try: + response = self.s3.list_objects_v2(Bucket=self.bucket_name, + Prefix=key, + Delimiter='/') + # Check if any objects were returned, and if the key exactly matches + if "CommonPrefixes" in response: + for prefix in response["CommonPrefixes"]: + new_key = key + "/" + if prefix["Prefix"].startswith(new_key): + return True # Object exists + + return False # Object does not exist + except ClientError as e: + raise e + + def _scan_runs(self, store_fields): + """ + Scan for available runs stored in S3. + + :param store_fields: List of metadata fields to return. + :return: Yields dictionaries of run metadata. """ found = set() - # Yield metadata for runs for which we actually have it + # Retrieve stored runs from S3 for md_path in sorted( self.s3.list_objects_v2( - Bucket=self.BUCKET, + Bucket=self.bucket_name, Prefix=osp.join(self.path, RUN_METADATA_PATTERN.replace("%s", "*")), ) ): - # Parse the run metadata filename pattern. 
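[Editor's note] A hedged sketch of the run-metadata round trip implemented above: serialize with bson.json_util so BSON-specific types (datetimes, ObjectIds) survive, upload as <run_id>-metadata.json, and restore with the matching object_hook. The run id and bucket are placeholders, and the put/get calls are left commented so the snippet runs without credentials.

import json
from bson import json_util

metadata = {"name": "0", "comment": "example run document"}
body = json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default)

# s3.put_object(Bucket="mlrice", Key="0-metadata.json", Body=body)
# raw = s3.get_object(Bucket="mlrice", Key="0-metadata.json")["Body"].read().decode("utf-8")
restored = json.loads(body, object_hook=json_util.object_hook)
assert restored["name"] == "0"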
- # (different from the folder pattern) run_id = osp.basename(md_path).split("-")[0] found.add(run_id) yield self.run_metadata(run_id, projection=store_fields) + # Preform deepscan if enabled if self.deep_scan: - # Yield runs for which no metadata exists - # we'll make "metadata" that consist only of the run name for fn in self._subfolders(): run_id = self._parse_folder_name(fn)[0] if run_id not in found: @@ -151,9 +189,20 @@ def _scan_runs(self, store_fields): yield dict(name=run_id) def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): + """ + Find the appropriate storage key for a given dataset. + + :param key: The dataset key. + :param write: Whether to check for writable access. + :param allow_incomplete: Allow incomplete datasets. + :param fuzzy_for: Parameters for fuzzy search. + :param fuzzy_for_options: Additional fuzzy search options. + :return: The backend key if found, otherwise raises DataNotAvailable. + """ + self.raise_if_non_compatible_run_id(key.run_id) dirname = osp.join(self.path, str(key)) - exists = self.s3_object_exists(self.BUCKET, dirname) + exists = self.s3_object_exists(self.bucket_name, dirname) bk = self.backend_key(dirname) if write: @@ -165,23 +214,20 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): # Check for incomplete data (only exact matching for now) if fuzzy_for or fuzzy_for_options: raise NotImplementedError( - "Mixing of fuzzy matching and allow_incomplete not supported by DataDirectory." + "Mixing of fuzzy matching and allow_incomplete not", + " supported by DataDirectory." ) tempdirname = dirname + "_temp" bk = self.backend_key(tempdirname) - if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=tempdirname)["KeyCount"] >= 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, + Prefix=tempdirname)["KeyCount"] >= 0: return bk # Check exact match if exists and self._folder_matches(dirname, key, None, None): return bk - # Check metadata of all potentially matching data dirs for - # matches. This only makes sense for fuzzy searches since - # otherwise we should have had an exact match already. (Also - # really slows down st.select runs otherwise because we doing an - # entire search over all the files in self._subfolders for all - # non-available keys). + # If fuzzy search is enabled find fuzzy file names if fuzzy_for or fuzzy_for_options: for fn in self._subfolders(): if self._folder_matches(fn, key, fuzzy_for, fuzzy_for_options): @@ -190,6 +236,13 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): raise strax.DataNotAvailable def _get_config_value(self, variable, option_name): + """ + Retrieve a configuration value from the environment or config file. + + :param variable: The variable to check. + :param option_name: The option name in the config file. + :return: The retrieved configuration value. 
+ """ if variable is None: if "s3" not in self.config: raise EnvironmentError("S3 access point not spesified") @@ -200,28 +253,11 @@ def _get_config_value(self, variable, option_name): else: return variable - def s3_object_exists(self, bucket_name, key): - try: - response = self.s3.list_objects_v2(Bucket=bucket_name, - Prefix=key, - Delimiter='/') - - # Check if any objects were returned, and if the key exactly matches - if "CommonPrefixes" in response: - for prefix in response["CommonPrefixes"]: - new_key = key + "/" - if prefix["Prefix"].startswith(new_key): - return True # Object exists - - return False # Object does not exist - except ClientError as e: - # Handle any other error as needed - raise e - def _subfolders(self): """Loop over subfolders of self.path that match our folder format.""" # Trigger if statement if path doesnt exist - if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=self.path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, + Prefix=self.path)["KeyCount"] == 0: return for dirname in os.listdir(self.path): try: @@ -230,20 +266,16 @@ def _subfolders(self): continue yield osp.join(self.path, dirname) - @staticmethod - def _parse_folder_name(fn): - """Return (run_id, data_type, hash) if folder name matches DataDirectory convention, raise - InvalidFolderNameFormat otherwise.""" - stuff = osp.normpath(fn).split(os.sep)[-1].split("-") - if len(stuff) != 3: - # This is not a folder with strax data - raise InvalidFolderNameFormat(fn) - return stuff - def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): - """Return the run_id of folder fn if it matches key, or False if it does not. + """ + Check if a folder matches the required data key. - :param name: Ignore the run name part of the key. Useful for listing availability. + :param fn: Folder name. + :param key: Data key to match against. + :param fuzzy_for: Parameters for fuzzy search. + :param fuzzy_for_options: Additional fuzzy search options. + :param ignore_name: If True, ignores run name while matching. + :return: The run_id if it matches, otherwise False. """ # Parse the folder name @@ -269,15 +301,37 @@ def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=Fal return False def backend_key(self, dirname): + """ + Return the backend key representation. + + :param dirname: The directory name. + :return: Backend key tuple. + """ return self.backends[0].__class__.__name__, dirname def remove(self, key): - # There is no database, so removing the folder from the filesystem - # (which FileStore should do) is sufficient. - pass + # Remove a data entery from storage + NotImplementedError + + @staticmethod + def _parse_folder_name(fn): + """ + Return (run_id, data_type, hash) if folder name matches + DataDirectory convention, raise InvalidFolderNameFormat otherwise. + """ + stuff = osp.normpath(fn).split(os.sep)[-1].split("-") + if len(stuff) != 3: + # This is not a folder with strax data + raise InvalidFolderNameFormat(fn) + return stuff @staticmethod def raise_if_non_compatible_run_id(run_id): + """ + Raise an error if the run ID contains invalid characters. + + :param run_id: The run identifier. + """ if "-" in str(run_id): raise ValueError( "The filesystem frontend does not understand" @@ -294,126 +348,149 @@ def dirname_to_prefix(dirname): @export class S3Backend(strax.StorageBackend): - """Store data locally in a directory of binary files. - - Files are named after the chunk number (without extension). 
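[Editor's note] The frontend assumes keys of the form <run_id>-<data_type>-<lineage_hash>. A standalone illustration of the convention _parse_folder_name enforces; the example folder name is made up.

import os
import os.path as osp

def parse_folder_name(fn: str):
    """Split a strax data folder name into its three components."""
    parts = osp.normpath(fn).split(os.sep)[-1].split("-")
    if len(parts) != 3:
        raise ValueError(f"{fn} does not look like a strax data folder")
    run_id, data_type, lineage_hash = parts
    return run_id, data_type, lineage_hash

# parse_folder_name("0-records-abc123") -> ("0", "records", "abc123")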
Metadata is stored in a file called - metadata.json. - + """ + A storage backend that stores data in an S3-compatible object storage. + Data is stored in binary files, named based on the chunk number. + Metadata is stored separately as a JSON file. """ BUCKET = "mlrice" def __init__( self, - *args, + bucket_name, set_target_chunk_mb: Optional[int] = None, + *args, **kwargs, ): - """Add set_chunk_size_mb to strax.StorageBackend to allow changing the chunk.target_size_mb - returned from the loader, any args or kwargs are passed to the strax.StorageBackend. + """Add set_chunk_size_mb to strax.StorageBackend + to allow changing the chunk.target_size_mb + returned from the loader, any args or kwargs + are passed to the strax.StorageBackend. - :param set_target_chunk_mb: Prior to returning the loaders' chunks, return the chunk with an - updated target size + :param bucket_name: Name of the bucket used as storage + :param set_target_chunk_mb: Prior to returning the loaders' chunks, + return the chunk with an updated target size """ super().__init__() self.s3 = boto3.client(**kwargs) self.set_chunk_size_mb = set_target_chunk_mb + self.bucket_name = bucket_name def _get_metadata(self, dirname): + """ + Retrieve metadata for a given directory in S3. + + :param dirname: The directory name in S3 where metadata is stored. + :return: Dictionary containing metadata information. + :raises strax.DataCorrupted: If metadata is missing or corrupted. + """ prefix = dirname_to_prefix(dirname) metadata_json = f"{prefix}-metadata.json" md_path = osp.join(dirname, metadata_json) - if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=md_path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=md_path)["KeyCount"] == 0: # Try to see if we are so fast that there exists a temp folder # with the metadata we need. md_path = osp.join(dirname + "_temp", metadata_json) - if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=md_path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=md_path)["KeyCount"] == 0: # Try old-format metadata # (if it's not there, just let it raise FileNotFound # with the usual message in the next stage) old_md_path = osp.join(dirname, "metadata.json") - if self.s3.list_objects_v2(Bucket=self.BUCKET, Prefix=old_md_path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, + Prefix=old_md_path)["KeyCount"] == 0: raise strax.DataCorrupted(f"Data in {dirname} has no metadata") md_path = old_md_path - response = self.s3.get_object(Bucket=self.BUCKET, Key=md_path) + response = self.s3.get_object(Bucket=self.bucket_name, Key=md_path) metadata_content = response["Body"].read().decode("utf-8") return json.loads(metadata_content) # with open(md_path, mode="r") as f: # return json.loads(f.read()) def _read_and_format_chunk(self, *args, **kwargs): + """ + Read a data chunk and optionally update its target size. + + :return: Formatted data chunk. + """ chunk = super()._read_and_format_chunk(*args, **kwargs) if self.set_chunk_size_mb: chunk.target_size_mb = self.set_chunk_size_mb return chunk def _read_chunk(self, dirname, chunk_info, dtype, compressor): + """ + Read a chunk of data from S3. + + :param dirname: Directory in S3 containing the chunk. + :param chunk_info: Dictionary containing chunk metadata. + :param dtype: Data type of the chunk. + :param compressor: Compression format used for the chunk. + :return: Loaded data chunk. 
+ """ fn = osp.join(dirname, chunk_info["filename"]) return strax.load_file(fn, dtype=dtype, compressor=compressor, S3_client=self.s3, - bucket_name = self.BUCKET, + bucket_name = self.bucket_name, is_s3_path=True) def _saver(self, dirname, metadata, **kwargs): - # Test if the parent directory is writeable. - # We need abspath since the dir itself may not exist, - # even though its parent-to-be does - parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) # This might need some work - - # In case the parent dir also doesn't exist, we have to create is - # otherwise the write permission check below will certainly fail - # I dont think this is needed for S3 so we can delete it - # try: - # os.makedirs(parent_dir, exist_ok=True) - # except OSError as e: - # raise strax.DataNotAvailable( - # f"Can't write data to {dirname}, " - # f"{parent_dir} does not exist and we could not create it." - # f"Original error: {e}" - # ) - - # Finally, check if we have permission to create the new subdirectory - # (which the Saver will do) - # Also dont think its needed - # if not os.access(parent_dir, os.W_OK): - # raise strax.DataNotAvailable( - # f"Can't write data to {dirname}, no write permissions in {parent_dir}." - # ) - - return S3Saver(dirname, self.s3, metadata=metadata, **kwargs) + """ + Create a saver object for writing data to S3. + + :param dirname: Directory in S3 where data will be stored. + :param metadata: Metadata dictionary associated with the data. + :param kwargs: Additional keyword arguments for the saver. + :return: An instance of `S3Saver`. + """ + + parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) + + return S3Saver(dirname, self.s3, self.bucket_name, metadata=metadata, **kwargs) @export class S3Saver(strax.Saver): - """Saves data to compressed binary files.""" + """ + A saver class that writes data chunks to an S3-compatible storage backend. + Supports metadata management and chunked data saving. + """ json_options = dict(sort_keys=True, indent=4) # When writing chunks, rewrite the json file every time we write a chunk _flush_md_for_every_chunk = True - def __init__(self, dirname, s3, metadata, **kwargs): + def __init__(self, dirname, s3, bucket_name, metadata, **kwargs): + """ + Initialize the S3Saver instance. + + :param dirname: Directory path (prefix) in S3 where data is stored. + :param s3: Boto3 S3 client instance. + :param metadata: Metadata dictionary associated with the data. + :param kwargs: Additional keyword arguments for the saver. 
+ """ super().__init__(metadata=metadata) self.dirname = dirname + self.s3 = s3 + self.bucket_name = bucket_name + self.tempdirname = dirname + "_temp" self.prefix = dirname_to_prefix(dirname) self.metadata_json = f"{self.prefix}-metadata.json" - self.s3 = s3 - self.bucket_name = "mlrice" - self.config = boto3.s3.transfer.TransferConfig(max_concurrency=40, num_download_attempts=30) - if self.s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=dirname)["KeyCount"] == 1: + if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=dirname)["KeyCount"] == 1: print(f"Removing data in {dirname} to overwrite") - self.s3.delete_object(Bucket=BUCKET_NAME, Key=dirname) - if self.s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=self.tempdirname)["KeyCount"] == 1: + self.s3.delete_object(Bucket=self.bucket_name, Key=dirname) + if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname)["KeyCount"] == 1: print(f"Removing old incomplete data in {self.tempdirname}") - self.s3.delete_object(Bucket=BUCKET_NAME, Key=dirname) + self.s3.delete_object(Bucket=self.bucket_name, Key=dirname) # os.makedirs(self.tempdirname) self._flush_metadata() @@ -428,24 +505,32 @@ def _flush_metadata(self): self.s3.put_object(Bucket=self.bucket_name, Key=metadata_key, Body=metadata_content) def _chunk_filename(self, chunk_info): + """ + Generate a filename for a given chunk. + + :param chunk_info: Dictionary containing chunk metadata. + :return: Filename string. + """ if "filename" in chunk_info: return chunk_info["filename"] ichunk = "%06d" % chunk_info["chunk_i"] return f"{self.prefix}-{ichunk}" def _save_chunk(self, data, chunk_info, executor=None): + """ + Save a chunk of data to S3. + + :param data: Data chunk to be saved. + :param chunk_info: Metadata dictionary for the chunk. + :param executor: Optional executor for parallel writes. + :return: Chunk metadata dictionary. + """ filename = self._chunk_filename(chunk_info) fn = os.path.join(self.tempdirname, filename) kwargs = dict(data=data, compressor=self.md["compressor"]) if executor is None: filesize = strax.save_file(fn, is_s3_path=True, **kwargs) - # fn.seek(0) - # Giving just the filename wont work, needs to be the full path - # self.s3.upload_fileobj(fn, - # BUCKET_NAME, - # filename, - # Config=self.config,) return dict(filename=filename, filesize=filesize), None else: # Might need to add some s3 stuff here @@ -454,42 +539,34 @@ def _save_chunk(self, data, chunk_info, executor=None): ) def _save_chunk_metadata(self, chunk_info): + """ + Save metadata associated with a data chunk. + + :param chunk_info: Dictionary containing chunk metadata. + """ is_first = chunk_info["chunk_i"] == 0 if is_first: self.md["start"] = chunk_info["start"] if self.is_forked: # Do not write to the main metadata file to avoid race conditions - # Instead, write a separate metadata.json file for this chunk, - # to be collected later. - # We might not have a filename yet: - # the chunk is not saved when it is empty filename = self._chunk_filename(chunk_info) - fn = f"{self.tempdirname}/metadata_{filename}.json" metadata_content = json.dump(chunk_info, **self.json_options) - self.s3.put_object(Bucket=self.bucket_name, Key=fn, Body=metadata_content) - # with open(fn, mode="w") as f: - # f.write(json.dumps(chunk_info, **self.json_options)) - - # To ensure we have some metadata to load with allow_incomplete, - # modify the metadata immediately for the first chunk. - # If we are forked, modifying self.md is harmless since - # we're in a different process. 
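[Editor's note] Chunk objects are named <prefix>-<chunk index>, zero-padded to six digits, as _chunk_filename above shows. A tiny illustration; the prefix value is hypothetical.

def chunk_filename(prefix: str, chunk_i: int) -> str:
    """Build the object name for chunk number chunk_i."""
    return f"{prefix}-{chunk_i:06d}"

# chunk_filename("records-abc123", 7) -> "records-abc123-000007"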
+ self.s3.put_object(Bucket=self.s3.bucket_name, Key=fn, Body=metadata_content) if not self.is_forked or is_first: - # Just append and flush the metadata - # (maybe not super-efficient to write the json every time... - # just don't use thousands of chunks) self.md["chunks"].append(chunk_info) if self._flush_md_for_every_chunk: self._flush_metadata() def _close(self): - # Check if temp directory exists in the S3 bucket by listing objects with the tempdirname prefix + """ + Finalize the saving process by merging temp data and flushing metadata. + """ try: - response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname) + response = self.s3.list_objects_v2(Bucket=self.s3.bucket_name, Prefix=self.tempdirname) if "Contents" not in response or len(response["Contents"]) == 0: raise RuntimeError( f"{self.tempdirname} was already renamed to {self.dirname}. " @@ -500,17 +577,17 @@ def _close(self): # List the files in the temporary directory matching metadata_*.json response = self.s3.list_objects_v2( - Bucket=self.bucket_name, Prefix=f"{self.tempdirname}/metadata_" + Bucket=self.s3.bucket_name, Prefix=f"{self.tempdirname}/metadata_" ) for obj in response.get("Contents", []): key = obj["Key"] # Download each metadata file, process, and delete from tempdirname - metadata_object = self.s3.get_object(Bucket=self.bucket_name, Key=key) + metadata_object = self.s3.get_object(Bucket=self.s3.bucket_name, Key=key) metadata_content = metadata_object["Body"].read().decode("utf-8") self.md["chunks"].append(json.loads(metadata_content)) # Optionally, delete the metadata file after processing - self.s3.delete_object(Bucket=self.bucket_name, Key=key) + self.s3.delete_object(Bucket=self.s3.bucket_name, Key=key) # Flush metadata (this would be another method to handle your metadata saving logic) self._flush_metadata() @@ -523,22 +600,27 @@ def _close(self): raise def _rename_s3_folder(self, tempdirname, dirname): - # List the files in the temporary directory - response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname) + """ + Rename the temporary directory to the final storage location in S3. + + :param tempdirname: Temporary directory path in S3. + :param dirname: Final directory path in S3. 
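[Editor's note] S3 has no atomic rename, so _rename_s3_folder below "renames" the _temp prefix by copying every object and deleting the originals. A minimal sketch of that pattern, ignoring pagination for brevity; bucket and prefix names are placeholders.

def rename_prefix(s3, bucket, src_prefix, dst_prefix):
    """Copy every object under src_prefix to dst_prefix, then delete the originals."""
    listing = s3.list_objects_v2(Bucket=bucket, Prefix=src_prefix)
    for obj in listing.get("Contents", []):
        old_key = obj["Key"]
        new_key = old_key.replace(src_prefix, dst_prefix, 1)
        s3.copy_object(
            Bucket=bucket,
            Key=new_key,
            CopySource={"Bucket": bucket, "Key": old_key},
        )
        s3.delete_object(Bucket=bucket, Key=old_key)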
+ """ + response = self.s3.list_objects_v2(Bucket=self.s3.bucket_name, Prefix=tempdirname) for obj in response.get("Contents", []): key = obj["Key"] # Copy each file from the temporary directory to the final directory new_key = key.replace(tempdirname, dirname) self.s3.copy_object( - Bucket=self.bucket_name, - CopySource={"Bucket": self.bucket_name, "Key": key}, + Bucket=self.s3.bucket_name, + CopySource={"Bucket": self.s3.bucket_name, "Key": key}, Key=new_key, ) # Delete the file from the temporary directory - self.s3.delete_object(Bucket=self.bucket_name, Key=key) + self.s3.delete_object(Bucket=self.s3.bucket_name, Key=key) # Delete the temporary directory - self.s3.delete_object(Bucket=self.bucket_name, Key=tempdirname) + self.s3.delete_object(Bucket=self.s3.bucket_name, Key=tempdirname) @export From 63e0bad176b20f71daaab3bd9ea8b02c4b5c0959 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Mar 2025 22:54:53 +0000 Subject: [PATCH 08/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/io.py | 44 ++++++----- strax/storage/s3.py | 178 ++++++++++++++++++++++---------------------- 2 files changed, 109 insertions(+), 113 deletions(-) diff --git a/strax/io.py b/strax/io.py index 077395e76..c5f16a7a8 100644 --- a/strax/io.py +++ b/strax/io.py @@ -81,10 +81,7 @@ def _lz4_decompress(f, buffer_size=DECOMPRESS_BUFFER_SIZE): @export -def load_file(f, compressor, dtype, - S3_client=None, - bucket_name = None, - is_s3_path=False): +def load_file(f, compressor, dtype, S3_client=None, bucket_name=None, is_s3_path=False): """Read and return data from file or S3. :param f: file name or handle to read from @@ -93,11 +90,11 @@ def load_file(f, compressor, dtype, :param dtype: numpy dtype of data to load :param S3_client: (optional) S3 client to find and store data :param is_s3_path: Boolean indicating if the file is stored in S3. + """ if is_s3_path: # Read from S3 - return load_file_from_s3(f, compressor, dtype, - bucket_name) + return load_file_from_s3(f, compressor, dtype, bucket_name) elif isinstance(f, str): # Read from local file with open(f, mode="rb") as read_file: @@ -110,7 +107,7 @@ def load_file(f, compressor, dtype, def load_file_from_s3(key, compressor, dtype, bucket_name): """Helper function to load data from S3.""" s3 = strax.S3Frontend().s3 - + try: data = COMPRESSORS[compressor]["_decompress"](f) if not len(data): @@ -118,7 +115,7 @@ def load_file_from_s3(key, compressor, dtype, bucket_name): # Retrieve the file from S3 and load into a BytesIO buffer response = s3.get_object(Bucket=bucket_name, Key=key) - file_data = response['Body'].read() # Read the content of the file from S3 + file_data = response["Body"].read() # Read the content of the file from S3 # Create a file-like object from the binary data file_buffer = BytesIO(file_data) @@ -147,7 +144,7 @@ def _load_file(f, compressor, dtype): @export -def save_file(f, data, compressor="zstd", is_s3_path = False): +def save_file(f, data, compressor="zstd", is_s3_path=False): """Save data to file and return number of bytes written. 
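[Editor's note] A hedged sketch of the S3 read path that load_file_from_s3 implements: fetch the object body, wrap it in a file-like buffer, decompress, and view the bytes with the requested dtype. Here decompress() stands in for the compressor-specific routine strax already provides; all names are illustrative.

from io import BytesIO
import numpy as np

def load_chunk_from_s3(s3, bucket, key, decompress, dtype):
    """Download one chunk object and return it as a numpy array."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    raw = decompress(BytesIO(body))
    return np.frombuffer(raw, dtype=dtype)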
:param f: file name or handle to save to @@ -155,7 +152,7 @@ def save_file(f, data, compressor="zstd", is_s3_path = False): :param compressor: compressor to use """ - + if isinstance(f, str): final_fn = f temp_fn = f + "_temp" @@ -165,20 +162,22 @@ def save_file(f, data, compressor="zstd", is_s3_path = False): os.rename(temp_fn, final_fn) return result else: - s3_interface = strax.S3Frontend(s3_access_key_id=None, - s3_secret_access_key=None, - path="", - deep_scan=False, ) + s3_interface = strax.S3Frontend( + s3_access_key_id=None, + s3_secret_access_key=None, + path="", + deep_scan=False, + ) # Copy temp file to final file result = _save_file_to_s3(s3_interface, temp_fn, data, compressor) s3_interface.s3.copy_object( - Bucket=s3_interface.BUCKET, - Key=final_fn, - CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn} - ) - + Bucket=s3_interface.BUCKET, + Key=final_fn, + CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn}, + ) + # Delete the temporary file - s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) + s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) return result else: @@ -208,10 +207,9 @@ def _save_file_to_s3(s3_client, key, data, compressor=None): buffer.seek(0) # Reset the buffer to the beginning # Upload buffer to S3 under the specified key - s3_client.s3.put_object(Bucket=s3_client.BUCKET, - Key=key, Body=buffer.getvalue()) + s3_client.s3.put_object(Bucket=s3_client.BUCKET, Key=key, Body=buffer.getvalue()) - return len(data) + return len(data) def _compress_blosc(data): diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 546116725..8cf9442ec 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -21,10 +21,11 @@ @export class S3Frontend(StorageFrontend): - """ - A storage frontend that interacts with an S3-compatible object storage. - This class handles run-level metadata storage and retrieval, as well as - scanning for available runs. + """A storage frontend that interacts with an S3-compatible object storage. + + This class handles run-level metadata storage and retrieval, as well as scanning for available + runs. + """ can_define_runs = True @@ -43,8 +44,7 @@ def __init__( *args, **kwargs, ): - """ - Initialize S3Frontend with given storage parameters + """Initialize S3Frontend with given storage parameters. :param s3_access_key_id: AWS access key for authentication. :param s3_secret_access_key: AWS secret key for authentication. @@ -53,9 +53,9 @@ def __init__( :param bucket_name: Name of the S3 bucket to use. :param deep_scan: If True, scans for runs even without explicit metadata. :param args: Additional arguments passed to the superclass. - :param kwargs: Additional keyword arguments passed to the superclass. + :param kwargs: Additional keyword arguments passed to the superclass. For other arguments, + see DataRegistry base class. - For other arguments, see DataRegistry base class. """ super().__init__(*args, **kwargs) self.path = path @@ -78,34 +78,31 @@ def __init__( self.backends = [S3Backend(self.bucket_name, **self.boto3_client_kwargs)] def _run_meta_path(self, run_id: str) -> str: - """ - Generate the metadata file path for a given run ID. + """Generate the metadata file path for a given run ID. :param run_id: The identifier of the run. - - :return: The path where the metadata is stored. + """ return osp.join(self.path, RUN_METADATA_PATTERN % run_id) def run_metadata(self, run_id: str, projection=None) -> dict: - """ - Retrieve metadata for a given run from S3. 
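[Editor's note] The S3 branch of save_file mirrors the local write-temp-then-rename trick: upload to <key>_temp, server-side copy to the final key, then delete the temp object. A compact sketch of that sequence with compression omitted; bucket and key are placeholders.

def save_bytes_to_s3(s3, bucket, key, payload: bytes) -> int:
    """Write payload to key via a temporary object, returning the byte count."""
    temp_key = key + "_temp"
    s3.put_object(Bucket=bucket, Key=temp_key, Body=payload)
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": temp_key},
    )
    s3.delete_object(Bucket=bucket, Key=temp_key)
    return len(payload)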
+ """Retrieve metadata for a given run from S3. Parameters ---------- run_id : (str) The identifier of the run. - projection : + projection : Fields to extract from metadata (optional). :return: Run metadata as a dictionary. + """ path = self._run_meta_path(run_id) - + # Checks if metadata exists - if self.s3.list_objects_v2(Bucket=self.bucket_name, - Prefix=path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=path)["KeyCount"] == 0: raise strax.RunMetadataNotAvailable( f"No file at {path}, cannot find run metadata for {run_id}" ) @@ -121,11 +118,11 @@ def run_metadata(self, run_id: str, projection=None) -> dict: return md def write_run_metadata(self, run_id: str, metadata: dict): - """ - Write metadata for a specific run to S3. + """Write metadata for a specific run to S3. :param run_id: The identifier of the run. :param metadata: The metadata dictionary to store. + """ if "name" not in metadata: metadata["name"] = run_id @@ -133,21 +130,18 @@ def write_run_metadata(self, run_id: str, metadata: dict): self.s3.put_object( Bucket=self.bucket_name, Key=self._run_meta_path(run_id), - Body=json.dumps(metadata, sort_keys=True, - indent=4, default=json_util.default), + Body=json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default), ) def s3_object_exists(self, key) -> bool: - """ - Check if a given object exists in the S3 bucket. + """Check if a given object exists in the S3 bucket. :param key: The object key to check. :return: True if the object exists, otherwise False. + """ try: - response = self.s3.list_objects_v2(Bucket=self.bucket_name, - Prefix=key, - Delimiter='/') + response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=key, Delimiter="/") # Check if any objects were returned, and if the key exactly matches if "CommonPrefixes" in response: @@ -161,11 +155,11 @@ def s3_object_exists(self, key) -> bool: raise e def _scan_runs(self, store_fields): - """ - Scan for available runs stored in S3. + """Scan for available runs stored in S3. :param store_fields: List of metadata fields to return. :return: Yields dictionaries of run metadata. + """ found = set() @@ -189,8 +183,7 @@ def _scan_runs(self, store_fields): yield dict(name=run_id) def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): - """ - Find the appropriate storage key for a given dataset. + """Find the appropriate storage key for a given dataset. :param key: The dataset key. :param write: Whether to check for writable access. @@ -198,6 +191,7 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): :param fuzzy_for: Parameters for fuzzy search. :param fuzzy_for_options: Additional fuzzy search options. :return: The backend key if found, otherwise raises DataNotAvailable. + """ self.raise_if_non_compatible_run_id(key.run_id) @@ -214,13 +208,15 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): # Check for incomplete data (only exact matching for now) if fuzzy_for or fuzzy_for_options: raise NotImplementedError( - "Mixing of fuzzy matching and allow_incomplete not", - " supported by DataDirectory." 
+ "Mixing of fuzzy matching and allow_incomplete not", + " supported by DataDirectory.", ) tempdirname = dirname + "_temp" bk = self.backend_key(tempdirname) - if self.s3.list_objects_v2(Bucket=self.bucket_name, - Prefix=tempdirname)["KeyCount"] >= 0: + if ( + self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname)["KeyCount"] + >= 0 + ): return bk # Check exact match @@ -236,12 +232,12 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): raise strax.DataNotAvailable def _get_config_value(self, variable, option_name): - """ - Retrieve a configuration value from the environment or config file. + """Retrieve a configuration value from the environment or config file. :param variable: The variable to check. :param option_name: The option name in the config file. :return: The retrieved configuration value. + """ if variable is None: if "s3" not in self.config: @@ -256,8 +252,7 @@ def _get_config_value(self, variable, option_name): def _subfolders(self): """Loop over subfolders of self.path that match our folder format.""" # Trigger if statement if path doesnt exist - if self.s3.list_objects_v2(Bucket=self.bucket_name, - Prefix=self.path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.path)["KeyCount"] == 0: return for dirname in os.listdir(self.path): try: @@ -267,8 +262,7 @@ def _subfolders(self): yield osp.join(self.path, dirname) def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): - """ - Check if a folder matches the required data key. + """Check if a folder matches the required data key. :param fn: Folder name. :param key: Data key to match against. @@ -301,11 +295,11 @@ def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=Fal return False def backend_key(self, dirname): - """ - Return the backend key representation. + """Return the backend key representation. :param dirname: The directory name. :return: Backend key tuple. + """ return self.backends[0].__class__.__name__, dirname @@ -315,10 +309,8 @@ def remove(self, key): @staticmethod def _parse_folder_name(fn): - """ - Return (run_id, data_type, hash) if folder name matches - DataDirectory convention, raise InvalidFolderNameFormat otherwise. - """ + """Return (run_id, data_type, hash) if folder name matches DataDirectory convention, raise + InvalidFolderNameFormat otherwise.""" stuff = osp.normpath(fn).split(os.sep)[-1].split("-") if len(stuff) != 3: # This is not a folder with strax data @@ -327,10 +319,10 @@ def _parse_folder_name(fn): @staticmethod def raise_if_non_compatible_run_id(run_id): - """ - Raise an error if the run ID contains invalid characters. + """Raise an error if the run ID contains invalid characters. :param run_id: The run identifier. + """ if "-" in str(run_id): raise ValueError( @@ -348,10 +340,11 @@ def dirname_to_prefix(dirname): @export class S3Backend(strax.StorageBackend): - """ - A storage backend that stores data in an S3-compatible object storage. - Data is stored in binary files, named based on the chunk number. - Metadata is stored separately as a JSON file. + """A storage backend that stores data in an S3-compatible object storage. + + Data is stored in binary files, named based on the chunk number. Metadata is stored separately + as a JSON file. 
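[Editor's note] _get_config_value falls back to an INI file pointed to by $XENON_CONFIG when an argument is not supplied, although later patches in this series lean on constructor arguments instead. A hedged sketch of the config-file variant: the section is [s3], but the option names differ between revisions (s3_access_key_id vs aws_access_key_id), so treat these names as assumptions.

import configparser
import os

config = configparser.ConfigParser()
config.read(os.environ["XENON_CONFIG"])

access_key = config.get("s3", "s3_access_key_id")
secret_key = config.get("s3", "s3_secret_access_key")
endpoint = config.get("s3", "endpoint_url")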
+ """ BUCKET = "mlrice" @@ -363,14 +356,12 @@ def __init__( *args, **kwargs, ): - """Add set_chunk_size_mb to strax.StorageBackend - to allow changing the chunk.target_size_mb - returned from the loader, any args or kwargs - are passed to the strax.StorageBackend. + """Add set_chunk_size_mb to strax.StorageBackend to allow changing the chunk.target_size_mb + returned from the loader, any args or kwargs are passed to the strax.StorageBackend. :param bucket_name: Name of the bucket used as storage - :param set_target_chunk_mb: Prior to returning the loaders' chunks, - return the chunk with an updated target size + :param set_target_chunk_mb: Prior to returning the loaders' chunks, return the chunk with an + updated target size """ super().__init__() @@ -379,12 +370,12 @@ def __init__( self.bucket_name = bucket_name def _get_metadata(self, dirname): - """ - Retrieve metadata for a given directory in S3. + """Retrieve metadata for a given directory in S3. :param dirname: The directory name in S3 where metadata is stored. :return: Dictionary containing metadata information. :raises strax.DataCorrupted: If metadata is missing or corrupted. + """ prefix = dirname_to_prefix(dirname) metadata_json = f"{prefix}-metadata.json" @@ -400,8 +391,10 @@ def _get_metadata(self, dirname): # (if it's not there, just let it raise FileNotFound # with the usual message in the next stage) old_md_path = osp.join(dirname, "metadata.json") - if self.s3.list_objects_v2(Bucket=self.bucket_name, - Prefix=old_md_path)["KeyCount"] == 0: + if ( + self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=old_md_path)["KeyCount"] + == 0 + ): raise strax.DataCorrupted(f"Data in {dirname} has no metadata") md_path = old_md_path @@ -412,10 +405,10 @@ def _get_metadata(self, dirname): # return json.loads(f.read()) def _read_and_format_chunk(self, *args, **kwargs): - """ - Read a data chunk and optionally update its target size. + """Read a data chunk and optionally update its target size. :return: Formatted data chunk. + """ chunk = super()._read_and_format_chunk(*args, **kwargs) if self.set_chunk_size_mb: @@ -423,30 +416,33 @@ def _read_and_format_chunk(self, *args, **kwargs): return chunk def _read_chunk(self, dirname, chunk_info, dtype, compressor): - """ - Read a chunk of data from S3. + """Read a chunk of data from S3. :param dirname: Directory in S3 containing the chunk. :param chunk_info: Dictionary containing chunk metadata. :param dtype: Data type of the chunk. :param compressor: Compression format used for the chunk. :return: Loaded data chunk. + """ fn = osp.join(dirname, chunk_info["filename"]) - return strax.load_file(fn, dtype=dtype, - compressor=compressor, - S3_client=self.s3, - bucket_name = self.bucket_name, - is_s3_path=True) + return strax.load_file( + fn, + dtype=dtype, + compressor=compressor, + S3_client=self.s3, + bucket_name=self.bucket_name, + is_s3_path=True, + ) def _saver(self, dirname, metadata, **kwargs): - """ - Create a saver object for writing data to S3. + """Create a saver object for writing data to S3. :param dirname: Directory in S3 where data will be stored. :param metadata: Metadata dictionary associated with the data. :param kwargs: Additional keyword arguments for the saver. :return: An instance of `S3Saver`. + """ parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) @@ -456,9 +452,10 @@ def _saver(self, dirname, metadata, **kwargs): @export class S3Saver(strax.Saver): - """ - A saver class that writes data chunks to an S3-compatible storage backend. 
+ """A saver class that writes data chunks to an S3-compatible storage backend. + Supports metadata management and chunked data saving. + """ json_options = dict(sort_keys=True, indent=4) @@ -466,13 +463,13 @@ class S3Saver(strax.Saver): _flush_md_for_every_chunk = True def __init__(self, dirname, s3, bucket_name, metadata, **kwargs): - """ - Initialize the S3Saver instance. + """Initialize the S3Saver instance. :param dirname: Directory path (prefix) in S3 where data is stored. :param s3: Boto3 S3 client instance. :param metadata: Metadata dictionary associated with the data. :param kwargs: Additional keyword arguments for the saver. + """ super().__init__(metadata=metadata) self.dirname = dirname @@ -488,7 +485,10 @@ def __init__(self, dirname, s3, bucket_name, metadata, **kwargs): if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=dirname)["KeyCount"] == 1: print(f"Removing data in {dirname} to overwrite") self.s3.delete_object(Bucket=self.bucket_name, Key=dirname) - if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname)["KeyCount"] == 1: + if ( + self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname)["KeyCount"] + == 1 + ): print(f"Removing old incomplete data in {self.tempdirname}") self.s3.delete_object(Bucket=self.bucket_name, Key=dirname) # os.makedirs(self.tempdirname) @@ -505,11 +505,11 @@ def _flush_metadata(self): self.s3.put_object(Bucket=self.bucket_name, Key=metadata_key, Body=metadata_content) def _chunk_filename(self, chunk_info): - """ - Generate a filename for a given chunk. + """Generate a filename for a given chunk. :param chunk_info: Dictionary containing chunk metadata. :return: Filename string. + """ if "filename" in chunk_info: return chunk_info["filename"] @@ -517,13 +517,13 @@ def _chunk_filename(self, chunk_info): return f"{self.prefix}-{ichunk}" def _save_chunk(self, data, chunk_info, executor=None): - """ - Save a chunk of data to S3. + """Save a chunk of data to S3. :param data: Data chunk to be saved. :param chunk_info: Metadata dictionary for the chunk. :param executor: Optional executor for parallel writes. :return: Chunk metadata dictionary. + """ filename = self._chunk_filename(chunk_info) @@ -539,10 +539,10 @@ def _save_chunk(self, data, chunk_info, executor=None): ) def _save_chunk_metadata(self, chunk_info): - """ - Save metadata associated with a data chunk. + """Save metadata associated with a data chunk. :param chunk_info: Dictionary containing chunk metadata. + """ is_first = chunk_info["chunk_i"] == 0 if is_first: @@ -562,9 +562,7 @@ def _save_chunk_metadata(self, chunk_info): self._flush_metadata() def _close(self): - """ - Finalize the saving process by merging temp data and flushing metadata. - """ + """Finalize the saving process by merging temp data and flushing metadata.""" try: response = self.s3.list_objects_v2(Bucket=self.s3.bucket_name, Prefix=self.tempdirname) if "Contents" not in response or len(response["Contents"]) == 0: @@ -600,11 +598,11 @@ def _close(self): raise def _rename_s3_folder(self, tempdirname, dirname): - """ - Rename the temporary directory to the final storage location in S3. + """Rename the temporary directory to the final storage location in S3. :param tempdirname: Temporary directory path in S3. :param dirname: Final directory path in S3. 
+ """ response = self.s3.list_objects_v2(Bucket=self.s3.bucket_name, Prefix=tempdirname) for obj in response.get("Contents", []): From 55fe59b0d1294f3b12b3e755a36346e93264b85e Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Mon, 3 Mar 2025 17:08:23 -0600 Subject: [PATCH 09/24] remove uneeded imports + reduce line length --- strax/io.py | 2 +- strax/storage/s3.py | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/strax/io.py b/strax/io.py index c5f16a7a8..4c04bad03 100644 --- a/strax/io.py +++ b/strax/io.py @@ -109,7 +109,7 @@ def load_file_from_s3(key, compressor, dtype, bucket_name): s3 = strax.S3Frontend().s3 try: - data = COMPRESSORS[compressor]["_decompress"](f) + data = COMPRESSORS[compressor]["_decompress"](key) if not len(data): return np.zeros(0, dtype=dtype) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 8cf9442ec..818adc04a 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -1,14 +1,11 @@ -import glob import json import os import os.path as osp from typing import Optional from bson import json_util -import shutil import boto3 from botocore.exceptions import ClientError from botocore.client import Config -import configparser import strax from .common import StorageFrontend @@ -447,7 +444,7 @@ def _saver(self, dirname, metadata, **kwargs): parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) - return S3Saver(dirname, self.s3, self.bucket_name, metadata=metadata, **kwargs) + return S3Saver(parent_dir, self.s3, self.bucket_name, metadata=metadata, **kwargs) @export @@ -498,7 +495,7 @@ def _flush_metadata(self): # Convert the metadata dictionary to a JSON string metadata_content = json.dumps(self.md, **self.json_options) - # Define the S3 key for the metadata file (similar to a file path in a traditional file system) + # Define the S3 key for the metadata file metadata_key = f"{self.tempdirname}/{self.metadata_json}" # Upload the metadata to S3 @@ -590,7 +587,7 @@ def _close(self): # Flush metadata (this would be another method to handle your metadata saving logic) self._flush_metadata() - # Rename the directory by copying all files from tempdirname to dirname and deleting from tempdirname + # Rename directory by copying all files from tempdirname to dirname self._rename_s3_folder(self.tempdirname, self.dirname) except ClientError as e: From c5cce8e0c64ef1194fdae117ad28204c750fe836 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Mar 2025 23:11:06 +0000 Subject: [PATCH 10/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/storage/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 818adc04a..3cc4a64bf 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -495,7 +495,7 @@ def _flush_metadata(self): # Convert the metadata dictionary to a JSON string metadata_content = json.dumps(self.md, **self.json_options) - # Define the S3 key for the metadata file + # Define the S3 key for the metadata file metadata_key = f"{self.tempdirname}/{self.metadata_json}" # Upload the metadata to S3 From 107f0d9d4e3837d93b9022cdd12761a38668871e Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Mon, 3 Mar 2025 22:28:56 -0600 Subject: [PATCH 11/24] fix style --- strax/storage/s3.py | 4 ++-- tests/test_storage.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 
3cc4a64bf..693a7afaf 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -32,8 +32,8 @@ class S3Frontend(StorageFrontend): def __init__( self, - s3_access_key_id: str = None, - s3_secret_access_key: str = None, + s3_access_key_id: str = "", + s3_secret_access_key: str = "", endpoint_url: str = "https://rice1.osn.mghpcc.org/", path: str = "", bucket_name: str = "", diff --git a/tests/test_storage.py b/tests/test_storage.py index 4e525eb15..a8cb6a8dc 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -36,7 +36,10 @@ def test_write_data_s3(self): self.st.storage = [strax.S3Frontend(self.path)] run_id = "0" self.st.make(run_id, self.target) - assert self.st.is_stored(run_id, self.target) + if self.st.storage.s3_access_key_id != None: + assert self.st.is_stored(run_id, self.target) + else: + pass def test_complain_run_id(self): self.st.storage = [strax.DataDirectory(self.path)] From d653133782a4fee0e3f64075172e64b43526291a Mon Sep 17 00:00:00 2001 From: Luis Sanchez <45677170+LuisSanchez25@users.noreply.github.com> Date: Mon, 3 Mar 2025 22:40:11 -0600 Subject: [PATCH 12/24] Update io.py formating fix --- strax/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/io.py b/strax/io.py index 4c04bad03..ff233bd9a 100644 --- a/strax/io.py +++ b/strax/io.py @@ -156,7 +156,7 @@ def save_file(f, data, compressor="zstd", is_s3_path=False): if isinstance(f, str): final_fn = f temp_fn = f + "_temp" - if is_s3_path is False: + if not is_s3_path: with open(temp_fn, mode="wb") as write_file: result = _save_file(write_file, data, compressor) os.rename(temp_fn, final_fn) From 7386f90a750133c6ebd4c4e66b64f83813921cb2 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Mon, 3 Mar 2025 23:46:02 -0600 Subject: [PATCH 13/24] fix test --- strax/storage/s3.py | 5 +++++ tests/test_storage.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 693a7afaf..76376bdf1 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -73,6 +73,11 @@ def __init__( # Initialized connection to S3 storage self.s3 = boto3.client(**self.boto3_client_kwargs) self.backends = [S3Backend(self.bucket_name, **self.boto3_client_kwargs)] + + if s3_access_key_id != "": + self.is_configed = True + else: + self.is_configed = False def _run_meta_path(self, run_id: str) -> str: """Generate the metadata file path for a given run ID. 
diff --git a/tests/test_storage.py b/tests/test_storage.py index a8cb6a8dc..c9d9cafd8 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -36,7 +36,7 @@ def test_write_data_s3(self): self.st.storage = [strax.S3Frontend(self.path)] run_id = "0" self.st.make(run_id, self.target) - if self.st.storage.s3_access_key_id != None: + if self.st.storage[0].is_configed != "": assert self.st.is_stored(run_id, self.target) else: pass From d9aa5b682b0d8565d7df13a15f6e3ad01c60ed85 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 05:46:47 +0000 Subject: [PATCH 14/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/storage/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 76376bdf1..a7bc84525 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -73,7 +73,7 @@ def __init__( # Initialized connection to S3 storage self.s3 = boto3.client(**self.boto3_client_kwargs) self.backends = [S3Backend(self.bucket_name, **self.boto3_client_kwargs)] - + if s3_access_key_id != "": self.is_configed = True else: From c3060235c77581fbde5baf4cd83d8595ee7c8bcb Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Tue, 4 Mar 2025 00:26:44 -0600 Subject: [PATCH 15/24] add self.bucket_name --- strax/storage/s3.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index a7bc84525..7b470f826 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -57,6 +57,7 @@ def __init__( super().__init__(*args, **kwargs) self.path = path self.deep_scan = deep_scan + self.bucket_name = bucket_name # Configure S3 client self.boto3_client_kwargs = { @@ -67,9 +68,6 @@ def __init__( "config": Config(connect_timeout=5, retries={"max_attempts": 10}), } - if bucket_name != "": - self.bucket_name = bucket_name - # Initialized connection to S3 storage self.s3 = boto3.client(**self.boto3_client_kwargs) self.backends = [S3Backend(self.bucket_name, **self.boto3_client_kwargs)] From f8d2bee43f12f681fc1448ddc9973817d1510cad Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Wed, 5 Mar 2025 01:27:10 -0600 Subject: [PATCH 16/24] Remove unnecessary functions --- strax/io.py | 20 ++++++----- strax/storage/s3.py | 81 +++++++++++++++------------------------------ 2 files changed, 38 insertions(+), 63 deletions(-) diff --git a/strax/io.py b/strax/io.py index ff233bd9a..4f12fbbfe 100644 --- a/strax/io.py +++ b/strax/io.py @@ -81,14 +81,13 @@ def _lz4_decompress(f, buffer_size=DECOMPRESS_BUFFER_SIZE): @export -def load_file(f, compressor, dtype, S3_client=None, bucket_name=None, is_s3_path=False): +def load_file(f, compressor, dtype, bucket_name=None, is_s3_path=False): """Read and return data from file or S3. :param f: file name or handle to read from :param compressor: compressor to use for decompressing. If not passed, will try to load it from json metadata file. :param dtype: numpy dtype of data to load - :param S3_client: (optional) S3 client to find and store data :param is_s3_path: Boolean indicating if the file is stored in S3. 
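[Editor's note] The test_write_data_s3 update earlier in this series passes silently when no credentials are configured. An arguably clearer alternative, sketched here under the assumption that pytest is available and that is_configed keeps its current meaning, is to skip the test explicitly.

import pytest
import strax

def test_write_data_s3(self):
    frontend = strax.S3Frontend(path=self.path)
    if not frontend.is_configed:
        pytest.skip("No S3 credentials configured")
    self.st.storage = [frontend]
    run_id = "0"
    self.st.make(run_id, self.target)
    assert self.st.is_stored(run_id, self.target)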
""" @@ -104,17 +103,20 @@ def load_file(f, compressor, dtype, S3_client=None, bucket_name=None, is_s3_path return _load_file(f, compressor, dtype) -def load_file_from_s3(key, compressor, dtype, bucket_name): - """Helper function to load data from S3.""" +def load_file_from_s3(f, compressor, dtype, bucket_name): + """ + Helper function to load data from S3. + Confirm file exists, then try to load and decompress it. + """ s3 = strax.S3Frontend().s3 try: - data = COMPRESSORS[compressor]["_decompress"](key) - if not len(data): - return np.zeros(0, dtype=dtype) + #data = COMPRESSORS[compressor]["_decompress"](f) + #if not len(data): + # return np.zeros(0, dtype=dtype) # Retrieve the file from S3 and load into a BytesIO buffer - response = s3.get_object(Bucket=bucket_name, Key=key) + response = s3.get_object(Bucket=bucket_name, Key=f) file_data = response["Body"].read() # Read the content of the file from S3 # Create a file-like object from the binary data @@ -122,7 +124,7 @@ def load_file_from_s3(key, compressor, dtype, bucket_name): return _load_file(file_buffer, compressor, dtype) except ClientError as e: - raise RuntimeError(f"Failed to load {key} from bucket {bucket_name}: {e}") + raise RuntimeError(f"Failed to load {f} from bucket {bucket_name}: {e}") def _load_file(f, compressor, dtype): diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 7b470f826..04bef5d19 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -37,7 +37,6 @@ def __init__( endpoint_url: str = "https://rice1.osn.mghpcc.org/", path: str = "", bucket_name: str = "", - deep_scan: bool = False, *args, **kwargs, ): @@ -56,7 +55,6 @@ def __init__( """ super().__init__(*args, **kwargs) self.path = path - self.deep_scan = deep_scan self.bucket_name = bucket_name # Configure S3 client @@ -84,9 +82,10 @@ def _run_meta_path(self, run_id: str) -> str: :return: The path where the metadata is stored. """ + # Works but not sure if needed return osp.join(self.path, RUN_METADATA_PATTERN % run_id) - def run_metadata(self, run_id: str, projection=None) -> dict: + def run_metadata(self, run_id: str = "", data_type=None) -> dict: """Retrieve metadata for a given run from S3. Parameters @@ -94,28 +93,29 @@ def run_metadata(self, run_id: str, projection=None) -> dict: run_id : (str) The identifier of the run. - projection : + data_type : (str) Fields to extract from metadata (optional). :return: Run metadata as a dictionary. 
""" - path = self._run_meta_path(run_id) + # Works # Checks if metadata exists - if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=path)["KeyCount"] == 0: + if self.s3.list_objects_v2(Bucket=self.bucket_name)["KeyCount"] == 0: raise strax.RunMetadataNotAvailable( f"No file at {path}, cannot find run metadata for {run_id}" ) # Retrieve metadata - response = self.s3.get_object(Bucket=self.bucket_name, Key=path) - metadata_content = response["Body"].read().decode("utf-8") - md = json.loads(metadata_content, object_hook=json_util.object_hook) - md = strax.flatten_run_metadata(md) + response = self.s3.get_object(Bucket=self.bucket_name) + metadata_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('metadata.json')] + + if run_id != "": + metadata_files = [file for file in metadata_files if run_id in file] if projection is not None: - md = {key: value for key, value in md.items() if key in projection} - return md + metadata_files = [file for file in metadata_files if data_type in file] + return metadata_files def write_run_metadata(self, run_id: str, metadata: dict): """Write metadata for a specific run to S3. @@ -124,6 +124,8 @@ def write_run_metadata(self, run_id: str, metadata: dict): :param metadata: The metadata dictionary to store. """ + # Need to check, is this necessary? + if "name" not in metadata: metadata["name"] = run_id @@ -136,10 +138,12 @@ def write_run_metadata(self, run_id: str, metadata: dict): def s3_object_exists(self, key) -> bool: """Check if a given object exists in the S3 bucket. - :param key: The object key to check. + :param key: The object key to check. [run_id-data_type-lineage] :return: True if the object exists, otherwise False. """ + # Works as expected + try: response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=key, Delimiter="/") @@ -174,29 +178,21 @@ def _scan_runs(self, store_fields): found.add(run_id) yield self.run_metadata(run_id, projection=store_fields) - # Preform deepscan if enabled - if self.deep_scan: - for fn in self._subfolders(): - run_id = self._parse_folder_name(fn)[0] - if run_id not in found: - found.add(run_id) - yield dict(name=run_id) - - def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): + def _find(self, key, write, allow_incomplete, fuzzy_for = None, fuzzy_for_options = None, **kwargs): """Find the appropriate storage key for a given dataset. :param key: The dataset key. :param write: Whether to check for writable access. :param allow_incomplete: Allow incomplete datasets. - :param fuzzy_for: Parameters for fuzzy search. - :param fuzzy_for_options: Additional fuzzy search options. + :parm fuzzy_for: Does nothing be retained for compatibility + :parm fuzzy_for_option: does nothing be retained for compatibility :return: The backend key if found, otherwise raises DataNotAvailable. 
""" self.raise_if_non_compatible_run_id(key.run_id) dirname = osp.join(self.path, str(key)) - exists = self.s3_object_exists(self.bucket_name, dirname) + exists = self.s3_object_exists(dirname) bk = self.backend_key(dirname) if write: @@ -206,11 +202,6 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): if allow_incomplete and not exists: # Check for incomplete data (only exact matching for now) - if fuzzy_for or fuzzy_for_options: - raise NotImplementedError( - "Mixing of fuzzy matching and allow_incomplete not", - " supported by DataDirectory.", - ) tempdirname = dirname + "_temp" bk = self.backend_key(tempdirname) if ( @@ -220,15 +211,9 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options): return bk # Check exact match - if exists and self._folder_matches(dirname, key, None, None): + if exists: return bk - # If fuzzy search is enabled find fuzzy file names - if fuzzy_for or fuzzy_for_options: - for fn in self._subfolders(): - if self._folder_matches(fn, key, fuzzy_for, fuzzy_for_options): - return self.backend_key(fn) - raise strax.DataNotAvailable def _get_config_value(self, variable, option_name): @@ -249,19 +234,7 @@ def _get_config_value(self, variable, option_name): else: return variable - def _subfolders(self): - """Loop over subfolders of self.path that match our folder format.""" - # Trigger if statement if path doesnt exist - if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.path)["KeyCount"] == 0: - return - for dirname in os.listdir(self.path): - try: - self._parse_folder_name(dirname) - except InvalidFolderNameFormat: - continue - yield osp.join(self.path, dirname) - - def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): + #def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): """Check if a folder matches the required data key. :param fn: Folder name. @@ -271,8 +244,8 @@ def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=Fal :param ignore_name: If True, ignores run name while matching. :return: The run_id if it matches, otherwise False. - """ - # Parse the folder name + + # Parse the folder name, fuzz stuff doesnt make sense here so remove it try: _run_id, _data_type, _hash = self._parse_folder_name(fn) except InvalidFolderNameFormat: @@ -293,6 +266,7 @@ def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=Fal if self._matches(metadata["lineage"], key.lineage, fuzzy_for, fuzzy_for_options): return _run_id return False + """ def backend_key(self, dirname): """Return the backend key representation. @@ -377,6 +351,7 @@ def _get_metadata(self, dirname): :raises strax.DataCorrupted: If metadata is missing or corrupted. """ + # Works prefix = dirname_to_prefix(dirname) metadata_json = f"{prefix}-metadata.json" md_path = osp.join(dirname, metadata_json) @@ -401,8 +376,6 @@ def _get_metadata(self, dirname): response = self.s3.get_object(Bucket=self.bucket_name, Key=md_path) metadata_content = response["Body"].read().decode("utf-8") return json.loads(metadata_content) - # with open(md_path, mode="r") as f: - # return json.loads(f.read()) def _read_and_format_chunk(self, *args, **kwargs): """Read a data chunk and optionally update its target size. 
From e820701fc717a809ba69acea23944b619be74bb9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 5 Mar 2025 07:28:40 +0000 Subject: [PATCH 17/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/io.py | 9 +++++---- strax/storage/s3.py | 20 ++++++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/strax/io.py b/strax/io.py index 4f12fbbfe..7d95c0368 100644 --- a/strax/io.py +++ b/strax/io.py @@ -104,15 +104,16 @@ def load_file(f, compressor, dtype, bucket_name=None, is_s3_path=False): def load_file_from_s3(f, compressor, dtype, bucket_name): - """ - Helper function to load data from S3. + """Helper function to load data from S3. + Confirm file exists, then try to load and decompress it. + """ s3 = strax.S3Frontend().s3 try: - #data = COMPRESSORS[compressor]["_decompress"](f) - #if not len(data): + # data = COMPRESSORS[compressor]["_decompress"](f) + # if not len(data): # return np.zeros(0, dtype=dtype) # Retrieve the file from S3 and load into a BytesIO buffer diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 04bef5d19..1be0d682b 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -108,7 +108,11 @@ def run_metadata(self, run_id: str = "", data_type=None) -> dict: # Retrieve metadata response = self.s3.get_object(Bucket=self.bucket_name) - metadata_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('metadata.json')] + metadata_files = [ + obj["Key"] + for obj in response.get("Contents", []) + if obj["Key"].endswith("metadata.json") + ] if run_id != "": metadata_files = [file for file in metadata_files if run_id in file] @@ -142,7 +146,7 @@ def s3_object_exists(self, key) -> bool: :return: True if the object exists, otherwise False. """ - # Works as expected + # Works as expected try: response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=key, Delimiter="/") @@ -178,14 +182,14 @@ def _scan_runs(self, store_fields): found.add(run_id) yield self.run_metadata(run_id, projection=store_fields) - def _find(self, key, write, allow_incomplete, fuzzy_for = None, fuzzy_for_options = None, **kwargs): + def _find(self, key, write, allow_incomplete, fuzzy_for=None, fuzzy_for_options=None, **kwargs): """Find the appropriate storage key for a given dataset. :param key: The dataset key. :param write: Whether to check for writable access. - :param allow_incomplete: Allow incomplete datasets. - :parm fuzzy_for: Does nothing be retained for compatibility - :parm fuzzy_for_option: does nothing be retained for compatibility + :param allow_incomplete: Allow incomplete datasets. :parm fuzzy_for: Does nothing be + retained for compatibility :parm fuzzy_for_option: does nothing be retained for + compatibility :return: The backend key if found, otherwise raises DataNotAvailable. """ @@ -234,7 +238,7 @@ def _get_config_value(self, variable, option_name): else: return variable - #def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): + # def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False): """Check if a folder matches the required data key. :param fn: Folder name. @@ -244,7 +248,7 @@ def _get_config_value(self, variable, option_name): :param ignore_name: If True, ignores run name while matching. :return: The run_id if it matches, otherwise False. 
- + # Parse the folder name, fuzz stuff doesnt make sense here so remove it try: _run_id, _data_type, _hash = self._parse_folder_name(fn) From 3770688024876e2b2a02367330e93b5fc511d3e1 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Wed, 5 Mar 2025 01:31:32 -0600 Subject: [PATCH 18/24] remove unused variables --- strax/storage/s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 1be0d682b..8f97591c6 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -103,7 +103,7 @@ def run_metadata(self, run_id: str = "", data_type=None) -> dict: # Checks if metadata exists if self.s3.list_objects_v2(Bucket=self.bucket_name)["KeyCount"] == 0: raise strax.RunMetadataNotAvailable( - f"No file at {path}, cannot find run metadata for {run_id}" + f"No file found, cannot find run metadata for {run_id}" ) # Retrieve metadata @@ -117,7 +117,7 @@ def run_metadata(self, run_id: str = "", data_type=None) -> dict: if run_id != "": metadata_files = [file for file in metadata_files if run_id in file] - if projection is not None: + if data_type is not None: metadata_files = [file for file in metadata_files if data_type in file] return metadata_files From c0a343ff90dd63c7d6ac9eed34f245878eacee09 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Wed, 5 Mar 2025 09:55:12 -0600 Subject: [PATCH 19/24] fix return output + add bucket default name --- strax/storage/s3.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 8f97591c6..47f961dce 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -1,7 +1,7 @@ import json import os import os.path as osp -from typing import Optional +from typing import Optional, List from bson import json_util import boto3 from botocore.exceptions import ClientError @@ -36,7 +36,7 @@ def __init__( s3_secret_access_key: str = "", endpoint_url: str = "https://rice1.osn.mghpcc.org/", path: str = "", - bucket_name: str = "", + bucket_name: str = "mlrice", *args, **kwargs, ): @@ -85,7 +85,7 @@ def _run_meta_path(self, run_id: str) -> str: # Works but not sure if needed return osp.join(self.path, RUN_METADATA_PATTERN % run_id) - def run_metadata(self, run_id: str = "", data_type=None) -> dict: + def run_metadata(self, run_id: str = "", data_type=None) -> List: """Retrieve metadata for a given run from S3. 
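+
+        Note: as currently written this returns the list of ``*metadata.json``
+        object keys found in the bucket, optionally filtered by ``run_id`` and
+        ``data_type``, rather than a parsed run-metadata dictionary.
+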
Parameters @@ -119,6 +119,7 @@ def run_metadata(self, run_id: str = "", data_type=None) -> dict: if data_type is not None: metadata_files = [file for file in metadata_files if data_type in file] + # Things here are expected to return a dictionary, maybe I should look into it return metadata_files def write_run_metadata(self, run_id: str, metadata: dict): From 514002bad5b630194d7433b71a2039a22fb01a76 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Tue, 6 May 2025 16:41:44 -0500 Subject: [PATCH 20/24] fix broken functions after update --- pyproject.toml | 4 +-- strax/io.py | 23 +++++++---------- strax/storage/files.py | 2 +- strax/storage/s3.py | 57 +++++++++++++++++++++--------------------- tests/test_storage.py | 2 +- 5 files changed, 42 insertions(+), 46 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 22e2d9e2c..62decd6a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,8 +26,8 @@ rechunker = "strax.scripts.rechunker:main" [tool.poetry.dependencies] python = ">=3.10,<3.13" blosc = "*" -boto3 = "*" -botocore = "*" +boto3 = "=1.33.13" +botocore = "=1.33.13" click = "*" deepdiff = "*" dill = "*" diff --git a/strax/io.py b/strax/io.py index ccc05a0b3..86ffa6c85 100644 --- a/strax/io.py +++ b/strax/io.py @@ -147,7 +147,7 @@ def _load_file(f, compressor, dtype): @export -def save_file(f, data, compressor="zstd", is_s3_path=False): +def save_file(f, data, compressor="zstd", s3_client = None, Bucket = None, is_s3_path=False): """Save data to file and return number of bytes written. :param f: file name or handle to save to @@ -165,22 +165,17 @@ def save_file(f, data, compressor="zstd", is_s3_path=False): os.rename(temp_fn, final_fn) return result else: - s3_interface = strax.S3Frontend( - s3_access_key_id=None, - s3_secret_access_key=None, - path="", - deep_scan=False, - ) + s3_interface = s3_client # Copy temp file to final file - result = _save_file_to_s3(s3_interface, temp_fn, data, compressor) - s3_interface.s3.copy_object( - Bucket=s3_interface.BUCKET, + result = _save_file_to_s3(s3_interface, temp_fn, data, Bucket, compressor) + s3_interface.copy_object( + Bucket=Bucket, Key=final_fn, - CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn}, + CopySource={"Bucket": Bucket, "Key": temp_fn}, ) # Delete the temporary file - s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn) + s3_interface.delete_object(Bucket=Bucket, Key=temp_fn) return result else: @@ -194,7 +189,7 @@ def _save_file(f, data, compressor="zstd"): return len(d_comp) -def _save_file_to_s3(s3_client, key, data, compressor=None): +def _save_file_to_s3(s3_client, key, data, Bucket, compressor=None): # Use this method to save file directly to S3 # If compression is needed, handle it here # Use `BytesIO` to handle binary data in-memory @@ -210,7 +205,7 @@ def _save_file_to_s3(s3_client, key, data, compressor=None): buffer.seek(0) # Reset the buffer to the beginning # Upload buffer to S3 under the specified key - s3_client.s3.put_object(Bucket=s3_client.BUCKET, Key=key, Body=buffer.getvalue()) + s3_client.put_object(Bucket=Bucket, Key=key, Body=buffer.getvalue()) return len(data) diff --git a/strax/storage/files.py b/strax/storage/files.py index 6b8275343..43da3fad9 100644 --- a/strax/storage/files.py +++ b/strax/storage/files.py @@ -46,7 +46,7 @@ def __init__(self, path=".", *args, deep_scan=False, **kwargs): def _run_meta_path(self, run_id): return osp.join(self.path, RUN_METADATA_PATTERN % run_id) - def run_metadata(self, run_id, projection=None): + def run_metadata(self, run_id: str = 
"", projection=None): path = self._run_meta_path(run_id) if not osp.exists(path): raise strax.RunMetadataNotAvailable( diff --git a/strax/storage/s3.py b/strax/storage/s3.py index 47f961dce..ff0a9384f 100644 --- a/strax/storage/s3.py +++ b/strax/storage/s3.py @@ -13,7 +13,7 @@ export, __all__ = strax.exporter() RUN_METADATA_PATTERN = "%s-metadata.json" -BUCKET_NAME = "mlrice" +BUCKET_NAME = "cdt6-pub" @export @@ -28,15 +28,16 @@ class S3Frontend(StorageFrontend): can_define_runs = True provide_run_metadata = False provide_superruns = True - BUCKET = "mlrice" + BUCKET = "cdt6-pub" def __init__( self, - s3_access_key_id: str = "", - s3_secret_access_key: str = "", - endpoint_url: str = "https://rice1.osn.mghpcc.org/", - path: str = "", - bucket_name: str = "mlrice", + s3_access_key_id: str = "WTELWQ3XIAUGVZ15TGOH", + s3_secret_access_key: str = "0Q64rU0pJxDlFX7Bd3LulVwFGQtZtLGRHYPMvCmx", + endpoint_url: str = "https://rice1.osn.mghpcc.org", + path: str = "/xenonnt", + bucket_name: str = "cdt6-pub", + deep_scan=False, *args, **kwargs, ): @@ -68,7 +69,7 @@ def __init__( # Initialized connection to S3 storage self.s3 = boto3.client(**self.boto3_client_kwargs) - self.backends = [S3Backend(self.bucket_name, **self.boto3_client_kwargs)] + self.backends = [S3Backend(self.bucket_name, self.path, **self.boto3_client_kwargs)] if s3_access_key_id != "": self.is_configed = True @@ -194,7 +195,6 @@ def _find(self, key, write, allow_incomplete, fuzzy_for=None, fuzzy_for_options= :return: The backend key if found, otherwise raises DataNotAvailable. """ - self.raise_if_non_compatible_run_id(key.run_id) dirname = osp.join(self.path, str(key)) exists = self.s3_object_exists(dirname) @@ -290,11 +290,11 @@ def remove(self, key): def _parse_folder_name(fn): """Return (run_id, data_type, hash) if folder name matches DataDirectory convention, raise InvalidFolderNameFormat otherwise.""" - stuff = osp.normpath(fn).split(os.sep)[-1].split("-") - if len(stuff) != 3: + keys = osp.normpath(fn).split(os.sep)[-1].split("-") + if len(keys) != 3: # This is not a folder with strax data raise InvalidFolderNameFormat(fn) - return stuff + return keys @staticmethod def raise_if_non_compatible_run_id(run_id): @@ -326,11 +326,12 @@ class S3Backend(strax.StorageBackend): """ - BUCKET = "mlrice" + BUCKET = "cdt6-pub" def __init__( self, bucket_name, + path, set_target_chunk_mb: Optional[int] = None, *args, **kwargs, @@ -344,6 +345,7 @@ def __init__( """ super().__init__() + self.path = path self.s3 = boto3.client(**kwargs) self.set_chunk_size_mb = set_target_chunk_mb self.bucket_name = bucket_name @@ -422,9 +424,7 @@ def _saver(self, dirname, metadata, **kwargs): :return: An instance of `S3Saver`. 
""" - - parent_dir = os.path.abspath(os.path.join(dirname, os.pardir)) - + parent_dir = os.path.join(self.path, dirname) return S3Saver(parent_dir, self.s3, self.bucket_name, metadata=metadata, **kwargs) @@ -508,12 +508,12 @@ def _save_chunk(self, data, chunk_info, executor=None): fn = os.path.join(self.tempdirname, filename) kwargs = dict(data=data, compressor=self.md["compressor"]) if executor is None: - filesize = strax.save_file(fn, is_s3_path=True, **kwargs) + filesize = strax.save_file(fn, s3_client = self.s3, Bucket = self.bucket_name, is_s3_path=True, **kwargs) return dict(filename=filename, filesize=filesize), None else: # Might need to add some s3 stuff here return dict(filename=filename), executor.submit( - strax.save_file, fn, is_s3_path=True, **kwargs + strax.save_file, fn, s3_client = self.s3, Bucket = self.bucket_name, is_s3_path=True, **kwargs ) def _save_chunk_metadata(self, chunk_info): @@ -532,7 +532,7 @@ def _save_chunk_metadata(self, chunk_info): filename = self._chunk_filename(chunk_info) fn = f"{self.tempdirname}/metadata_{filename}.json" metadata_content = json.dump(chunk_info, **self.json_options) - self.s3.put_object(Bucket=self.s3.bucket_name, Key=fn, Body=metadata_content) + self.s3.put_object(Bucket=self.bucket_name, Key=fn, Body=metadata_content) if not self.is_forked or is_first: self.md["chunks"].append(chunk_info) @@ -542,7 +542,7 @@ def _save_chunk_metadata(self, chunk_info): def _close(self): """Finalize the saving process by merging temp data and flushing metadata.""" try: - response = self.s3.list_objects_v2(Bucket=self.s3.bucket_name, Prefix=self.tempdirname) + response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname) if "Contents" not in response or len(response["Contents"]) == 0: raise RuntimeError( f"{self.tempdirname} was already renamed to {self.dirname}. " @@ -553,17 +553,17 @@ def _close(self): # List the files in the temporary directory matching metadata_*.json response = self.s3.list_objects_v2( - Bucket=self.s3.bucket_name, Prefix=f"{self.tempdirname}/metadata_" + Bucket=self.bucket_name, Prefix=f"{self.tempdirname}/metadata_" ) for obj in response.get("Contents", []): key = obj["Key"] # Download each metadata file, process, and delete from tempdirname - metadata_object = self.s3.get_object(Bucket=self.s3.bucket_name, Key=key) + metadata_object = self.s3.get_object(Bucket=self.bucket_name, Key=key) metadata_content = metadata_object["Body"].read().decode("utf-8") self.md["chunks"].append(json.loads(metadata_content)) # Optionally, delete the metadata file after processing - self.s3.delete_object(Bucket=self.s3.bucket_name, Key=key) + self.s3.delete_object(Bucket=self.bucket_name, Key=key) # Flush metadata (this would be another method to handle your metadata saving logic) self._flush_metadata() @@ -582,23 +582,24 @@ def _rename_s3_folder(self, tempdirname, dirname): :param dirname: Final directory path in S3. 
""" - response = self.s3.list_objects_v2(Bucket=self.s3.bucket_name, Prefix=tempdirname) + response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname) for obj in response.get("Contents", []): key = obj["Key"] # Copy each file from the temporary directory to the final directory new_key = key.replace(tempdirname, dirname) self.s3.copy_object( - Bucket=self.s3.bucket_name, - CopySource={"Bucket": self.s3.bucket_name, "Key": key}, + Bucket=self.bucket_name, + CopySource={"Bucket": self.bucket_name, "Key": key}, Key=new_key, ) # Delete the file from the temporary directory - self.s3.delete_object(Bucket=self.s3.bucket_name, Key=key) + self.s3.delete_object(Bucket=self.bucket_name, Key=key) # Delete the temporary directory - self.s3.delete_object(Bucket=self.s3.bucket_name, Key=tempdirname) + self.s3.delete_object(Bucket=self.bucket_name, Key=tempdirname) @export class InvalidFolderNameFormat(Exception): pass + diff --git a/tests/test_storage.py b/tests/test_storage.py index c9d9cafd8..1794f19d0 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -33,7 +33,7 @@ def test_write_data_dir(self): assert self.st.is_stored(run_id, self.target) def test_write_data_s3(self): - self.st.storage = [strax.S3Frontend(self.path)] + self.st.storage = [strax.S3Frontend()] run_id = "0" self.st.make(run_id, self.target) if self.st.storage[0].is_configed != "": From 4320190cb270dcee3a9c1b682f4c379fa76ff440 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 6 May 2025 21:42:46 +0000 Subject: [PATCH 21/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/io.py | 2 +- strax/storage/s3.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/strax/io.py b/strax/io.py index 86ffa6c85..5e2456dfc 100644 --- a/strax/io.py +++ b/strax/io.py @@ -147,7 +147,7 @@ def _load_file(f, compressor, dtype): @export -def save_file(f, data, compressor="zstd", s3_client = None, Bucket = None, is_s3_path=False): +def save_file(f, data, compressor="zstd", s3_client=None, Bucket=None, is_s3_path=False): """Save data to file and return number of bytes written. 
     :param f: file name or handle to save to
diff --git a/strax/storage/s3.py b/strax/storage/s3.py
index ff0a9384f..dcc80a890 100644
--- a/strax/storage/s3.py
+++ b/strax/storage/s3.py
@@ -508,12 +508,19 @@ def _save_chunk(self, data, chunk_info, executor=None):
         fn = os.path.join(self.tempdirname, filename)
         kwargs = dict(data=data, compressor=self.md["compressor"])
         if executor is None:
-            filesize = strax.save_file(fn, s3_client = self.s3, Bucket = self.bucket_name, is_s3_path=True, **kwargs)
+            filesize = strax.save_file(
+                fn, s3_client=self.s3, Bucket=self.bucket_name, is_s3_path=True, **kwargs
+            )
             return dict(filename=filename, filesize=filesize), None
         else:
             # Might need to add some s3 stuff here
             return dict(filename=filename), executor.submit(
-                strax.save_file, fn, s3_client = self.s3, Bucket = self.bucket_name, is_s3_path=True, **kwargs
+                strax.save_file,
+                fn,
+                s3_client=self.s3,
+                Bucket=self.bucket_name,
+                is_s3_path=True,
+                **kwargs,
             )

     def _save_chunk_metadata(self, chunk_info):
@@ -602,4 +609,3 @@ def _rename_s3_folder(self, tempdirname, dirname):
 @export
 class InvalidFolderNameFormat(Exception):
     pass
-

From f7e56af4217fa2c229fa58888d57cc51d081b302 Mon Sep 17 00:00:00 2001
From: Luis Sanchez
Date: Tue, 6 May 2025 17:05:49 -0500
Subject: [PATCH 22/24] change boto3 to compatible version

---
 pyproject.toml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 62decd6a5..1670ce784 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,8 +26,7 @@ rechunker = "strax.scripts.rechunker:main"
 [tool.poetry.dependencies]
 python = ">=3.10,<3.13"
 blosc = "*"
-boto3 = "=1.33.13"
-botocore = "=1.33.13"
+boto3 = "^1.33.13"
 click = "*"
 deepdiff = "*"
 dill = "*"

From 29545d339b299446819440c92fec009d3b6353e8 Mon Sep 17 00:00:00 2001
From: Luis Sanchez
Date: Tue, 29 Jul 2025 14:49:58 -0500
Subject: [PATCH 23/24] update files based on copilot recommendation

---
 strax/io.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/strax/io.py b/strax/io.py
index 5e2456dfc..8c497103a 100644
--- a/strax/io.py
+++ b/strax/io.py
@@ -118,11 +118,13 @@ def load_file_from_s3(f, compressor, dtype, bucket_name):

         # Retrieve the file from S3 and load into a BytesIO buffer
         response = s3.get_object(Bucket=bucket_name, Key=f)
-        file_data = response["Body"].read()  # Read the content of the file from S3
+        file_buffer = BytesIO()

-        # Create a file-like object from the binary data
-        file_buffer = BytesIO(file_data)
-        return _load_file(file_buffer, compressor, dtype)
+        for chunk in response["Body"].iter_chunks(chunk_size=DECOMPRESS_BUFFER_SIZE):
+            file_buffer.write(chunk)
+
+        file_buffer.seek(0)
+        return _load_file(file_buffer, compressor, dtype)

     except ClientError as e:
         raise RuntimeError(f"Failed to load {f} from bucket {bucket_name}: {e}")
@@ -137,7 +138,7 @@ def _load_file(f, compressor, dtype):
             return np.frombuffer(data, dtype=dtype)
         except ValueError as e:
             raise ValueError(f"ValueError while loading data with dtype =\n\t{dtype}") from e
-        except Exception as e:
+        except (ValueError, KeyError) as e:
             raise RuntimeError(f"Error loading file: {e}")

     except Exception:
@@ -165,17 +166,17 @@ def save_file(f, data, compressor="zstd", s3_client=None, Bucket=None, is_s3_pat
             os.rename(temp_fn, final_fn)
             return result
         else:
-            s3_interface = s3_client
+            #s3_interface = s3_client

             # Copy temp file to final file
-            result = _save_file_to_s3(s3_interface, temp_fn, data, Bucket, compressor)
-            s3_interface.copy_object(
+            result = _save_file_to_s3(s3_client, temp_fn, data, Bucket, compressor)
+            
s3_client.copy_object( Bucket=Bucket, Key=final_fn, CopySource={"Bucket": Bucket, "Key": temp_fn}, ) # Delete the temporary file - s3_interface.delete_object(Bucket=Bucket, Key=temp_fn) + s3_client.delete_object(Bucket=Bucket, Key=temp_fn) return result else: From 4bb5ddbe8687fc33b794e155bf17e6ca323b30e7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 29 Jul 2025 19:50:43 +0000 Subject: [PATCH 24/24] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- strax/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/io.py b/strax/io.py index 8c497103a..e5d0c1b51 100644 --- a/strax/io.py +++ b/strax/io.py @@ -166,7 +166,7 @@ def save_file(f, data, compressor="zstd", s3_client=None, Bucket=None, is_s3_pat os.rename(temp_fn, final_fn) return result else: - #s3_interface = s3_client + # s3_interface = s3_client # Copy temp file to final file result = _save_file_to_s3(s3_client, temp_fn, data, Bucket, compressor) s3_client.copy_object(