Commits (33)
5ca8417
add S3 functionality to strax
LuisSanchez25 Sep 26, 2024
d1cbeeb
add _get_config_values
LuisSanchez25 Sep 26, 2024
f66ff0a
S3 code passes st.make test
LuisSanchez25 Oct 1, 2024
fb4e2ca
add types (incomplete)
LuisSanchez25 Oct 2, 2024
73f6faa
Merge branch 'master' into s3_protocol
LuisSanchez25 Oct 2, 2024
c6c71dd
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 2, 2024
89742f6
Merge branch 'master' into s3_protocol
yuema137 Oct 17, 2024
6c2548b
Merge branch 'master' into s3_protocol
yuema137 Nov 15, 2024
607514b
Merge branch 'master' into s3_protocol
yuema137 Nov 20, 2024
e846991
Merge branch 'master' into s3_protocol
LuisSanchez25 Mar 3, 2025
f2dbc7c
update io
LuisSanchez25 Mar 3, 2025
9409a2e
update documentation + change bucket initialization
LuisSanchez25 Mar 3, 2025
63e0bad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 3, 2025
55fe59b
remove unneeded imports + reduce line length
LuisSanchez25 Mar 3, 2025
c5cce8e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 3, 2025
8789a53
Merge branch 'master' into s3_protocol
LuisSanchez25 Mar 4, 2025
107f0d9
fix style
LuisSanchez25 Mar 4, 2025
d653133
Update io.py
LuisSanchez25 Mar 4, 2025
7386f90
fix test
LuisSanchez25 Mar 4, 2025
d9aa5b6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 4, 2025
c306023
add self.bucket_name
LuisSanchez25 Mar 4, 2025
f8d2bee
Remove unnecessary functions
LuisSanchez25 Mar 5, 2025
e820701
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 5, 2025
3770688
remove unused variables
LuisSanchez25 Mar 5, 2025
c0a343f
fix return output + add bucket default name
LuisSanchez25 Mar 5, 2025
1c2c452
Merge branch 'master' into s3_protocol
yuema137 Mar 20, 2025
514002b
fix broken functions after update
LuisSanchez25 May 6, 2025
4320190
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2025
f7e56af
change boto3 to compatible version
LuisSanchez25 May 6, 2025
3864a43
Merge branch 'master' into s3_protocol
LuisSanchez25 May 22, 2025
29545d3
update files based on copilot recommendation
LuisSanchez25 Jul 29, 2025
ee135f1
Merge branch 'master' into s3_protocol
LuisSanchez25 Jul 29, 2025
4bb5ddb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2025
1 change: 1 addition & 0 deletions pyproject.toml
@@ -26,6 +26,7 @@ rechunker = "strax.scripts.rechunker:main"
[tool.poetry.dependencies]
python = ">=3.10,<3.13"
blosc = "*"
boto3 = "^1.33.13"
click = "*"
deepdiff = "*"
dill = "*"
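
boto3 is the AWS SDK for Python and supplies the S3 client behind the new storage backend. A minimal connectivity sketch, with a placeholder endpoint and credentials (none of these values come from this PR):

```python
import boto3

# Placeholder endpoint and credentials; endpoint_url lets the client target
# any S3-compatible store, not just AWS.
s3 = boto3.client(
    "s3",
    endpoint_url="https://s3.example.org",
    aws_access_key_id="YOUR_KEY",
    aws_secret_access_key="YOUR_SECRET",
)
print(s3.list_buckets()["Buckets"])  # quick smoke test
```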
1 change: 1 addition & 0 deletions strax/__init__.py
@@ -15,6 +15,7 @@
from .storage.file_rechunker import *
from .storage.mongo import *
from .storage.zipfiles import *
from .storage.s3 import *

from .config import *
from .plugins import *
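
Exporting the new module makes the frontend reachable as strax.S3Frontend, which io.py relies on below. A hedged usage sketch (the constructor arguments are assumptions, not shown in this diff; see strax/storage/s3.py for the actual signature):

```python
import strax

# Sketch only: S3Frontend's constructor arguments are not shown in this diff.
st = strax.Context(storage=[strax.S3Frontend()])
```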
105 changes: 93 additions & 12 deletions strax/io.py
@@ -10,9 +10,10 @@
import zstandard
import lz4.frame as lz4
from ast import literal_eval
from io import BytesIO
from botocore.exceptions import ClientError

import strax
from strax import RUN_METADATA_PATTERN

export, __all__ = strax.exporter()
__all__.extend(["DECOMPRESS_BUFFER_SIZE"])
@@ -80,22 +81,54 @@ def _lz4_decompress(f, buffer_size=DECOMPRESS_BUFFER_SIZE):


@export
def load_file(f, compressor, dtype):
"""Read and return data from file.
def load_file(f, compressor, dtype, bucket_name=None, is_s3_path=False):
"""Read and return data from file or S3.

:param f: file name or handle to read from
:param compressor: compressor to use for decompressing. If not passed, will try to load it from
json metadata file.
:param dtype: numpy dtype of data to load
    :param bucket_name: name of the S3 bucket to read from (only used when is_s3_path is True)
    :param is_s3_path: boolean indicating whether the file is stored in S3.

"""
if isinstance(f, str):
with open(f, mode="rb") as write_file:
return _load_file(write_file, compressor, dtype)
if is_s3_path:
# Read from S3
return load_file_from_s3(f, compressor, dtype, bucket_name)
elif isinstance(f, str):
# Read from local file
with open(f, mode="rb") as read_file:
return _load_file(read_file, compressor, dtype)
else:
# If f is already a file-like object, just use it
return _load_file(f, compressor, dtype)
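
A usage sketch of the extended reader; the dtype, path, key, and bucket below are hypothetical placeholders:

```python
import numpy as np
import strax

dtype = np.dtype([("time", np.int64), ("length", np.int32)])  # example only

# Local file: unchanged behaviour
data = strax.load_file("path/to/chunk_file", compressor="zstd", dtype=dtype)

# S3 object: f is interpreted as the object key inside bucket_name
data = strax.load_file(
    "run_000000/chunk_file",  # hypothetical key
    compressor="zstd",
    dtype=dtype,
    bucket_name="strax-data",  # hypothetical bucket
    is_s3_path=True,
)
```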


def load_file_from_s3(f, compressor, dtype, bucket_name):
"""Helper function to load data from S3.

Stream the object from S3 into an in-memory buffer, then decompress it into a numpy array.

"""
s3 = strax.S3Frontend().s3

    try:
        # Retrieve the object from S3, streaming it into a BytesIO buffer
        response = s3.get_object(Bucket=bucket_name, Key=f)
        file_buffer = BytesIO()

        for chunk in response["Body"].iter_chunks(chunk_size=DECOMPRESS_BUFFER_SIZE):
            file_buffer.write(chunk)

        file_buffer.seek(0)

    except ClientError as e:
        raise RuntimeError(f"Failed to load {f} from bucket {bucket_name}: {e}") from e

    return _load_file(file_buffer, compressor, dtype)
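
boto3 also offers a managed download that handles retries and multipart transfers internally; an equivalent sketch of the buffer-filling step (an alternative, not what this PR does):

```python
# Alternative to the iter_chunks loop above
file_buffer = BytesIO()
s3.download_fileobj(Bucket=bucket_name, Key=f, Fileobj=file_buffer)
file_buffer.seek(0)
```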


def _load_file(f, compressor, dtype):
try:
data = COMPRESSORS[compressor]["_decompress"](f)
@@ -105,6 +138,8 @@ def _load_file(f, compressor, dtype):
return np.frombuffer(data, dtype=dtype)
except ValueError as e:
raise ValueError(f"ValueError while loading data with dtype =\n\t{dtype}") from e
    except KeyError as e:
        raise RuntimeError(f"Error loading file: {e}") from e

except Exception:
raise strax.DataCorrupted(
Expand All @@ -113,21 +148,37 @@ def _load_file(f, compressor, dtype):


@export
def save_file(f, data, compressor="zstd"):
def save_file(f, data, compressor="zstd", s3_client=None, Bucket=None, is_s3_path=False):
"""Save data to file and return number of bytes written.

:param f: file name or handle to save to
:param data: data (numpy array) to save
    :param compressor: compressor to use
    :param s3_client: boto3 client to use when writing to S3
    :param Bucket: name of the S3 bucket to write to
    :param is_s3_path: boolean indicating whether f is an S3 object key rather than a local path

"""

if isinstance(f, str):
final_fn = f
temp_fn = f + "_temp"
with open(temp_fn, mode="wb") as write_file:
result = _save_file(write_file, data, compressor)
os.rename(temp_fn, final_fn)
return result
if not is_s3_path:
with open(temp_fn, mode="wb") as write_file:
result = _save_file(write_file, data, compressor)
os.rename(temp_fn, final_fn)
return result
        else:
            # Write to a temporary S3 key first, then copy it to the final key
            result = _save_file_to_s3(s3_client, temp_fn, data, Bucket, compressor)
            s3_client.copy_object(
                Bucket=Bucket,
                Key=final_fn,
                CopySource={"Bucket": Bucket, "Key": temp_fn},
            )

# Delete the temporary file
s3_client.delete_object(Bucket=Bucket, Key=temp_fn)

return result
else:
return _save_file(f, data, compressor)
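
A usage sketch of the extended writer; the client, bucket, and key are hypothetical:

```python
import boto3
import numpy as np
import strax

data = np.zeros(10, dtype=np.int64)
s3_client = boto3.client("s3")  # assumes credentials in the environment

nbytes = strax.save_file(
    "run_000000/chunk_file",  # S3 object key, not a local path
    data,
    compressor="zstd",
    s3_client=s3_client,
    Bucket="strax-data",  # hypothetical bucket
    is_s3_path=True,
)
```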

@@ -139,10 +190,40 @@ def _save_file(f, data, compressor="zstd"):
return len(d_comp)


def _save_file_to_s3(s3_client, key, data, Bucket, compressor=None):
    """Save a numpy array directly to S3 under the given key, compressing it first if a compressor
    is given."""
    assert isinstance(data, np.ndarray), "Please pass a numpy array"

    # Stage the (optionally compressed) payload in an in-memory buffer
    buffer = BytesIO()
    if compressor:
        data = COMPRESSORS[compressor]["compress"](data)
    buffer.write(data)
    buffer.seek(0)  # Reset the buffer to the beginning

    # Upload the buffer to S3 under the specified key
    s3_client.put_object(Bucket=Bucket, Key=key, Body=buffer.getvalue())

    # Report the number of bytes written, mirroring _save_file
    return buffer.getbuffer().nbytes
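
put_object sends the whole payload in a single request, which S3 caps at 5 GB per object; for very large chunks, boto3's managed transfer would switch to multipart uploads automatically. A possible refinement, not part of this PR:

```python
# Managed multipart upload (sketch)
buffer.seek(0)
s3_client.upload_fileobj(buffer, Bucket, key)  # args: Fileobj, Bucket, Key
```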


def _compress_blosc(data):
if data.nbytes >= blosc.MAX_BUFFERSIZE:
raise ValueError("Blosc's input buffer cannot exceed ~2 GB")
return blosc.compress(data, shuffle=False)


COMPRESSORS["blosc"]["compress"] = _compress_blosc
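
A quick round-trip illustration of the registered hook (a sketch; strax's real read path goes through COMPRESSORS["blosc"]["_decompress"]):

```python
import blosc
import numpy as np

arr = np.arange(1_000, dtype=np.int64)
compressed = _compress_blosc(arr)  # raises ValueError above ~2 GB
restored = np.frombuffer(blosc.decompress(compressed), dtype=arr.dtype)
assert np.array_equal(arr, restored)
```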


@export
def dry_load_files(dirname, chunk_numbers=None, disable=False, **kwargs):
prefix = strax.storage.files.dirname_to_prefix(dirname)
metadata_json = RUN_METADATA_PATTERN % prefix
metadata_json = f"{prefix}-metadata.json"
md_path = os.path.join(dirname, metadata_json)

with open(md_path, mode="r") as f:
2 changes: 1 addition & 1 deletion strax/storage/files.py
@@ -46,7 +46,7 @@ def __init__(self, path=".", *args, deep_scan=False, **kwargs):
def _run_meta_path(self, run_id):
return osp.join(self.path, RUN_METADATA_PATTERN % run_id)

def run_metadata(self, run_id, projection=None):
def run_metadata(self, run_id: str = "", projection=None):
path = self._run_meta_path(run_id)
if not osp.exists(path):
raise strax.RunMetadataNotAvailable(