2 changes: 1 addition & 1 deletion .coveragerc
@@ -1,6 +1,6 @@
[run]
omit = */tests/*
concurrency = multiprocessing
concurrency = multiprocessing,thread

[paths]
source =
5 changes: 3 additions & 2 deletions .github/workflows/tests.yml
@@ -40,8 +40,9 @@ jobs:
MPLBACKEND: agg

- name: Upload coverage to Codecov
# This specific version uses Node 20 - see codecov-action#1293
uses: codecov/codecov-action@v3.1.5
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}

publish:
runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion .readthedocs.yml
@@ -1,7 +1,7 @@
version: 2 # Required

build:
os: ubuntu-20.04
os: ubuntu-24.04
tools:
python: "3.12"

69 changes: 66 additions & 3 deletions extra_data/components.py
@@ -13,6 +13,7 @@
from .exceptions import SourceNameError
from .reader import DataCollection, by_id, by_index
from .read_machinery import DataChunk, roi_shape, split_trains
from .utils import default_num_threads
from .writer import FileWriter
from .write_cxi import XtdfCXIWriter, JUNGFRAUCXIWriter

@@ -1216,7 +1217,56 @@
axis=0, out=mod_out[tgt_pulse_sel]
)

def ndarray(self, *, fill_value=None, out=None, roi=(), astype=None, module_gaps=False):
def _read_parallel_decompress(self, out, module_gaps, threads=16):
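        # Fast path: read compressed HDF5 chunks directly and inflate them in
        # a thread pool. Returns False when the data doesn't match the layout
        # this supports, so the caller falls back to the regular reading code.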
from .compression import multi_dataset_decompressor, parallel_decompress_chunks

modno_to_keydata_no_virtual = {}
all_datasets = []
for (m, vkd) in self.modno_to_keydata.items():
modno_to_keydata_no_virtual[m] = kd = vkd._without_virtual_overview()
all_datasets.extend([f.file[kd.hdf5_data_path] for f in kd.files])

if any(d.chunks != (1,) + d.shape[1:] for d in all_datasets):
return False # Chunking not as we expect

decomp_proto = multi_dataset_decompressor(all_datasets)
if decomp_proto is None:
return False # No suitable fast decompression path

load_tasks = []
for i, (modno, kd) in enumerate(sorted(modno_to_keydata_no_virtual.items())):
mod_ix = (modno - self.det._modnos_start_at) if module_gaps else i
# 'chunk' in the lines below means a range of consecutive indices
# in one HDF5 dataset, as elsewhere in EXtra-data.
# We use this to build a list of HDF5 chunks (1 frame per chunk)
# to be loaded & decompressed. Sorry about that.
for chunk in kd._data_chunks:
dset = chunk.dataset

for tgt_slice, chunk_slice in self.det._split_align_chunk(
chunk, self.det.train_ids_perframe,
):
inc_pulses_chunk = self._sel_frames[tgt_slice]
if inc_pulses_chunk.sum() == 0: # No data from this chunk selected
continue

dataset_ixs = np.nonzero(inc_pulses_chunk)[0] + chunk_slice.start

# Where does this data go in the target array?
tgt_start_ix = self._sel_frames[:tgt_slice.start].sum()

# Each task is a h5py.h5d.DatasetID, coordinates & array destination
load_tasks.extend([
(dset.id, (ds_ix, 0, 0), out[mod_ix, tgt_start_ix + i])
for i, ds_ix in enumerate(dataset_ixs)]
)

parallel_decompress_chunks(load_tasks, decomp_proto, threads=threads)

return True

def ndarray(self, *, fill_value=None, out=None, roi=(), astype=None,
module_gaps=False, decompress_threads=None):
"""Get an array of per-pulse data (image.*) for xtdf detector"""
out_shape = self.buffer_shape(module_gaps=module_gaps, roi=roi)

@@ -1226,6 +1276,14 @@
elif out.shape != out_shape:
raise ValueError(f'requires output array of shape {out_shape}')

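        # Direct chunk decompression produces whole frames in the stored
        # dtype, so a roi or astype request uses the regular reading path.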
if roi == () and astype is None:
if decompress_threads is None:
decompress_threads = default_num_threads(fixed_limit=16)

if decompress_threads > 1:
if self._read_parallel_decompress(out, module_gaps, decompress_threads):
return out

reading_view = out.view()
if self._extraneous_dim:
reading_view.shape = out.shape[:2] + (1,) + out.shape[2:]
@@ -1252,8 +1310,13 @@
})

def xarray(self, *, pulses=None, fill_value=None, roi=(), astype=None,
subtrain_index='pulseId', unstack_pulses=False):
arr = self.ndarray(fill_value=fill_value, roi=roi, astype=astype)
subtrain_index='pulseId', unstack_pulses=False, decompress_threads=None):
arr = self.ndarray(
fill_value=fill_value,
roi=roi,
astype=astype,
decompress_threads=decompress_threads,
)
out = self._wrap_xarray(arr, subtrain_index)

if unstack_pulses:
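A note on usage: the fast path only applies to whole-frame reads in the stored dtype. A minimal sketch of the new parameter (the run path and train selection here are hypothetical):

import numpy as np
from extra_data import RunDirectory
from extra_data.components import AGIPD1M

run = RunDirectory('/path/to/proc/run')  # hypothetical run directory
agipd = AGIPD1M(run.select_trains(np.s_[:10]))

# Whole frames, stored dtype: eligible for parallel chunk decompression
mask = agipd['image.mask'].ndarray(decompress_threads=8)

# Passing a roi (or astype) falls back to the regular reading path
corner = agipd['image.mask'].ndarray(roi=np.s_[:128, :128])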
125 changes: 125 additions & 0 deletions extra_data/compression.py
@@ -0,0 +1,125 @@
import threading
from copy import copy
from multiprocessing.pool import ThreadPool

import h5py
import numpy as np
from zlib_into import decompress_into


def filter_ids(dset: h5py.Dataset):
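    # Return the HDF5 filter IDs in pipeline order, e.g.
    # [h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_DEFLATE] for shuffle + gzip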
dcpl = dset.id.get_create_plist()
return [dcpl.get_filter(i)[0] for i in range(dcpl.get_nfilters())]


class DeflateDecompressor:
def __init__(self, deflate_filter_idx=0):
self.deflate_filter_bit = 1 << deflate_filter_idx

@classmethod
def for_dataset(cls, dset: h5py.Dataset):
filters = filter_ids(dset)
if filters == [h5py.h5z.FILTER_DEFLATE]:
return cls()
if dset.dtype.itemsize == 1 and filters == [
h5py.h5z.FILTER_SHUFFLE,
h5py.h5z.FILTER_DEFLATE,
]:
# The shuffle filter doesn't change single byte values, so we can
# skip it.
return cls(deflate_filter_idx=1)

return None

def clone(self):
return copy(self)

def apply_filters(self, data, filter_mask, out):
if filter_mask & self.deflate_filter_bit:
# The deflate filter is skipped, so just copy the data
memoryview(out)[:] = data
else:
decompress_into(data, out)


class ShuffleDeflateDecompressor:
def __init__(self, chunk_shape, dtype):
self.chunk_shape = chunk_shape
self.dtype = dtype
chunk_nbytes = dtype.itemsize
for l in chunk_shape:
chunk_nbytes *= l
# This will hold the decompressed data before shuffling
self.chunk_buf = np.zeros(chunk_nbytes, dtype=np.uint8)
self.shuffled_view = ( # E.g. for int32 data with chunks (10, 5):
self.chunk_buf # (200,) uint8
.reshape((dtype.itemsize, -1)) # (4, 50)
.transpose() # (50, 4)
)
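        # The shuffle filter stores byte 0 of every element, then byte 1,
        # and so on; the transposed view lines each element's bytes back up,
        # so a single strided copy in apply_filters() undoes the shuffle.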
# Check this is still a view on the buffered data
assert self.shuffled_view.base is self.chunk_buf

@classmethod
def for_dataset(cls, dset: h5py.Dataset):
if filter_ids(dset) == [h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_DEFLATE]:
return cls(dset.chunks, dset.dtype)

return None

def clone(self):
return type(self)(self.chunk_shape, self.dtype)

def apply_filters(self, data, filter_mask, out):
if filter_mask & 2:
# The deflate filter is skipped
memoryview(self.chunk_buf)[:] = data
else:
decompress_into(data, self.chunk_buf)

if filter_mask & 1:
# The shuffle filter is skipped
memoryview(out)[:] = self.chunk_buf
else:
# Numpy does the shuffling by copying data between views
out.reshape((-1, 1)).view(np.uint8)[:] = self.shuffled_view


def dataset_decompressor(dset):
for cls in [DeflateDecompressor, ShuffleDeflateDecompressor]:
if (inst := cls.for_dataset(dset)) is not None:
return inst

return None


def multi_dataset_decompressor(dsets):
if not dsets:
return None

chunk = dsets[0].chunks
    dtype = dsets[0].dtype
filters = filter_ids(dsets[0])
for d in dsets[1:]:
if d.chunks != chunk or d.dtype != dtype or filter_ids(d) != filters:
return None # Datasets are not consistent

return dataset_decompressor(dsets[0])


def parallel_decompress_chunks(tasks, decompressor_proto, threads=16):
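    # Each task is (h5py.h5d.DatasetID, chunk coordinate, destination array).
    # Worker threads clone their own decompressor from the prototype, since a
    # decompressor may hold per-thread scratch buffers.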
tlocal = threading.local()

def load_one(dset_id, coord, dest):
try:
decomp = tlocal.decompressor
except AttributeError:
tlocal.decompressor = decomp = decompressor_proto.clone()

if dset_id.get_chunk_info_by_coord(coord).byte_offset is None:
return # Chunk not allocated in file

filter_mask, compdata = dset_id.read_direct_chunk(coord)
decomp.apply_filters(compdata, filter_mask, dest)

with ThreadPool(threads) as pool:
pool.starmap(load_one, tasks)
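Putting the module together: a rough sketch of driving it directly, assuming a hypothetical file data.h5 whose gzip-compressed dataset is chunked one frame per chunk (names invented for illustration):

import h5py
import numpy as np
from extra_data.compression import (
    dataset_decompressor, parallel_decompress_chunks,
)

with h5py.File('data.h5') as f:  # hypothetical file & dataset name
    dset = f['image/mask']  # assumes chunks == (1,) + frame shape
    decomp = dataset_decompressor(dset)  # None if no fast path applies
    if decomp is not None:
        out = np.empty(dset.shape, dset.dtype)
        tasks = [
            (dset.id, (i,) + (0,) * (dset.ndim - 1), out[i])
            for i in range(dset.shape[0])
        ]
        parallel_decompress_chunks(tasks, decomp, threads=8)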
15 changes: 15 additions & 0 deletions extra_data/keydata.py
@@ -165,6 +165,21 @@ def source_file_paths(self):
from pathlib import Path
return [Path(p) for p in paths]

def _without_virtual_overview(self):
if not self.files[0].file[self.hdf5_data_path].is_virtual:
# We're already looking at regular source files
return self

return KeyData(
self.source, self.key,
train_ids=self.train_ids,
files=[FileAccess(p) for p in self.source_file_paths],
section=self.section,
dtype=self.dtype,
eshape=self.entry_shape,
inc_suspect_trains=self.inc_suspect_trains,
)

def _find_attributes(self, dset):
"""Find Karabo attributes belonging to a dataset."""
attrs = dict(dset.attrs)
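For context on the check above: in runs with a virtual overview file, the selected datasets are HDF5 virtual datasets stitched together from the per-module source files, and direct chunk reads have to target those real files instead. A hedged illustration with a hypothetical file and path:

import h5py

with h5py.File('overview.h5') as f:  # hypothetical overview file
    dset = f['INSTRUMENT/source/image/mask']  # hypothetical path
    if dset.is_virtual:
        # Each mapping names the real source file & dataset behind a region
        for vs in dset.virtual_sources():
            print(vs.file_name, vs.dset_name)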
5 changes: 5 additions & 0 deletions extra_data/tests/make_examples.py
@@ -355,6 +355,11 @@ def make_modern_spb_proc_run(dir_path, format_version='1.2'):
legacy_name=f'SPB_DET_AGIPD1M-1/DET/{modno}CH0')
], ntrains=64, chunksize=32, format_version=format_version)

# Ensure one chunk of mask data is actually written
with h5py.File(path, 'r+') as f:
ds = f['INSTRUMENT/SPB_DET_AGIPD1M-1/CORR/15CH0:output/image/mask']
ds[0, 0, 5] = 1


def make_agipd1m_run(
dir_path,
33 changes: 22 additions & 11 deletions extra_data/tests/mockdata/detectors.py
@@ -38,18 +38,22 @@ def write_control(self, f):
def image_keys(self):
if self.raw:
return [
('data', 'u2', self.image_dims),
('length', 'u4', (1,)),
('status', 'u2', (1,)),
('data', 'u2', self.image_dims, {}),
('length', 'u4', (1,), {}),
('status', 'u2', (1,), {}),
]

else:
return [
('data', 'f4', self.image_dims),
('mask', 'u4', self.image_dims),
('gain', 'u1', self.image_dims),
('length', 'u4', (1,)),
('status', 'u2', (1,)),
('data', 'f4', self.image_dims, {}),
('mask', 'u4', self.image_dims, {
'compression': 'gzip', 'compression_opts': 1
}),
('gain', 'u1', self.image_dims, {
'compression': 'gzip', 'compression_opts': 1
}),
('length', 'u4', (1,), {}),
('status', 'u2', (1,), {}),
]

@property
@@ -138,9 +142,16 @@ def write_instrument(self, f):
)

max_len = None if self.raw else nframes
for (key, datatype, dims) in self.image_keys:
f.create_dataset(f'INSTRUMENT/{inst_source}/image/{key}',
(nframes,) + dims, datatype, maxshape=((max_len,) + dims))
for (key, datatype, dims, kw) in self.image_keys:
if dims == self.image_dims and 'chunks' not in kw:
kw['chunks'] = (1,) + dims
f.create_dataset(
f'INSTRUMENT/{inst_source}/image/{key}',
shape=(nframes,) + dims,
dtype=datatype,
maxshape=((max_len,) + dims),
**kw
)


# INSTRUMENT (other parts)
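The mock files now mimic how corrected detector data is chunked and compressed in practice; roughly the equivalent of this h5py call, with invented file name, path and shapes:

import h5py

frame = (512, 128)  # hypothetical module frame shape
with h5py.File('mock.h5', 'w') as f:  # hypothetical file
    f.create_dataset(
        'INSTRUMENT/det/image/mask',  # hypothetical path
        shape=(100,) + frame,
        dtype='u4',
        chunks=(1,) + frame,  # one frame per HDF5 chunk
        compression='gzip', compression_opts=1,
        maxshape=(None,) + frame,
    )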
17 changes: 17 additions & 0 deletions extra_data/tests/test_components.py
@@ -592,6 +592,23 @@ def test_modern_corr_sources(mock_modern_spb_proc_run, mock_spb_raw_run_fmt1):
assert 'image.mask' in agipd_dflt


def test_decompress_threads(mock_modern_spb_proc_run):
run = RunDirectory(mock_modern_spb_proc_run)

agipd = AGIPD1M(run[:1])
# Load
ref = agipd['image.mask'].ndarray(decompress_threads=1)
print(ref.shape)
print(ref[15, 0, :10, :10])
assert ref[15, 0, 0, 0] == 0
assert ref[15, 0, 0, 5] == 1

import h5py
h5py._errors.unsilence_errors()
arr = agipd['image.mask'].ndarray(decompress_threads=16)
np.testing.assert_array_equal(arr, ref)


def test_write_virtual_cxi(mock_spb_proc_run, tmpdir):
run = RunDirectory(mock_spb_proc_run)
det = AGIPD1M(run)