diff --git a/pyproject.toml b/pyproject.toml
index aaff2b085..0ea89646c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,6 +26,7 @@ rechunker = "strax.scripts.rechunker:main"
 [tool.poetry.dependencies]
 python = ">=3.10,<3.13"
 blosc = "*"
+boto3 = "^1.33.13"
 click = "*"
 deepdiff = "*"
 dill = "*"
diff --git a/strax/__init__.py b/strax/__init__.py
index 93aa904ff..dd54d0785 100644
--- a/strax/__init__.py
+++ b/strax/__init__.py
@@ -15,6 +15,7 @@
 from .storage.file_rechunker import *
 from .storage.mongo import *
 from .storage.zipfiles import *
+from .storage.s3 import *

 from .config import *
 from .plugins import *
diff --git a/strax/io.py b/strax/io.py
index 9464e2d8e..e5d0c1b51 100644
--- a/strax/io.py
+++ b/strax/io.py
@@ -10,9 +10,10 @@
 import zstandard
 import lz4.frame as lz4
 from ast import literal_eval
+from io import BytesIO
+from botocore.exceptions import ClientError

 import strax
-from strax import RUN_METADATA_PATTERN

 export, __all__ = strax.exporter()
 __all__.extend(["DECOMPRESS_BUFFER_SIZE"])
@@ -80,22 +81,54 @@ def _lz4_decompress(f, buffer_size=DECOMPRESS_BUFFER_SIZE):


 @export
-def load_file(f, compressor, dtype):
-    """Read and return data from file.
+def load_file(f, compressor, dtype, bucket_name=None, is_s3_path=False):
+    """Read and return data from a file or from S3.

     :param f: file name or handle to read from
     :param compressor: compressor to use for decompressing. If not passed, will try to load it
         from json metadata file.
     :param dtype: numpy dtype of data to load
+    :param bucket_name: name of the S3 bucket to read from (only used if is_s3_path is True).
+    :param is_s3_path: boolean indicating whether the file is stored in S3.

     """
-    if isinstance(f, str):
-        with open(f, mode="rb") as write_file:
-            return _load_file(write_file, compressor, dtype)
+    if is_s3_path:
+        # Read from S3
+        return load_file_from_s3(f, compressor, dtype, bucket_name)
+    elif isinstance(f, str):
+        # Read from a local file
+        with open(f, mode="rb") as read_file:
+            return _load_file(read_file, compressor, dtype)
     else:
+        # f is already a file-like object, just use it
         return _load_file(f, compressor, dtype)


+def load_file_from_s3(f, compressor, dtype, bucket_name):
+    """Helper function to load data from S3.
+
+    Stream the object into an in-memory buffer, then decompress it like a local file.
+
+    """
+    s3 = strax.S3Frontend().s3
+
+    try:
+        # Retrieve the object from S3 and load it into a BytesIO buffer
+        response = s3.get_object(Bucket=bucket_name, Key=f)
+        file_buffer = BytesIO()
+
+        for chunk in response["Body"].iter_chunks(chunk_size=DECOMPRESS_BUFFER_SIZE):
+            file_buffer.write(chunk)
+
+        file_buffer.seek(0)
+
+    except ClientError as e:
+        raise RuntimeError(f"Failed to load {f} from bucket {bucket_name}: {e}")
+
+    return _load_file(file_buffer, compressor, dtype)
+
+
 def _load_file(f, compressor, dtype):
     try:
         data = COMPRESSORS[compressor]["_decompress"](f)
@@ -105,6 +138,8 @@ def _load_file(f, compressor, dtype):
         return np.frombuffer(data, dtype=dtype)
     except ValueError as e:
         raise ValueError(f"ValueError while loading data with dtype =\n\t{dtype}") from e
+    except KeyError as e:
+        raise RuntimeError(f"Error loading file: {e}") from e
     except Exception:
         raise strax.DataCorrupted(
@@ -113,7 +148,7 @@


 @export
-def save_file(f, data, compressor="zstd"):
+def save_file(f, data, compressor="zstd", s3_client=None, Bucket=None, is_s3_path=False):
     """Save data to file and return number of bytes written.

     :param f: file name or handle to save to
@@ -121,13 +156,29 @@ def save_file(f, data, compressor="zstd"):
     :param compressor: compressor to use
+    :param s3_client: boto3 client to use when writing to S3.
+    :param Bucket: name of the S3 bucket to write to.
+    :param is_s3_path: boolean indicating whether the data should be written to S3.

     """
+
     if isinstance(f, str):
         final_fn = f
         temp_fn = f + "_temp"
-        with open(temp_fn, mode="wb") as write_file:
-            result = _save_file(write_file, data, compressor)
-        os.rename(temp_fn, final_fn)
-        return result
+        if not is_s3_path:
+            with open(temp_fn, mode="wb") as write_file:
+                result = _save_file(write_file, data, compressor)
+            os.rename(temp_fn, final_fn)
+            return result
+        else:
+            # Write to a temporary key first, then copy it to the final key
+            result = _save_file_to_s3(s3_client, temp_fn, data, Bucket, compressor)
+            s3_client.copy_object(
+                Bucket=Bucket,
+                Key=final_fn,
+                CopySource={"Bucket": Bucket, "Key": temp_fn},
+            )
+
+            # Delete the temporary object
+            s3_client.delete_object(Bucket=Bucket, Key=temp_fn)
+
+            return result
     else:
         return _save_file(f, data, compressor)
@@ -139,10 +190,40 @@ def _save_file(f, data, compressor="zstd"):
     return len(d_comp)


+def _save_file_to_s3(s3_client, key, data, Bucket, compressor=None):
+    """Compress (if requested) and upload a numpy array directly to S3 under the given key."""
+    assert isinstance(data, np.ndarray), "Please pass a numpy array"
+
+    # Use a binary in-memory buffer instead of writing to a local file
+    buffer = BytesIO()
+
+    # Compress if a compressor is given, then write the payload into the buffer
+    if compressor:
+        data = COMPRESSORS[compressor]["compress"](data)
+    buffer.write(data)
+    buffer.seek(0)  # Reset the buffer to the beginning
+
+    # Upload the buffer to S3 under the specified key
+    s3_client.put_object(Bucket=Bucket, Key=key, Body=buffer.getvalue())
+
+    return len(data)
+
+
+def _compress_blosc(data):
+    if data.nbytes >= blosc.MAX_BUFFERSIZE:
+        raise ValueError("Blosc's input buffer cannot exceed ~2 GB")
+    return blosc.compress(data, shuffle=False)
+
+
+COMPRESSORS["blosc"]["compress"] = _compress_blosc
+
+
 @export
 def dry_load_files(dirname, chunk_numbers=None, disable=False, **kwargs):
     prefix = strax.storage.files.dirname_to_prefix(dirname)
-    metadata_json = RUN_METADATA_PATTERN % prefix
+    metadata_json = f"{prefix}-metadata.json"
     md_path = os.path.join(dirname, metadata_json)

     with open(md_path, mode="r") as f:
diff --git a/strax/storage/files.py b/strax/storage/files.py
index 6b8275343..43da3fad9 100644
--- a/strax/storage/files.py
+++ b/strax/storage/files.py
@@ -46,7 +46,7 @@ def __init__(self, path=".", *args, deep_scan=False, **kwargs):
     def _run_meta_path(self, run_id):
         return osp.join(self.path, RUN_METADATA_PATTERN % run_id)

-    def run_metadata(self, run_id, projection=None):
+    def run_metadata(self, run_id: str = "", projection=None):
         path = self._run_meta_path(run_id)
         if not osp.exists(path):
             raise strax.RunMetadataNotAvailable(
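
For orientation, here is a minimal sketch of the round trip the io.py changes above are meant to enable. The bucket name, endpoint, key, and dtype are made up for illustration; note also that load_file with is_s3_path=True builds its own S3Frontend client internally, so that frontend must be able to authenticate on its own.

import boto3
import numpy as np
import strax

s3 = boto3.client("s3", endpoint_url="https://s3.example.org")  # hypothetical endpoint
bucket = "my-strax-bucket"
key = "012345-records-1a2b3c4d5e/records-000000"
data = np.zeros(10, dtype=[("time", np.int64), ("length", np.int32)])

# Write a compressed chunk to S3, then read it back
strax.save_file(key, data, compressor="zstd", s3_client=s3, Bucket=bucket, is_s3_path=True)
loaded = strax.load_file(
    key, compressor="zstd", dtype=data.dtype, bucket_name=bucket, is_s3_path=True
)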
diff --git a/strax/storage/s3.py b/strax/storage/s3.py
new file mode 100644
index 000000000..dcc80a890
--- /dev/null
+++ b/strax/storage/s3.py
@@ -0,0 +1,611 @@
+import json
+import os
+import os.path as osp
+from typing import Optional, List
+from bson import json_util
+import boto3
+from boto3.s3.transfer import TransferConfig
+from botocore.exceptions import ClientError
+from botocore.client import Config
+
+import strax
+from .common import StorageFrontend
+
+export, __all__ = strax.exporter()
+
+RUN_METADATA_PATTERN = "%s-metadata.json"
+BUCKET_NAME = "cdt6-pub"
+
+
+@export
+class S3Frontend(StorageFrontend):
+    """A storage frontend that interacts with an S3-compatible object storage.
+
+    This class handles run-level metadata storage and retrieval, as well as scanning for available
+    runs.
+
+    """
+
+    can_define_runs = True
+    provide_run_metadata = False
+    provide_superruns = True
+    BUCKET = "cdt6-pub"
+
+    def __init__(
+        self,
+        s3_access_key_id: str = "",
+        s3_secret_access_key: str = "",
+        endpoint_url: str = "https://rice1.osn.mghpcc.org",
+        path: str = "/xenonnt",
+        bucket_name: str = "cdt6-pub",
+        deep_scan=False,
+        *args,
+        **kwargs,
+    ):
+        """Initialize S3Frontend with the given storage parameters.
+
+        :param s3_access_key_id: S3 access key for authentication. If left empty, the frontend is
+            treated as unconfigured.
+        :param s3_secret_access_key: S3 secret key for authentication.
+        :param endpoint_url: URL of the S3-compatible object storage.
+        :param path: Base path (key prefix) for storing data.
+        :param bucket_name: Name of the S3 bucket to use.
+        :param deep_scan: If True, scans for runs even without explicit metadata.
+        :param args: Additional arguments passed to the superclass.
+        :param kwargs: Additional keyword arguments passed to the superclass. For other arguments,
+            see the DataRegistry base class.
+
+        """
+        super().__init__(*args, **kwargs)
+        self.path = path
+        self.bucket_name = bucket_name
+
+        # Configure the S3 client
+        self.boto3_client_kwargs = {
+            "aws_access_key_id": s3_access_key_id,
+            "aws_secret_access_key": s3_secret_access_key,
+            "endpoint_url": endpoint_url,
+            "service_name": "s3",
+            "config": Config(connect_timeout=5, retries={"max_attempts": 10}),
+        }
+
+        # Initialize the connection to S3 storage
+        self.s3 = boto3.client(**self.boto3_client_kwargs)
+        self.backends = [S3Backend(self.bucket_name, self.path, **self.boto3_client_kwargs)]
+
+        self.is_configed = s3_access_key_id != ""
+
+    def _run_meta_path(self, run_id: str) -> str:
+        """Generate the metadata file path for a given run ID.
+
+        :param run_id: The identifier of the run.
+        :return: The path where the metadata is stored.
+
+        """
+        return osp.join(self.path, RUN_METADATA_PATTERN % run_id)
+
+    def run_metadata(self, run_id: str = "", data_type=None) -> List:
+        """Retrieve run metadata file keys for a given run from S3.
+
+        :param run_id: The identifier of the run.
+        :param data_type: If given, only return metadata files for this data type.
+        :return: List of matching metadata object keys.
+
+        """
+        # Check whether any metadata exists at all
+        if self.s3.list_objects_v2(Bucket=self.bucket_name)["KeyCount"] == 0:
+            raise strax.RunMetadataNotAvailable(
+                f"No file found, cannot find run metadata for {run_id}"
+            )
+
+        # Retrieve the metadata object keys
+        response = self.s3.list_objects_v2(Bucket=self.bucket_name)
+        metadata_files = [
+            obj["Key"]
+            for obj in response.get("Contents", [])
+            if obj["Key"].endswith("metadata.json")
+        ]
+
+        if run_id != "":
+            metadata_files = [file for file in metadata_files if run_id in file]
+
+        if data_type is not None:
+            metadata_files = [file for file in metadata_files if data_type in file]
+
+        # TODO: callers generally expect a dictionary here; consider loading the
+        # metadata content instead of returning the object keys.
+        return metadata_files
+
+    def write_run_metadata(self, run_id: str, metadata: dict):
+        """Write metadata for a specific run to S3.
+
+        :param run_id: The identifier of the run.
+        :param metadata: The metadata dictionary to store.
+
+        """
+        # TODO: check whether this is actually needed for the S3 frontend.
+        if "name" not in metadata:
+            metadata["name"] = run_id
+
+        self.s3.put_object(
+            Bucket=self.bucket_name,
+            Key=self._run_meta_path(run_id),
+            Body=json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default),
+        )
+
+    def s3_object_exists(self, key) -> bool:
+        """Check if a given object (dataset prefix) exists in the S3 bucket.
+
+        :param key: The object key to check, of the form run_id-data_type-lineage.
+        :return: True if the object exists, otherwise False.
+
+        """
+        try:
+            response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=key, Delimiter="/")
+
+            # Check whether any prefixes were returned and whether one matches the key exactly
+            if "CommonPrefixes" in response:
+                new_key = key + "/"
+                for prefix in response["CommonPrefixes"]:
+                    if prefix["Prefix"].startswith(new_key):
+                        return True  # Object exists
+
+            return False  # Object does not exist
+        except ClientError as e:
+            raise e
+
+    def _scan_runs(self, store_fields):
+        """Scan for available runs stored in S3.
+
+        :param store_fields: List of metadata fields to return.
+        :return: Yields run metadata for each run found.
+
+        """
+        found = set()
+
+        # S3 prefixes do not support wildcards, so list everything under the base
+        # path and pick out the run metadata files here.
+        response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.path)
+        for obj in sorted(response.get("Contents", []), key=lambda o: o["Key"]):
+            md_path = obj["Key"]
+            if not md_path.endswith("metadata.json"):
+                continue
+            run_id = osp.basename(md_path).split("-")[0]
+            if run_id in found:
+                continue
+            found.add(run_id)
+            yield self.run_metadata(run_id)
+
+    def _find(self, key, write, allow_incomplete, fuzzy_for=None, fuzzy_for_options=None, **kwargs):
+        """Find the appropriate storage key for a given dataset.
+
+        :param key: The dataset key.
+        :param write: Whether to check for writable access.
+        :param allow_incomplete: Allow incomplete datasets.
+        :param fuzzy_for: Does nothing; retained for compatibility.
+        :param fuzzy_for_options: Does nothing; retained for compatibility.
+        :return: The backend key if found, otherwise raises DataNotAvailable.
+
+        """
+        self.raise_if_non_compatible_run_id(key.run_id)
+        dirname = osp.join(self.path, str(key))
+        exists = self.s3_object_exists(dirname)
+        bk = self.backend_key(dirname)
+
+        if write:
+            if exists and not self._can_overwrite(key):
+                raise strax.DataExistsError(at=dirname)
+            return bk
+
+        if allow_incomplete and not exists:
+            # Check for incomplete data (only exact matching for now)
+            tempdirname = dirname + "_temp"
+            bk = self.backend_key(tempdirname)
+            if (
+                self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname)["KeyCount"]
+                > 0
+            ):
+                return bk
+
+        # Check exact match
+        if exists:
+            return bk
+
+        raise strax.DataNotAvailable
+
+    def _get_config_value(self, variable, option_name):
+        """Retrieve a configuration value from the environment or config file.
+
+        :param variable: The variable to check.
+        :param option_name: The option name in the config file.
+        :return: The retrieved configuration value.
+
+        """
+        if variable is None:
+            if "s3" not in self.config:
+                raise EnvironmentError("S3 access point not specified")
+            if not self.config.has_option("s3", option_name):
+                raise EnvironmentError(f"S3 access point lacks a {option_name}")
+            return self.config.get("s3", option_name)
+        else:
+            return variable
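
As a concrete picture of the key scheme the prefix checks above assume (all names are illustrative), one dataset lives under a single prefix of the form path/run_id-data_type-lineage_hash, and existence reduces to a prefix listing:

import boto3

bucket = "cdt6-pub"
prefix = "/xenonnt/012345-records-1a2b3c4d5e"  # path + run_id-data_type-lineage_hash

s3 = boto3.client("s3", endpoint_url="https://s3.example.org")  # hypothetical endpoint
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/")
dataset_exists = any(
    p["Prefix"].startswith(prefix + "/") for p in response.get("CommonPrefixes", [])
)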
+    # Retained for reference only; fuzzy matching does not make sense for this frontend.
+    # def _folder_matches(self, fn, key, fuzzy_for, fuzzy_for_options, ignore_name=False):
+    #     """Check if a folder matches the required data key.
+    #
+    #     :param fn: Folder name.
+    #     :param key: Data key to match against.
+    #     :param fuzzy_for: Parameters for fuzzy search.
+    #     :param fuzzy_for_options: Additional fuzzy search options.
+    #     :param ignore_name: If True, ignores run name while matching.
+    #     :return: The run_id if it matches, otherwise False.
+    #
+    #     """
+    #     # Parse the folder name
+    #     try:
+    #         _run_id, _data_type, _hash = self._parse_folder_name(fn)
+    #     except InvalidFolderNameFormat:
+    #         return False
+    #
+    #     # Check exact match
+    #     if _data_type != key.data_type:
+    #         return False
+    #     if not ignore_name and _run_id != key.run_id:
+    #         return False
+    #
+    #     # Check fuzzy match
+    #     if not (fuzzy_for or fuzzy_for_options):
+    #         if _hash == key.lineage_hash:
+    #             return _run_id
+    #         return False
+    #     metadata = self.backends[0].get_metadata(fn)
+    #     if self._matches(metadata["lineage"], key.lineage, fuzzy_for, fuzzy_for_options):
+    #         return _run_id
+    #     return False
+
+    def backend_key(self, dirname):
+        """Return the backend key representation.
+
+        :param dirname: The directory name.
+        :return: Backend key tuple.
+
+        """
+        return self.backends[0].__class__.__name__, dirname
+
+    def remove(self, key):
+        """Remove a data entry from storage (not implemented yet)."""
+        raise NotImplementedError
+
+    @staticmethod
+    def _parse_folder_name(fn):
+        """Return (run_id, data_type, hash) if the folder name matches the DataDirectory
+        convention, raise InvalidFolderNameFormat otherwise."""
+        keys = osp.normpath(fn).split(os.sep)[-1].split("-")
+        if len(keys) != 3:
+            # This is not a folder with strax data
+            raise InvalidFolderNameFormat(fn)
+        return keys
+
+    @staticmethod
+    def raise_if_non_compatible_run_id(run_id):
+        """Raise an error if the run ID contains invalid characters.
+
+        :param run_id: The run identifier.
+
+        """
+        if "-" in str(run_id):
+            raise ValueError(
+                "The filesystem frontend does not understand"
+                " run_id's with '-', please replace with '_'"
+            )
+
+
+@export
+def dirname_to_prefix(dirname):
+    """Return the filename prefix from a dirname."""
+    dirname = dirname.replace("_temp", "")
+    return os.path.basename(dirname.strip("/").rstrip("\\")).split("-", maxsplit=1)[1]
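
A minimal sketch of attaching this frontend to a strax context; the credentials and endpoint are placeholders and should come from the environment or a secrets store rather than from source code:

import strax

frontend = strax.S3Frontend(
    s3_access_key_id="...",        # placeholder
    s3_secret_access_key="...",    # placeholder
    endpoint_url="https://s3.example.org",
    bucket_name="my-strax-bucket",
)
st = strax.Context(storage=[frontend])  # plugins registered elsewhere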
+@export
+class S3Backend(strax.StorageBackend):
+    """A storage backend that stores data in an S3-compatible object storage.
+
+    Data is stored in binary files, named based on the chunk number. Metadata is stored separately
+    as a JSON file.
+
+    """
+
+    BUCKET = "cdt6-pub"
+
+    def __init__(
+        self,
+        bucket_name,
+        path,
+        set_target_chunk_mb: Optional[int] = None,
+        *args,
+        **kwargs,
+    ):
+        """Add set_target_chunk_mb to strax.StorageBackend to allow changing the
+        chunk.target_size_mb returned from the loader; any remaining kwargs are used to build the
+        boto3 client.
+
+        :param bucket_name: Name of the bucket used as storage.
+        :param path: Base path (key prefix) for stored data.
+        :param set_target_chunk_mb: Prior to returning the loaders' chunks, return the chunk with
+            an updated target size.
+
+        """
+        super().__init__()
+        self.path = path
+        self.s3 = boto3.client(**kwargs)
+        self.set_chunk_size_mb = set_target_chunk_mb
+        self.bucket_name = bucket_name
+
+    def _get_metadata(self, dirname):
+        """Retrieve metadata for a given directory in S3.
+
+        :param dirname: The directory name in S3 where metadata is stored.
+        :return: Dictionary containing metadata information.
+        :raises strax.DataCorrupted: If metadata is missing or corrupted.
+
+        """
+        prefix = dirname_to_prefix(dirname)
+        metadata_json = f"{prefix}-metadata.json"
+        md_path = osp.join(dirname, metadata_json)
+
+        if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=md_path)["KeyCount"] == 0:
+            # Try to see if we are so fast that there exists a temp folder
+            # with the metadata we need.
+            md_path = osp.join(dirname + "_temp", metadata_json)
+
+        if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=md_path)["KeyCount"] == 0:
+            # Try old-format metadata
+            # (if it's not there, just let it raise FileNotFound
+            # with the usual message in the next stage)
+            old_md_path = osp.join(dirname, "metadata.json")
+            if (
+                self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=old_md_path)["KeyCount"]
+                == 0
+            ):
+                raise strax.DataCorrupted(f"Data in {dirname} has no metadata")
+            md_path = old_md_path
+
+        response = self.s3.get_object(Bucket=self.bucket_name, Key=md_path)
+        metadata_content = response["Body"].read().decode("utf-8")
+        return json.loads(metadata_content)
+
+    def _read_and_format_chunk(self, *args, **kwargs):
+        """Read a data chunk and optionally update its target size.
+
+        :return: Formatted data chunk.
+
+        """
+        chunk = super()._read_and_format_chunk(*args, **kwargs)
+        if self.set_chunk_size_mb:
+            chunk.target_size_mb = self.set_chunk_size_mb
+        return chunk
+
+    def _read_chunk(self, dirname, chunk_info, dtype, compressor):
+        """Read a chunk of data from S3.
+
+        :param dirname: Directory in S3 containing the chunk.
+        :param chunk_info: Dictionary containing chunk metadata.
+        :param dtype: Data type of the chunk.
+        :param compressor: Compression format used for the chunk.
+        :return: Loaded data chunk.
+
+        """
+        fn = osp.join(dirname, chunk_info["filename"])
+        return strax.load_file(
+            fn,
+            dtype=dtype,
+            compressor=compressor,
+            bucket_name=self.bucket_name,
+            is_s3_path=True,
+        )
+
+    def _saver(self, dirname, metadata, **kwargs):
+        """Create a saver object for writing data to S3.
+
+        :param dirname: Directory in S3 where data will be stored.
+        :param metadata: Metadata dictionary associated with the data.
+        :param kwargs: Additional keyword arguments for the saver.
+        :return: An instance of `S3Saver`.
+
+        """
+        parent_dir = os.path.join(self.path, dirname)
+        return S3Saver(parent_dir, self.s3, self.bucket_name, metadata=metadata, **kwargs)
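
If a different target chunk size is wanted on load, the backend can be swapped out by hand; a sketch under the same placeholder configuration as above:

import strax

frontend = strax.S3Frontend(bucket_name="my-strax-bucket")

# Replace the default backend with one that relabels loaded chunks to ~200 MB targets
frontend.backends = [
    strax.S3Backend(
        "my-strax-bucket",
        "/xenonnt",
        set_target_chunk_mb=200,
        **frontend.boto3_client_kwargs,
    )
]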
+@export
+class S3Saver(strax.Saver):
+    """A saver class that writes data chunks to an S3-compatible storage backend.
+
+    Supports metadata management and chunked data saving.
+
+    """
+
+    json_options = dict(sort_keys=True, indent=4)
+    # When writing chunks, rewrite the metadata json file every time we write a chunk
+    _flush_md_for_every_chunk = True
+
+    def __init__(self, dirname, s3, bucket_name, metadata, **kwargs):
+        """Initialize the S3Saver instance.
+
+        :param dirname: Directory path (prefix) in S3 where data is stored.
+        :param s3: Boto3 S3 client instance.
+        :param bucket_name: Name of the S3 bucket to write to.
+        :param metadata: Metadata dictionary associated with the data.
+        :param kwargs: Additional keyword arguments for the saver.
+
+        """
+        super().__init__(metadata=metadata)
+        self.dirname = dirname
+        self.s3 = s3
+        self.bucket_name = bucket_name
+
+        self.tempdirname = dirname + "_temp"
+        self.prefix = dirname_to_prefix(dirname)
+        self.metadata_json = f"{self.prefix}-metadata.json"
+
+        self.config = TransferConfig(max_concurrency=40, num_download_attempts=30)
+
+        if self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=dirname)["KeyCount"] == 1:
+            print(f"Removing data in {dirname} to overwrite")
+            self.s3.delete_object(Bucket=self.bucket_name, Key=dirname)
+        if (
+            self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname)["KeyCount"]
+            == 1
+        ):
+            print(f"Removing old incomplete data in {self.tempdirname}")
+            self.s3.delete_object(Bucket=self.bucket_name, Key=self.tempdirname)
+        self._flush_metadata()
+
+    def _flush_metadata(self):
+        # Convert the metadata dictionary to a JSON string
+        metadata_content = json.dumps(self.md, **self.json_options)
+
+        # Define the S3 key for the metadata file
+        metadata_key = f"{self.tempdirname}/{self.metadata_json}"
+
+        # Upload the metadata to S3
+        self.s3.put_object(Bucket=self.bucket_name, Key=metadata_key, Body=metadata_content)
+
+    def _chunk_filename(self, chunk_info):
+        """Generate a filename for a given chunk.
+
+        :param chunk_info: Dictionary containing chunk metadata.
+        :return: Filename string.
+
+        """
+        if "filename" in chunk_info:
+            return chunk_info["filename"]
+        ichunk = "%06d" % chunk_info["chunk_i"]
+        return f"{self.prefix}-{ichunk}"
+
+    def _save_chunk(self, data, chunk_info, executor=None):
+        """Save a chunk of data to S3.
+
+        :param data: Data chunk to be saved.
+        :param chunk_info: Metadata dictionary for the chunk.
+        :param executor: Optional executor for parallel writes.
+        :return: Chunk metadata dictionary.
+
+        """
+        filename = self._chunk_filename(chunk_info)
+
+        fn = os.path.join(self.tempdirname, filename)
+        kwargs = dict(data=data, compressor=self.md["compressor"])
+        if executor is None:
+            filesize = strax.save_file(
+                fn, s3_client=self.s3, Bucket=self.bucket_name, is_s3_path=True, **kwargs
+            )
+            return dict(filename=filename, filesize=filesize), None
+        else:
+            # TODO: check whether the executor path needs extra S3 handling
+            return dict(filename=filename), executor.submit(
+                strax.save_file,
+                fn,
+                s3_client=self.s3,
+                Bucket=self.bucket_name,
+                is_s3_path=True,
+                **kwargs,
+            )
+
+    def _save_chunk_metadata(self, chunk_info):
+        """Save metadata associated with a data chunk.
+
+        :param chunk_info: Dictionary containing chunk metadata.
+
+        """
+        is_first = chunk_info["chunk_i"] == 0
+        if is_first:
+            self.md["start"] = chunk_info["start"]
+
+        if self.is_forked:
+            # Do not write to the main metadata file to avoid race conditions
+            filename = self._chunk_filename(chunk_info)
+            fn = f"{self.tempdirname}/metadata_{filename}.json"
+            metadata_content = json.dumps(chunk_info, **self.json_options)
+            self.s3.put_object(Bucket=self.bucket_name, Key=fn, Body=metadata_content)
+
+        if not self.is_forked or is_first:
+            self.md["chunks"].append(chunk_info)
+            if self._flush_md_for_every_chunk:
+                self._flush_metadata()
+    def _close(self):
+        """Finalize the saving process by merging temp data and flushing metadata."""
+        try:
+            response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.tempdirname)
+            if "Contents" not in response or len(response["Contents"]) == 0:
+                raise RuntimeError(
+                    f"{self.tempdirname} was already renamed to {self.dirname}. "
+                    "Did you attempt to run two savers pointing to the same "
+                    "directory? Otherwise, this could be a strange race "
+                    "condition or bug."
+                )
+
+            # List the files in the temporary directory matching metadata_*.json
+            response = self.s3.list_objects_v2(
+                Bucket=self.bucket_name, Prefix=f"{self.tempdirname}/metadata_"
+            )
+            for obj in response.get("Contents", []):
+                key = obj["Key"]
+                # Download each per-chunk metadata file and merge it into the main metadata
+                metadata_object = self.s3.get_object(Bucket=self.bucket_name, Key=key)
+                metadata_content = metadata_object["Body"].read().decode("utf-8")
+                self.md["chunks"].append(json.loads(metadata_content))
+
+                # Delete the per-chunk metadata file after processing
+                self.s3.delete_object(Bucket=self.bucket_name, Key=key)
+
+            # Flush the merged metadata
+            self._flush_metadata()
+
+            # "Rename" the directory by copying all files from tempdirname to dirname
+            self._rename_s3_folder(self.tempdirname, self.dirname)
+
+        except ClientError as e:
+            print(f"Error occurred: {e}")
+            raise
+
+    def _rename_s3_folder(self, tempdirname, dirname):
+        """Rename the temporary directory to the final storage location in S3.
+
+        :param tempdirname: Temporary directory path in S3.
+        :param dirname: Final directory path in S3.
+
+        """
+        response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=tempdirname)
+        for obj in response.get("Contents", []):
+            key = obj["Key"]
+            # Copy each file from the temporary directory to the final directory
+            new_key = key.replace(tempdirname, dirname)
+            self.s3.copy_object(
+                Bucket=self.bucket_name,
+                CopySource={"Bucket": self.bucket_name, "Key": key},
+                Key=new_key,
+            )
+            # Delete the file from the temporary directory
+            self.s3.delete_object(Bucket=self.bucket_name, Key=key)
+
+        # Delete the (now empty) temporary directory marker, if any
+        self.s3.delete_object(Bucket=self.bucket_name, Key=tempdirname)
+
+
+@export
+class InvalidFolderNameFormat(Exception):
+    pass
diff --git a/strax/utils.py b/strax/utils.py
index f59fcf57e..85d6ef12d 100644
--- a/strax/utils.py
+++ b/strax/utils.py
@@ -832,6 +832,32 @@ def convert_tuple_to_list(init_func_input):
     return func_input


+@export
+def stx_file_parser(path: str):
+    """Strax assumes two main naming conventions for the directories and files it generates;
+    parse such a name into its components."""
+
+    assert isinstance(path, str)
+
+    try:
+        temp1, temp2, temp3 = re.split(r"-", path)
+    except ValueError:
+        raise ValueError(f"An invalid strax file name was given to this function: {path}")
+
+    if temp1.isdigit():
+        # Covers directories named run_id-data_type-lineage
+        file_data = {"run_id": temp1, "dtype": temp2, "lineage": temp3, "is_json": False}
+    elif temp3 == "metadata.json":
+        # Covers metadata files
+        file_data = {"run_id": temp1, "dtype": temp2, "lineage": temp3, "is_json": True}
+    else:
+        # Covers chunk files
+        file_data = {"chunk_i": temp3, "dtype": temp1, "lineage": temp2, "is_json": False}
+
+    return file_data
+
+
 @export
 def convert_structured_array_to_df(structured_array, log=None):
     """Convert a structured numpy array to a pandas DataFrame.
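
A quick illustration of what stx_file_parser returns for the two most common name shapes (the names are made up; the key mapping follows the code as written):

import strax

strax.stx_file_parser("012345-records-1a2b3c4d5e")
# {"run_id": "012345", "dtype": "records", "lineage": "1a2b3c4d5e", "is_json": False}

strax.stx_file_parser("records-1a2b3c4d5e-000000")
# {"chunk_i": "000000", "dtype": "records", "lineage": "1a2b3c4d5e", "is_json": False}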
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 101c7e758..1794f19d0 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -32,6 +32,15 @@ def test_write_data_dir(self):
         self.st.make(run_id, self.target)
         assert self.st.is_stored(run_id, self.target)

+    def test_write_data_s3(self):
+        self.st.storage = [strax.S3Frontend()]
+        if not self.st.storage[0].is_configed:
+            # No S3 credentials configured, so there is nothing to check
+            return
+        run_id = "0"
+        self.st.make(run_id, self.target)
+        assert self.st.is_stored(run_id, self.target)
+
     def test_complain_run_id(self):
         self.st.storage = [strax.DataDirectory(self.path)]
         run_id = "run-0"
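
Since the new test silently passes when no credentials are configured, an explicitly skipped variant could look like the sketch below; it assumes the suite is run with pytest and uses the Records demo plugin from strax.testutils, neither of which is part of this patch:

import pytest
import strax
from strax.testutils import Records  # demo plugin shipped with strax's test utilities

def test_write_data_s3():
    frontend = strax.S3Frontend()
    if not frontend.is_configed:
        pytest.skip("No S3 credentials configured")

    st = strax.Context(storage=[frontend], register=[Records])
    st.make("0", "records")
    assert st.is_stored("0", "records")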