diff --git a/README.md b/README.md index d429159..c1e9df0 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,12 @@ or from conda: conda install -c conda-forge nd2 ``` +> **Note:** Using `pip install nd2` will install the default nd2 reader from PyPI. To install this fork with fsspec/remote reader support: +> +> ```sh +> uv pip install git+https://github.com/derekthirstrup/nd2.git@main +> ``` + ### Legacy nd2 file support Legacy nd2 (JPEG2000) files are also supported, but require `imagecodecs`. To @@ -590,6 +596,45 @@ pytest (and feel free to open an issue if that doesn't work!) +## Remote File Access (fsspec) + +`ND2File` supports reading from remote URLs (HTTP, S3, GCS, Azure) via [fsspec](https://filesystem-spec.readthedocs.io/): + +```python +import nd2 + +# HTTP/HTTPS +with nd2.ND2File("https://example.com/data/file.nd2") as f: + data = f.asarray() + +# S3 (requires s3fs) +with nd2.ND2File("s3://bucket/path/file.nd2") as f: + frames = f.read_frames([0, 1, 2], max_workers=64) + +# Check if file is remote +print(f.is_remote) # True +``` + +### Installation + +```bash +pip install nd2 fsspec aiohttp + +# For S3: pip install s3fs +# For GCS: pip install gcsfs +# For Azure: pip install adlfs +``` + +### Parallel I/O for High-Bandwidth Networks + +For remote files, `read_frames()` uses parallel requests to saturate high-bandwidth connections: + +```python +with nd2.ND2File("s3://bucket/large_file.nd2") as f: + # Read multiple frames in parallel (default: 64 workers for remote) + frames = f.read_frames(list(range(100)), max_workers=64) +``` + ## alternatives Here are some other nd2 readers that I know of, though many diff --git a/src/nd2/_nd2file.py b/src/nd2/_nd2file.py index 05032a7..4000130 100644 --- a/src/nd2/_nd2file.py +++ b/src/nd2/_nd2file.py @@ -1255,6 +1255,54 @@ def read_frame(self, frame_index: SupportsInt) -> np.ndarray: frame.shape = self._raw_frame_shape return frame.transpose((2, 0, 1, 3)).squeeze() + def read_frames( + self, + indices: 
Sequence[int] | None = None, + max_workers: int | None = None, + ) -> np.ndarray: + """Read multiple frames with optional parallel I/O. + + For remote files (HTTP, S3, etc.), this uses parallel requests + to saturate high-bandwidth connections (e.g., 10Gbit+). + + Parameters + ---------- + indices : Sequence[int], optional + Frame indices to read. If None, reads all frames. + max_workers : int, optional + Number of parallel workers. Default: 4 for local, 64 for remote. + + Returns + ------- + np.ndarray + Stacked array of frames with shape matching self.shape + """ + if indices is None: + indices = list(range(self._frame_count)) + + # Determine optimal worker count + if max_workers is None: + max_workers = 64 if self.is_remote else 4 + + # Use parallel reader if available + if hasattr(self._rdr, "read_frames_parallel"): + frames = self._rdr.read_frames_parallel(list(indices), max_workers) + else: + frames = [self._rdr.read_frame(i) for i in indices] + + # Reshape and transpose frames to match expected output + reshaped = [] + for frame in frames: + frame.shape = self._raw_frame_shape + reshaped.append(frame.transpose((2, 0, 1, 3)).squeeze()) + + return np.stack(reshaped) + + @property + def is_remote(self) -> bool: + """Whether this file is accessed via a remote URL (http, s3, etc.).""" + return getattr(self._rdr, "_is_remote", False) + @cached_property def loop_indices(self) -> tuple[dict[str, int], ...]: """Return a tuple of dicts of loop indices for each frame. 
def __repr__(self) -> str:
    """Return a string representation of the ND2File.

    The result has the form ``<ND2File at 0x...: 'name' details>``, where
    ``details`` is ``(closed)`` for a closed file and ``dtype: sizes``
    otherwise. If any attribute needed for the description cannot be read,
    only ``<ND2File at 0x...>`` is returned.
    """
    try:
        details = " (closed)" if self.closed else f" {self.dtype}: {self.sizes!r}"
        # Path objects expose ``.name`` (final path component); remote URLs
        # are stored as plain strings and are shown unchanged.
        name = getattr(self._path, "name", self._path)
        extra = f": {name!r}{details}"
    except Exception:
        # Deliberately broad: building a repr must never raise, whatever
        # state the file object is in.
        extra = ""
    # Include ``extra`` in the result; returning an empty string would
    # discard everything computed above.
    return f"<ND2File at {hex(id(self))}{extra}>"
