diff --git a/.coveragerc b/.coveragerc
index 0eeff90f..4c099339 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -1,6 +1,6 @@
 [run]
 omit = */tests/*
-concurrency = multiprocessing
+concurrency = multiprocessing,thread
 
 [paths]
 source =
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 64a36b61..52b4a437 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -40,8 +40,9 @@ jobs:
           MPLBACKEND: agg
 
       - name: Upload coverage to Codecov
-        # This specific version uses Node 20 - see codecov-action#1293
-        uses: codecov/codecov-action@v3.1.5
+        uses: codecov/codecov-action@v5
+        with:
+          token: ${{ secrets.CODECOV_TOKEN }}
 
   publish:
     runs-on: ubuntu-latest
diff --git a/.readthedocs.yml b/.readthedocs.yml
index 7c23d3fa..38390cce 100644
--- a/.readthedocs.yml
+++ b/.readthedocs.yml
@@ -1,7 +1,7 @@
 version: 2  # Required
 
 build:
-  os: ubuntu-20.04
+  os: ubuntu-24.04
   tools:
     python: "3.12"
 
diff --git a/extra_data/components.py b/extra_data/components.py
index 82c95989..c15ac9a7 100644
--- a/extra_data/components.py
+++ b/extra_data/components.py
@@ -13,6 +13,7 @@ from .exceptions import SourceNameError
 from .reader import DataCollection, by_id, by_index
 from .read_machinery import DataChunk, roi_shape, split_trains
+from .utils import default_num_threads
 from .writer import FileWriter
 from .write_cxi import XtdfCXIWriter, JUNGFRAUCXIWriter
 
@@ -1216,7 +1217,56 @@ def _read_chunk(self, chunk: DataChunk, mod_out, roi):
             axis=0, out=mod_out[tgt_pulse_sel]
         )
 
-    def ndarray(self, *, fill_value=None, out=None, roi=(), astype=None, module_gaps=False):
+    def _read_parallel_decompress(self, out, module_gaps, threads=16):
+        from .compression import multi_dataset_decompressor, parallel_decompress_chunks
+
+        modno_to_keydata_no_virtual = {}
+        all_datasets = []
+        for (m, vkd) in self.modno_to_keydata.items():
+            modno_to_keydata_no_virtual[m] = kd = vkd._without_virtual_overview()
+            all_datasets.extend([f.file[kd.hdf5_data_path] for f in kd.files])
+
+        if any(d.chunks != (1,) + d.shape[1:] for d in all_datasets):
+            return False  # Chunking not as we expect
+
+        decomp_proto = multi_dataset_decompressor(all_datasets)
+        if decomp_proto is None:
+            return False  # No suitable fast decompression path
+
+        load_tasks = []
+        for i, (modno, kd) in enumerate(sorted(modno_to_keydata_no_virtual.items())):
+            mod_ix = (modno - self.det._modnos_start_at) if module_gaps else i
+            # 'chunk' in the lines below means a range of consecutive indices
+            # in one HDF5 dataset, as elsewhere in EXtra-data.
+            # We use this to build a list of HDF5 chunks (1 frame per chunk)
+            # to be loaded & decompressed. Sorry about that.
+            for chunk in kd._data_chunks:
+                dset = chunk.dataset
+
+                for tgt_slice, chunk_slice in self.det._split_align_chunk(
+                        chunk, self.det.train_ids_perframe,
+                ):
+                    inc_pulses_chunk = self._sel_frames[tgt_slice]
+                    if inc_pulses_chunk.sum() == 0:  # No data from this chunk selected
+                        continue
+
+                    dataset_ixs = np.nonzero(inc_pulses_chunk)[0] + chunk_slice.start
+
+                    # Where does this data go in the target array?
+                    tgt_start_ix = self._sel_frames[:tgt_slice.start].sum()
+
+                    # Each task is a h5py.h5d.DatasetID, coordinates & array destination
+                    load_tasks.extend([
+                        (dset.id, (ds_ix, 0, 0), out[mod_ix, tgt_start_ix + i])
+                        for i, ds_ix in enumerate(dataset_ixs)]
+                    )
+
+        parallel_decompress_chunks(load_tasks, decomp_proto, threads=threads)
+
+        return True
+
+    def ndarray(self, *, fill_value=None, out=None, roi=(), astype=None,
+                module_gaps=False, decompress_threads=None):
         """Get an array of per-pulse data (image.*) for xtdf detector"""
         out_shape = self.buffer_shape(module_gaps=module_gaps, roi=roi)
 
@@ -1226,6 +1276,14 @@ def ndarray(self, *, fill_value=None, out=None, roi=(), astype=None, module_gaps
         elif out.shape != out_shape:
             raise ValueError(f'requires output array of shape {out_shape}')
 
+        if roi == () and astype is None:
+            if decompress_threads is None:
+                decompress_threads = default_num_threads(fixed_limit=16)
+
+            if decompress_threads > 1:
+                if self._read_parallel_decompress(out, module_gaps, decompress_threads):
+                    return out
+
         reading_view = out.view()
         if self._extraneous_dim:
             reading_view.shape = out.shape[:2] + (1,) + out.shape[2:]
@@ -1252,8 +1310,13 @@ def _wrap_xarray(self, arr, subtrain_index='pulseId'):
         })
 
     def xarray(self, *, pulses=None, fill_value=None, roi=(), astype=None,
-               subtrain_index='pulseId', unstack_pulses=False):
-        arr = self.ndarray(fill_value=fill_value, roi=roi, astype=astype)
+               subtrain_index='pulseId', unstack_pulses=False, decompress_threads=None):
+        arr = self.ndarray(
+            fill_value=fill_value,
+            roi=roi,
+            astype=astype,
+            decompress_threads=decompress_threads,
+        )
         out = self._wrap_xarray(arr, subtrain_index)
 
         if unstack_pulses:
diff --git a/extra_data/compression.py b/extra_data/compression.py
new file mode 100644
index 00000000..45c7b424
--- /dev/null
+++ b/extra_data/compression.py
@@ -0,0 +1,125 @@
+import threading
+from copy import copy
+from multiprocessing.pool import ThreadPool
+
+import h5py
+import numpy as np
+from zlib_into import decompress_into
+
+
+def filter_ids(dset: h5py.Dataset):
+    dcpl = dset.id.get_create_plist()
+    return [dcpl.get_filter(i)[0] for i in range(dcpl.get_nfilters())]
+
+
+class DeflateDecompressor:
+    def __init__(self, deflate_filter_idx=0):
+        self.deflate_filter_bit = 1 << deflate_filter_idx
+
+    @classmethod
+    def for_dataset(cls, dset: h5py.Dataset):
+        filters = filter_ids(dset)
+        if filters == [h5py.h5z.FILTER_DEFLATE]:
+            return cls()
+        if dset.dtype.itemsize == 1 and filters == [
+            h5py.h5z.FILTER_SHUFFLE,
+            h5py.h5z.FILTER_DEFLATE,
+        ]:
+            # The shuffle filter doesn't change single byte values, so we can
+            # skip it.
+            return cls(deflate_filter_idx=1)
+
+        return None
+
+    def clone(self):
+        return copy(self)
+
+    def apply_filters(self, data, filter_mask, out):
+        if filter_mask & self.deflate_filter_bit:
+            # The deflate filter is skipped, so just copy the data
+            memoryview(out)[:] = data
+        else:
+            decompress_into(data, out)
+
+
+class ShuffleDeflateDecompressor:
+    def __init__(self, chunk_shape, dtype):
+        self.chunk_shape = chunk_shape
+        self.dtype = dtype
+        chunk_nbytes = dtype.itemsize
+        for l in chunk_shape:
+            chunk_nbytes *= l
+        # This will hold the decompressed data before shuffling
+        self.chunk_buf = np.zeros(chunk_nbytes, dtype=np.uint8)
+        self.shuffled_view = (      # E.g. for int32 data with chunks (10, 5):
+            self.chunk_buf          # (200,) uint8
+            .reshape((dtype.itemsize, -1))  # (4, 50)
+            .transpose()            # (50, 4)
+        )
+        # Check this is still a view on the buffered data
+        assert self.shuffled_view.base is self.chunk_buf
+
+    @classmethod
+    def for_dataset(cls, dset: h5py.Dataset):
+        if filter_ids(dset) == [h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_DEFLATE]:
+            return cls(dset.chunks, dset.dtype)
+
+        return None
+
+    def clone(self):
+        return type(self)(self.chunk_shape, self.dtype)
+
+    def apply_filters(self, data, filter_mask, out):
+        if filter_mask & 2:
+            # The deflate filter is skipped
+            memoryview(self.chunk_buf)[:] = data
+        else:
+            decompress_into(data, self.chunk_buf)
+
+        if filter_mask & 1:
+            # The shuffle filter is skipped
+            memoryview(out)[:] = self.chunk_buf
+        else:
+            # Numpy does the shuffling by copying data between views
+            out.reshape((-1, 1)).view(np.uint8)[:] = self.shuffled_view
+
+
+def dataset_decompressor(dset):
+    for cls in [DeflateDecompressor, ShuffleDeflateDecompressor]:
+        if (inst := cls.for_dataset(dset)) is not None:
+            return inst
+
+    return None
+
+
+def multi_dataset_decompressor(dsets):
+    if not dsets:
+        return None
+
+    chunk = dsets[0].chunks
+    dtype = dsets[0].dtype
+    filters = filter_ids(dsets[0])
+    for d in dsets[1:]:
+        if d.chunks != chunk or d.dtype != dtype or filter_ids(d) != filters:
+            return None  # Datasets are not consistent
+
+    return dataset_decompressor(dsets[0])
+
+
+def parallel_decompress_chunks(tasks, decompressor_proto, threads=16):
+    tlocal = threading.local()
+
+    def load_one(dset_id, coord, dest):
+        try:
+            decomp = tlocal.decompressor
+        except AttributeError:
+            tlocal.decompressor = decomp = decompressor_proto.clone()
+
+        if dset_id.get_chunk_info_by_coord(coord).byte_offset is None:
+            return  # Chunk not allocated in file
+
+        filter_mask, compdata = dset_id.read_direct_chunk(coord)
+        decomp.apply_filters(compdata, filter_mask, dest)
+
+    with ThreadPool(threads) as pool:
+        pool.starmap(load_one, tasks)
diff --git a/extra_data/keydata.py b/extra_data/keydata.py
index 0060f254..b2d45ac7 100644
--- a/extra_data/keydata.py
+++ b/extra_data/keydata.py
@@ -165,6 +165,21 @@ def source_file_paths(self):
         from pathlib import Path
         return [Path(p) for p in paths]
 
+    def _without_virtual_overview(self):
+        if not self.files[0].file[self.hdf5_data_path].is_virtual:
+            # We're already looking at regular source files
+            return self
+
+        return KeyData(
+            self.source, self.key,
+            train_ids=self.train_ids,
+            files=[FileAccess(p) for p in self.source_file_paths],
+            section=self.section,
+            dtype=self.dtype,
+            eshape=self.entry_shape,
+            inc_suspect_trains=self.inc_suspect_trains,
+        )
+
     def _find_attributes(self, dset):
         """Find Karabo attributes belonging to a dataset."""
         attrs = dict(dset.attrs)
diff --git a/extra_data/tests/make_examples.py b/extra_data/tests/make_examples.py
index cd04082d..949bb19c 100644
--- a/extra_data/tests/make_examples.py
+++ b/extra_data/tests/make_examples.py
@@ -355,6 +355,11 @@ def make_modern_spb_proc_run(dir_path, format_version='1.2'):
             legacy_name=f'SPB_DET_AGIPD1M-1/DET/{modno}CH0')
     ], ntrains=64, chunksize=32, format_version=format_version)
 
+    # Ensure one chunk of mask data is actually written
+    with h5py.File(path, 'r+') as f:
+        ds = f['INSTRUMENT/SPB_DET_AGIPD1M-1/CORR/15CH0:output/image/mask']
+        ds[0, 0, 5] = 1
+
 
 def make_agipd1m_run(
     dir_path,
diff --git a/extra_data/tests/mockdata/detectors.py b/extra_data/tests/mockdata/detectors.py
index e48a62d2..b7a881a4 100644
--- a/extra_data/tests/mockdata/detectors.py
+++ b/extra_data/tests/mockdata/detectors.py
@@ -38,18 +38,22 @@ def write_control(self, f):
     def image_keys(self):
         if self.raw:
             return [
-                ('data', 'u2', self.image_dims),
-                ('length', 'u4', (1,)),
-                ('status', 'u2', (1,)),
+                ('data', 'u2', self.image_dims, {}),
+                ('length', 'u4', (1,), {}),
+                ('status', 'u2', (1,), {}),
             ]
         else:
             return [
-                ('data', 'f4', self.image_dims),
-                ('mask', 'u4', self.image_dims),
-                ('gain', 'u1', self.image_dims),
-                ('length', 'u4', (1,)),
-                ('status', 'u2', (1,)),
+                ('data', 'f4', self.image_dims, {}),
+                ('mask', 'u4', self.image_dims, {
+                    'compression': 'gzip', 'compression_opts': 1
+                }),
+                ('gain', 'u1', self.image_dims, {
+                    'compression': 'gzip', 'compression_opts': 1
+                }),
+                ('length', 'u4', (1,), {}),
+                ('status', 'u2', (1,), {}),
             ]
 
     @property
@@ -138,9 +142,16 @@ def write_instrument(self, f):
         )
 
         max_len = None if self.raw else nframes
-        for (key, datatype, dims) in self.image_keys:
-            f.create_dataset(f'INSTRUMENT/{inst_source}/image/{key}',
-                             (nframes,) + dims, datatype, maxshape=((max_len,) + dims))
+        for (key, datatype, dims, kw) in self.image_keys:
+            if dims == self.image_dims and 'chunks' not in kw:
+                kw['chunks'] = (1,) + dims
+            f.create_dataset(
+                f'INSTRUMENT/{inst_source}/image/{key}',
+                shape=(nframes,) + dims,
+                dtype=datatype,
+                maxshape=((max_len,) + dims),
+                **kw
+            )
 
         # INSTRUMENT (other parts)
diff --git a/extra_data/tests/test_components.py b/extra_data/tests/test_components.py
index 8f750d91..054a68ae 100644
--- a/extra_data/tests/test_components.py
+++ b/extra_data/tests/test_components.py
@@ -592,6 +592,20 @@ def test_modern_corr_sources(mock_modern_spb_proc_run, mock_spb_raw_run_fmt1):
     assert 'image.mask' in agipd_dflt
 
 
+def test_decompress_threads(mock_modern_spb_proc_run):
+    run = RunDirectory(mock_modern_spb_proc_run)
+
+    agipd = AGIPD1M(run[:1])
+    # Load a reference copy without the threaded decompression path
+    ref = agipd['image.mask'].ndarray(decompress_threads=1)
+    assert ref[15, 0, 0, 0] == 0
+    assert ref[15, 0, 0, 5] == 1
+
+    # The threaded decompression path should give identical data
+    arr = agipd['image.mask'].ndarray(decompress_threads=16)
+    np.testing.assert_array_equal(arr, ref)
+
+
 def test_write_virtual_cxi(mock_spb_proc_run, tmpdir):
     run = RunDirectory(mock_spb_proc_run)
     det = AGIPD1M(run)
diff --git a/extra_data/tests/test_compression.py b/extra_data/tests/test_compression.py
new file mode 100644
index 00000000..32e3f7d1
--- /dev/null
+++ b/extra_data/tests/test_compression.py
@@ -0,0 +1,58 @@
+"""Test the decompression machinery"""
+
+import h5py
+import numpy as np
+
+from extra_data.compression import (
+    DeflateDecompressor, ShuffleDeflateDecompressor,
+    dataset_decompressor, multi_dataset_decompressor, filter_ids,
+)
+
+def test_deflate(tmp_path):
+    f = h5py.File(tmp_path / 'test.h5', 'w')
+    # Shuffling single-byte data is a no-op
+    arr = np.arange(200, dtype=np.uint8).reshape(4, 50)
+    ds = f.create_dataset('d', data=arr, chunks=(4, 10), shuffle=True, compression='gzip')
+    assert filter_ids(ds) == [h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_DEFLATE]
+
+    decomp = dataset_decompressor(ds)
+    assert isinstance(decomp, DeflateDecompressor)
+
+    filter_mask, data = ds.id.read_direct_chunk((0, 0))
+    out = np.zeros((4, 10), dtype=np.uint8)
+    decomp.apply_filters(data, filter_mask, out)
+    np.testing.assert_array_equal(out, arr[:, :10])
+
+
+def test_shuffle_deflate(tmp_path):
+    f = h5py.File(tmp_path / 'test.h5', 'w')
+    # Multi-byte data, so the shuffle filter has to be undone when reading
+    arr = np.arange(200, dtype=np.uint32).reshape(4, 50)
+    ds = f.create_dataset('d', data=arr, chunks=(4, 10), shuffle=True, compression='gzip')
+    assert filter_ids(ds) == [h5py.h5z.FILTER_SHUFFLE, h5py.h5z.FILTER_DEFLATE]
+
+    decomp = dataset_decompressor(ds)
+    assert isinstance(decomp, ShuffleDeflateDecompressor)
+
+    filter_mask, data = ds.id.read_direct_chunk((0, 0))
+    out = np.zeros((4, 10), dtype=np.uint32)
+    decomp.apply_filters(data, filter_mask, out)
+    np.testing.assert_array_equal(out, arr[:, :10])
+
+
+def test_multi_dataset_decompressor(tmp_path):
+    f = h5py.File(tmp_path / 'test.h5', 'w')
+    ds1 = f.create_dataset('a', shape=(10, 50), chunks=(1, 50),
+                           compression='gzip', dtype=np.uint32)
+    ds2 = f.create_dataset('b', shape=(20, 50), chunks=(1, 50),
+                           compression='gzip', dtype=np.uint32)
+    ds3 = f.create_dataset('c', shape=(10, 50), chunks=(1, 50),
+                           compression='gzip', dtype=np.uint8)
+
+    # Differing shape is OK
+    assert isinstance(multi_dataset_decompressor([ds1, ds2]), DeflateDecompressor)
+
+    # But dtype needs to match
+    assert multi_dataset_decompressor([ds1, ds3]) is None
+
+    assert multi_dataset_decompressor([]) is None
diff --git a/extra_data/utils.py b/extra_data/utils.py
index 592c08f3..8e25bbc8 100644
--- a/extra_data/utils.py
+++ b/extra_data/utils.py
@@ -23,6 +23,16 @@ def available_cpu_cores():
     return min(ncpu, 8)
 
 
+def default_num_threads(fixed_limit=16):
+    # Use the lowest of fixed_limit, $EXTRA_NUM_THREADS and the available CPU cores
+    threads_limits = [fixed_limit, available_cpu_cores()]
+    try:
+        threads_limits.append(int(os.environ['EXTRA_NUM_THREADS']))
+    except (KeyError, ValueError):  # Not set, or not an integer
+        pass
+    return min(threads_limits)
+
+
 def progress_bar(done, total, suffix=" "):
     line = f"Progress: {done}/{total}{suffix}[{{}}]"
     length = min(get_terminal_size().columns - len(line), 50)
diff --git a/setup.py b/setup.py
index 48ac5362..e33aa0e8 100755
--- a/setup.py
+++ b/setup.py
@@ -58,6 +58,7 @@ def find_version(*parts):
         'pandas',
         'xarray',
         'pyyaml',
+        'zlib_into',
     ],
     extras_require={
         'bridge': [
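
Usage sketch (illustrative, not part of the diff): the new `decompress_threads` keyword on `ndarray()`/`xarray()` and the `EXTRA_NUM_THREADS` environment variable introduced above control the threaded decompression path. Whether that path is actually taken depends on the data being gzip-compressed with one frame per HDF5 chunk; otherwise it silently falls back to the normal read. The run path below is a placeholder.

    import os
    from extra_data import RunDirectory
    from extra_data.components import AGIPD1M

    # Optional: cap the default thread count picked by default_num_threads()
    os.environ['EXTRA_NUM_THREADS'] = '8'

    run = RunDirectory('/gpfs/exfel/exp/SPB/202301/p001234/proc/r0123')  # placeholder path
    agipd = AGIPD1M(run[:10])

    # Explicit value overrides the default; decompress_threads=1 disables the
    # parallel path and uses the ordinary single-threaded read.
    mask = agipd['image.mask'].ndarray(decompress_threads=8)
    data = agipd['image.data'].xarray(decompress_threads=8)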