Commits (20)
53c38dc
feat: add fsspec-based remote/streaming ND2 reader
Jan 20, 2026
3f6f578
docs: add fsspec remote reader documentation to README
Jan 20, 2026
47d8366
docs: add PR description for upstream submission
Jan 21, 2026
3eed53b
feat: add SMB protocol support for network share access
Jan 21, 2026
9a202b4
fix: correct scene count and Z step parsing
Jan 21, 2026
a3cbed6
fix: use ND2File for reliable local metadata parsing
Jan 21, 2026
3e21a68
perf: increase default parallel workers to 64 and update docs
Jan 21, 2026
a9259b3
style(pre-commit.ci): auto fixes [...]
pre-commit-ci[bot] Jan 21, 2026
0875e10
docs: add installation instructions for fork
Jan 21, 2026
1e53e06
fix: resolve ruff and mypy linting errors in _fsspec.py
Jan 21, 2026
6adfffc
style(pre-commit.ci): auto fixes [...]
pre-commit-ci[bot] Jan 21, 2026
fbf0926
feat: integrate fsspec support into ND2File for remote file access
Jan 21, 2026
a09d6ac
style(pre-commit.ci): auto fixes [...]
pre-commit-ci[bot] Jan 21, 2026
57bd837
fix: resolve ruff and mypy issues in fsspec integration
Jan 21, 2026
e2d86ab
test: add test coverage for fsspec integration
Jan 21, 2026
64a94b6
refactor: remove separate ND2FsspecReader in favor of integrated appr…
Jan 21, 2026
935a42a
fix: simplify test to avoid resource warnings
Jan 21, 2026
7c3599b
docs: simplify README to reflect integrated ND2File approach
Jan 21, 2026
83edcc3
fix: close thread-local file handles in parallel remote reads
Jan 22, 2026
8ac66b4
style(pre-commit.ci): auto fixes [...]
pre-commit-ci[bot] Jan 22, 2026
45 changes: 45 additions & 0 deletions README.md
@@ -40,6 +40,12 @@ or from conda:
conda install -c conda-forge nd2
```

> **Note:** `pip install nd2` installs the upstream nd2 package from PyPI. To install this fork, which adds fsspec/remote reading support, use:
>
> ```sh
> uv pip install git+https://github.com/derekthirstrup/nd2.git@main
> ```

### Legacy nd2 file support

Legacy nd2 (JPEG2000) files are also supported, but require `imagecodecs`. To
@@ -590,6 +596,45 @@ pytest

(and feel free to open an issue if that doesn't work!)

## Remote File Access (fsspec)

`ND2File` supports reading from remote URLs (HTTP, S3, GCS, Azure) via [fsspec](https://filesystem-spec.readthedocs.io/):

```python
import nd2

# HTTP/HTTPS
with nd2.ND2File("https://example.com/data/file.nd2") as f:
data = f.asarray()

# S3 (requires s3fs)
with nd2.ND2File("s3://bucket/path/file.nd2") as f:
frames = f.read_frames([0, 1, 2], max_workers=64)

# Check if file is remote
print(f.is_remote) # True
```
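
SMB network shares are also supported (added by this PR's SMB commit). A minimal sketch, assuming fsspec's `smb` protocol via the `smbprotocol` package; the exact URL below is illustrative:

```python
import nd2

# Illustrative SMB URL; adjust server/share/credentials for your network
with nd2.ND2File("smb://user:password@fileserver/share/experiment.nd2") as f:
    data = f.asarray()
```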

### Installation

```bash
pip install nd2 fsspec aiohttp

# For S3: pip install s3fs
# For GCS: pip install gcsfs
# For Azure: pip install adlfs
```

### Parallel I/O for High-Bandwidth Networks

For remote files, `read_frames()` uses parallel requests to saturate high-bandwidth connections:

```python
with nd2.ND2File("s3://bucket/large_file.nd2") as f:
# Read multiple frames in parallel (default: 64 workers for remote)
frames = f.read_frames(list(range(100)), max_workers=64)
```
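
A hedged usage sketch of the ordering and stacking behavior; the timing harness is illustrative, and the bucket URL mirrors the example above:

```python
import time
import nd2

with nd2.ND2File("s3://bucket/large_file.nd2") as f:
    t0 = time.perf_counter()
    frames = f.read_frames(list(range(100)), max_workers=64)
    elapsed = time.perf_counter() - t0
    # Frames are stacked along a new leading axis, in the order requested
    print(frames.shape, f"{frames.nbytes / elapsed / 1e6:.0f} MB/s")
```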

## alternatives

Here are some other nd2 readers that I know of, though many
52 changes: 51 additions & 1 deletion src/nd2/_nd2file.py
@@ -1255,6 +1255,54 @@ def read_frame(self, frame_index: SupportsInt) -> np.ndarray:
frame.shape = self._raw_frame_shape
return frame.transpose((2, 0, 1, 3)).squeeze()

def read_frames(
self,
indices: Sequence[int] | None = None,
max_workers: int | None = None,
) -> np.ndarray:
"""Read multiple frames with optional parallel I/O.

For remote files (HTTP, S3, etc.), this uses parallel requests
to saturate high-bandwidth connections (e.g., 10Gbit+).

Parameters
----------
indices : Sequence[int], optional
Frame indices to read. If None, reads all frames.
max_workers : int, optional
Number of parallel workers. Default: 4 for local, 64 for remote.

Returns
-------
np.ndarray
            Array of frames stacked along a new leading axis, in the
            order given by ``indices``.
"""
if indices is None:
indices = list(range(self._frame_count))

# Determine optimal worker count
if max_workers is None:
max_workers = 64 if self.is_remote else 4

# Use parallel reader if available
if hasattr(self._rdr, "read_frames_parallel"):
frames = self._rdr.read_frames_parallel(list(indices), max_workers)
else:
frames = [self._rdr.read_frame(i) for i in indices]

# Reshape and transpose frames to match expected output
reshaped = []
for frame in frames:
frame.shape = self._raw_frame_shape
reshaped.append(frame.transpose((2, 0, 1, 3)).squeeze())

return np.stack(reshaped)

@property
def is_remote(self) -> bool:
"""Whether this file is accessed via a remote URL (http, s3, etc.)."""
return getattr(self._rdr, "_is_remote", False)

@cached_property
def loop_indices(self) -> tuple[dict[str, int], ...]:
"""Return a tuple of dicts of loop indices for each frame.
@@ -1349,7 +1397,9 @@ def __repr__(self) -> str:
"""Return a string representation of the ND2File."""
try:
details = " (closed)" if self.closed else f" {self.dtype}: {self.sizes!r}"
extra = f": {self._path.name!r}{details}"
# Handle both Path objects and remote URL strings
name = self._path.name if hasattr(self._path, "name") else self._path
extra = f": {name!r}{details}"
except Exception:
extra = ""
return f"<ND2File at {hex(id(self))}{extra}>"
158 changes: 146 additions & 12 deletions src/nd2/_readers/_modern/modern_reader.py
@@ -1,8 +1,10 @@
from __future__ import annotations

import os
import threading
import warnings
import zlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import suppress
from typing import TYPE_CHECKING, Any, cast

@@ -331,18 +333,23 @@ def read_frame(self, index: int) -> np.ndarray:
if self.attributes().compressionType == "lossless":
return self._read_compressed_frame(index)

try:
return np.ndarray(
shape=self._actual_frame_shape(),
dtype=self._dtype(),
buffer=self._mmap,
offset=offset,
strides=self._strides,
)
except TypeError:
# If the chunkmap is wrong, and the mmap isn't long enough
# for the requested offset & size, a TypeError is raised.
return self._missing_frame(index)
# Fast path: mmap for local files
if self._mmap is not None:
try:
return np.ndarray(
shape=self._actual_frame_shape(),
dtype=self._dtype(),
buffer=self._mmap,
offset=offset,
strides=self._strides,
)
except TypeError:
# If the chunkmap is wrong, and the mmap isn't long enough
# for the requested offset & size, a TypeError is raised.
return self._missing_frame(index)

# Fallback: seek/read for remote files (no mmap available)
return self._read_frame_fallback(index, offset)

def _read_compressed_frame(self, index: int) -> np.ndarray:
ch = self._load_chunk(f"ImageDataSeq|{index}!".encode())
@@ -357,6 +364,133 @@ def _missing_frame(self, index: int = 0) -> np.ndarray:
# TODO: add other modes for filling missing data
return np.zeros(self._raw_frame_shape(), self._dtype())

def _read_frame_fallback(self, index: int, offset: int) -> np.ndarray:
"""Read frame via seek/read for remote files without mmap."""
try:
if self._fh is None:
raise ValueError("File not open")
self._fh.seek(offset)
            # Frame size in bytes (shared helper, also used by parallel reads)
            frame_bytes = self._frame_size_bytes()
data = self._fh.read(frame_bytes)
arr = np.frombuffer(data, dtype=self._dtype())
return arr.reshape(self._actual_frame_shape())
except Exception:
return self._missing_frame(index)

def _frame_size_bytes(self) -> int:
"""Calculate frame size in bytes."""
attr = self.attributes()
return attr.heightPx * (
attr.widthBytes
or (attr.widthPx or 1)
* (attr.bitsPerComponentInMemory // 8)
* attr.componentCount
)

def read_frames_parallel(
self,
indices: list[int],
max_workers: int = 64,
) -> list[np.ndarray]:
"""Read multiple frames in parallel (optimized for remote files).

For local files with mmap, this offers minimal benefit over sequential reads.
For remote files, this can saturate high-bandwidth connections by issuing
parallel byte-range HTTP requests.

Parameters
----------
indices : list[int]
Frame indices to read
max_workers : int
Number of parallel workers. Default 64 is good for remote files
to saturate 10Gbit+ connections. For local files, 4-8 is sufficient.

Returns
-------
list[np.ndarray]
List of frame arrays in the same order as indices
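
        Examples
        --------
        >>> # Hypothetical reader instance; a remote path requires s3fs
        >>> frames = rdr.read_frames_parallel(list(range(8)), max_workers=16)
        >>> len(frames)
        8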
"""
if not self._fh:
raise ValueError("Attempt to read from closed nd2 file")

# For local files with mmap, parallel reading has minimal benefit
# Sequential is simpler and nearly as fast
if self._mmap is not None:
return [self.read_frame(i) for i in indices]

# For remote files: parallel reads via thread pool
return self._read_frames_parallel_remote(indices, max_workers)

def _read_frames_parallel_remote(
self,
indices: list[int],
max_workers: int,
) -> list[np.ndarray]:
"""Read frames in parallel using concurrent file handles."""
from nd2._readers.protocol import _open_file

# Pre-compute all offsets
frame_info: list[tuple[int, int | None]] = []
for idx in indices:
offset = self._frame_offsets.get(idx, None)
frame_info.append((idx, offset))

frame_bytes = self._frame_size_bytes()
dtype = self._dtype()
shape = self._actual_frame_shape()
path = self._path
results: dict[int, np.ndarray] = {}

# Thread-local file handles for connection reuse
thread_local = threading.local()
# Track all opened file handles for cleanup
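        # (thread-local attributes can't be enumerated from the main thread,
        # so each handle opened in get_file() is also recorded here)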
open_handles: list[Any] = []
handles_lock = threading.Lock()

def get_file() -> Any:
if not hasattr(thread_local, "fh"):
thread_local.fh = _open_file(path)
with handles_lock:
open_handles.append(thread_local.fh)
return thread_local.fh

def read_one(info: tuple[int, int | None]) -> tuple[int, np.ndarray]:
idx, offset = info
if offset is None:
return idx, np.zeros(self._raw_frame_shape(), dtype)

fh = get_file()
fh.seek(offset)
data = fh.read(frame_bytes)
arr = np.frombuffer(data, dtype=dtype).reshape(shape)
return idx, arr.copy() # Copy to release buffer

try:
# Execute in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(read_one, info): info[0] for info in frame_info
}
for future in as_completed(futures):
idx, arr = future.result()
results[idx] = arr
finally:
# Close all thread-local file handles
for fh in open_handles:
with suppress(Exception):
fh.close()

# Return in original order
return [results[idx] for idx in indices]

def _raw_frame_shape(self) -> tuple[int, ...]:
if self._raw_frame_shape_ is None:
attr = self.attributes()