def _read_frame_fallback(self, index: int, offset: int) -> np.ndarray:
    """Read one uncompressed frame with seek/read when no mmap is available.

    The bytes are interpreted exactly as the mmap path does: same shape,
    dtype and ``self._strides``, so rows padded to ``widthBytes`` are
    handled instead of failing a plain reshape.

    Parameters
    ----------
    index : int
        Frame index; only used to build the placeholder frame on failure.
    offset : int
        Byte offset of the frame data within the file.

    Returns
    -------
    np.ndarray
        The frame, or ``self._missing_frame(index)`` when the file handle is
        absent, the read fails, or fewer bytes than a full frame come back.
    """
    fh = self._fh
    if fh is None:
        return self._missing_frame(index)

    try:
        fh.seek(offset)
        data = fh.read(self._frame_size_bytes())
    except Exception as exc:
        # Best effort is kept (backends raise many different error types),
        # but the failure is surfaced instead of silently yielding zeros.
        warnings.warn(
            f"Failed to read frame {index} at offset {offset}: {exc!r}. "
            "Returning an empty frame.",
            stacklevel=2,
        )
        return self._missing_frame(index)

    try:
        return np.ndarray(
            shape=self._actual_frame_shape(),
            dtype=self._dtype(),
            buffer=data,
            strides=self._strides,
        )
    except TypeError:
        # Raised when the buffer is too small for the requested array,
        # i.e. the read came back short.
        return self._missing_frame(index)


def _frame_size_bytes(self) -> int:
    """Return the number of bytes one uncompressed frame occupies.

    Uses ``widthBytes`` as the row size when the attributes provide it,
    otherwise derives the row size from the pixel width, bytes per
    component and component count.
    """
    attr = self.attributes()
    row_bytes = attr.widthBytes or (
        (attr.widthPx or 1)
        * (attr.bitsPerComponentInMemory // 8)
        * attr.componentCount
    )
    return attr.heightPx * row_bytes
def _read_frames_parallel_remote(
    self,
    indices: list[int],
    max_workers: int,
) -> list[np.ndarray]:
    """Read frames concurrently, one private file handle per worker thread.

    Parameters
    ----------
    indices : list[int]
        Frame indices to read. Duplicates are allowed.
    max_workers : int
        Size of the thread pool.

    Returns
    -------
    list[np.ndarray]
        One array per entry of ``indices``, in the same order. Frames with
        no recorded offset, or whose read comes back short, are replaced by
        ``self._missing_frame(idx)``.

    Notes
    -----
    Losslessly compressed files are read sequentially through
    ``self.read_frame``: their chunks must be decompressed and cannot be
    reinterpreted as raw pixel bytes.
    """
    if not indices:
        return []

    if self.attributes().compressionType == "lossless":
        return [self.read_frame(i) for i in indices]

    from nd2._readers.protocol import _open_file

    # Resolve everything that is the same for every frame once, up front.
    frame_bytes = self._frame_size_bytes()
    dtype = self._dtype()
    shape = self._actual_frame_shape()
    strides = self._strides
    path = self._path
    offsets = self._frame_offsets

    # Each worker thread lazily opens its own handle so seek/read pairs
    # from different threads never interleave on a shared file object.
    thread_local = threading.local()
    open_handles: list[Any] = []
    handles_lock = threading.Lock()

    def get_file() -> Any:
        fh = getattr(thread_local, "fh", None)
        if fh is None:
            fh = thread_local.fh = _open_file(path)
            with handles_lock:
                open_handles.append(fh)
        return fh

    def read_one(idx: int) -> np.ndarray:
        offset = offsets.get(idx)
        if offset is None:
            return self._missing_frame(idx)

        fh = get_file()
        fh.seek(offset)
        data = fh.read(frame_bytes)
        try:
            # Same interpretation as the mmap path in ``read_frame``;
            # ``strides`` accounts for rows padded to ``widthBytes``.
            view = np.ndarray(shape=shape, dtype=dtype, buffer=data, strides=strides)
        except TypeError:
            # Buffer too small for the requested array: short read.
            return self._missing_frame(idx)
        # Copy so the result is contiguous, writable and independent of
        # the bytes object returned by ``read``.
        return view.copy()

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # ``map`` yields results in submission order, so no index ->
            # result bookkeeping is needed and duplicates work naturally.
            return list(executor.map(read_one, indices))
    finally:
        for fh in open_handles:
            with suppress(Exception):
                fh.close()
" + "Install with: pip install fsspec aiohttp" + ) from e + return cast( + "BinaryIO", fsspec.open(str(path), mode="rb", block_size=block_size).open() + ) + return open(path, "rb") + class ND2Reader(abc.ABC): """Abstract Base class for ND2 file readers.""" @@ -47,7 +93,7 @@ def create( Parameters ---------- path : str - Path to the ND2 file. + Path to the ND2 file (local path or remote URL like http://, s3://, etc.) error_radius : int, optional If b"ND2 FILEMAP SIGNATURE NAME 0001!" is not found at expected location and `error_radius` is not None, then an area of +/- `error_radius` bytes will be @@ -62,12 +108,18 @@ def create( "File handles passed to ND2File must be in binary mode" ) ctx: AbstractContextManager[BinaryIO] = nullcontext(path) + fname: str = getattr(path, "name", "") + elif _is_remote_path(path): + # Remote URL - keep as string, use fsspec to open + ctx = _open_file(path) + fname = str(path) else: + # Local path - convert to Path object path = Path(path).expanduser().absolute() ctx = open(path, "rb") + fname = str(path) with ctx as fh: - fname = fh.name fh.seek(0) magic_num = fh.read(4) @@ -80,16 +132,24 @@ def create( def __init__(self, path: FileOrBinaryIO, error_radius: int | None = None) -> None: self._chunkmap: dict | None = None - self._mmap: mmap.mmap | None = None + self._is_remote: bool = False + if hasattr(path, "read"): self._fh: BinaryIO | None = cast("BinaryIO", path) self._was_open = not self._fh.closed - self._path: Path = Path(self._fh.name) - self._mmap = mmap.mmap(self._fh.fileno(), 0, access=mmap.ACCESS_READ) + self._path: Path | str = Path(getattr(self._fh, "name", "")) + # Only create mmap for local files with fileno() + if hasattr(self._fh, "fileno"): + with contextlib.suppress(OSError, ValueError): + self._mmap = mmap.mmap( + self._fh.fileno(), 0, access=mmap.ACCESS_READ + ) else: self._was_open = False - self._path = Path(path) + self._is_remote = _is_remote_path(path) + # Keep remote paths as strings, convert local to Path + 
def open(self) -> None:
    """Open the file handle, memory-mapping it when that is possible.

    Does nothing if a handle is already open. Otherwise the file is opened
    with ``_open_file`` and, for non-remote files whose handle offers
    ``fileno``, an attempt is made to create a read-only memory map. If the
    map cannot be created ``self._mmap`` is left as None.
    """
    if self._fh is not None and not self._fh.closed:
        return

    self._fh = _open_file(self._path)
    # Mapping failures are suppressed below, so clear any mapping left from
    # a previous handle first; otherwise a stale map could outlive a failed
    # re-map. NOTE(review): assumes ``close()`` may leave ``_mmap`` set —
    # confirm against ``close``.
    self._mmap = None
    if self._is_remote or not hasattr(self._fh, "fileno"):
        return
    # OSError/ValueError cover handles without a real descriptor
    # (``io.UnsupportedOperation`` derives from both) and unmappable files.
    with contextlib.suppress(OSError, ValueError):
        self._mmap = mmap.mmap(self._fh.fileno(), 0, access=mmap.ACCESS_READ)
expected): + """Test that remote URLs are correctly identified.""" + assert _is_remote_path(path) == expected + + +class TestOpenFile: + """Tests for _open_file() function.""" + + def test_open_local_file(self, any_nd2): + """Test opening a local file.""" + fh = _open_file(any_nd2) + try: + assert hasattr(fh, "read") + assert hasattr(fh, "seek") + # Should be able to read some bytes + data = fh.read(4) + assert len(data) == 4 + finally: + fh.close() + + def test_open_local_path_object(self, any_nd2): + """Test opening a local file using Path object.""" + fh = _open_file(Path(any_nd2)) + try: + assert hasattr(fh, "read") + data = fh.read(4) + assert len(data) == 4 + finally: + fh.close() + + def test_open_remote_without_fsspec(self): + """Test that opening remote URL without fsspec raises ImportError.""" + with patch.dict("sys.modules", {"fsspec": None}): + # Force reimport to pick up the patched module + with pytest.raises(ImportError, match="fsspec is required"): + _open_file("https://example.com/file.nd2") + + def test_open_remote_with_fsspec(self): + """Test opening remote URL with mocked fsspec.""" + mock_file = MagicMock() + mock_file.read.return_value = b"\x00\x00\x00\x00" + mock_opener = MagicMock() + mock_opener.open.return_value = mock_file + + mock_fsspec = MagicMock() + mock_fsspec.open.return_value = mock_opener + + with patch.dict("sys.modules", {"fsspec": mock_fsspec}): + # We need to reload to use the mock + from nd2._readers import protocol + + # Call the function + protocol._open_file("https://example.com/file.nd2") + + # Verify fsspec.open was called correctly + mock_fsspec.open.assert_called_once() + call_args = mock_fsspec.open.call_args + assert call_args[0][0] == "https://example.com/file.nd2" + assert call_args[1]["mode"] == "rb" + assert "block_size" in call_args[1] + + +class TestND2FileRemote: + """Integration tests for ND2File with remote URLs.""" + + def test_is_remote_property_local(self, any_nd2): + """Test that is_remote returns False 
for local files.""" + import nd2 + + with nd2.ND2File(any_nd2) as f: + assert f.is_remote is False + + def test_read_frames_local(self, any_nd2): + """Test read_frames() method on local files.""" + import nd2 + + with nd2.ND2File(any_nd2) as f: + # Read a subset of frames + n_frames = min(3, f.attributes.sequenceCount) + if n_frames > 0: + indices = list(range(n_frames)) + frames = f.read_frames(indices) + assert isinstance(frames, np.ndarray) + assert frames.shape[0] == n_frames + + def test_read_frames_all(self, any_nd2): + """Test read_frames() with no indices reads all frames.""" + import nd2 + + with nd2.ND2File(any_nd2) as f: + seq_count = f.attributes.sequenceCount + if seq_count > 0 and seq_count <= 10: # Only test small files + frames = f.read_frames() + assert isinstance(frames, np.ndarray) + assert frames.shape[0] == seq_count + + +class TestReadFramesParallel: + """Tests for parallel frame reading.""" + + def test_parallel_read_local_uses_sequential(self, any_nd2): + """Test that local files use sequential reading (via mmap).""" + import nd2 + + with nd2.ND2File(any_nd2) as f: + # Local files should have mmap, so parallel read falls back to sequential + if f.attributes.sequenceCount > 0: + frames = f.read_frames([0], max_workers=4) + assert isinstance(frames, np.ndarray) + assert frames.shape[0] == 1 + + def test_read_frames_parallel_method_exists(self, any_nd2): + """Test that read_frames_parallel method exists on ModernReader.""" + import nd2 + from nd2._readers._modern.modern_reader import ModernReader + + with nd2.ND2File(any_nd2) as f: + if isinstance(f._rdr, ModernReader): + assert hasattr(f._rdr, "read_frames_parallel") + assert callable(f._rdr.read_frames_parallel)