135 changes: 121 additions & 14 deletions extra_data/keydata.py
@@ -5,8 +5,10 @@
from .exceptions import TrainIDError, NoDataError
from .file_access import FileAccess
from .read_machinery import (
    contiguous_regions, DataChunk, select_train_ids, split_trains, roi_shape,
    zeros_shared
)
from .utils import available_cpu_cores

class KeyData:
"""Data for one key in one source
@@ -216,33 +218,128 @@ def as_single_value(self, rtol=1e-5, atol=0.0, reduce_by='median'):

# Getting data as different kinds of array: -------------------------------

    def _read_tasks(self, parts=1):
"""Produce file chunks with corresponding slices of the output array"""
dest_cursor = 0
for part in self.split_trains(parts=parts):
for chunk in part._data_chunks_nonempty:
dest_chunk_end = dest_cursor + chunk.total_count
yield chunk, np.s_[dest_cursor:dest_chunk_end]
dest_cursor = dest_chunk_end

# In worker processes, we need the shared output array accessible as global
# without copying it. Each object launches its own worker processes to
# read data, so global variables in the workers are safe.
_out_arr = None
@classmethod
def _set_out_arr(cls, out):
cls._out_arr = out

@classmethod
def _read_chunk(cls, chunk, dest_slice, roi):
out = cls._out_arr
chunk.dataset.read_direct(
out[dest_slice], source_sel=(chunk.slice,) + roi
)

    @staticmethod
    def _read_direct_frames(chunk):
        # h5py's low-level read_direct_chunk() returns (filter_mask, bytes);
        # [1] keeps the still-compressed bytes of one frame-sized HDF5 chunk.
        return [
            chunk.dataset.id.read_direct_chunk((i, 0, 0, 0))[1]
            for i in range(chunk.first, int(chunk.first) + chunk.total_count)
        ]

def _read_compressed_frames(self, out, roi=(), read_procs=1, decomp_threads=1):
"""Read frame-wise compressed data (mask/gain) for 2D detectors,
decompressing data in parallel"""
from multiprocessing.pool import Pool, ThreadPool
import zlib

if read_procs < 0:
read_procs = 10 # Seems to be a decent guess on GPFS
if decomp_threads < 0:
decomp_threads = available_cpu_cores()

if read_procs == 1:
all_compressed_frames = sum([
self._read_direct_frames(chk) for chk in self._data_chunks_nonempty
], [])
else:
chunks = sum([
p._data_chunks_nonempty for p in self.split_trains(parts=read_procs)
], [])
with Pool(processes=read_procs) as ppool:
chks_compressed = ppool.map(self._read_direct_frames, chunks)
all_compressed_frames = sum(chks_compressed, [])

def decompress_frame(target_ix, compressed):
shuffled = zlib.decompress(compressed)
            # Equivalent to the HDF5 'shuffle' filter: transpose bytes.
            # Reshape to the entry shape rather than a hardcoded (512, 1024),
            # so modules with other shapes (e.g. 512 x 128) work too.
            data = np.ascontiguousarray(
                np.frombuffer(shuffled, dtype=np.uint8).reshape((out.itemsize, -1)).transpose()
            ).view(out.dtype).reshape(self.entry_shape)
            out[target_ix] = data[roi]

with ThreadPool(decomp_threads) as tpool:
tpool.starmap(decompress_frame, enumerate(all_compressed_frames))

return out
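
As an aside on the byte-transpose above: it inverts HDF5's shuffle filter, which regroups array bytes by position before gzip so they compress better. A minimal round-trip sketch with toy NumPy data (illustration only, not part of this change):

import numpy as np

orig = np.arange(6, dtype=np.uint32)

# HDF5 'shuffle' filter: byte 0 of every element first, then byte 1, ...
shuffled = orig.view(np.uint8).reshape((-1, orig.itemsize)).transpose().tobytes()

# Inverse, as in decompress_frame: view as (itemsize, n), transpose back
restored = np.ascontiguousarray(
    np.frombuffer(shuffled, dtype=np.uint8).reshape((orig.itemsize, -1)).transpose()
).view(orig.dtype).reshape(-1)

assert np.array_equal(restored, orig)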

def ndarray(self, roi=(), out=None, read_procs=1, decomp_threads=1):
Review comment (Member): decomp_threads failed for me when loading an AGIPD module with shape (200000, 512, 128): [screenshot of the error]
"""Load this data as a numpy array

*roi* may be a ``numpy.s_[]`` expression to load e.g. only part of each
image from a camera. If *out* is not given, a suitable array will be
allocated.

Passing *read_procs* will use multiple processes to read the data in
parallel; using up to 8-10 processes seems to speed up reading larger
data, if your code is not parallelised at another level. For certain
compressed data (gain & mask information from corrected 2D detectors),
you can decompress it in parallel by passing *decomp_threads*. A value
of -1 will use one thread per CPU core.
"""
if not isinstance(roi, tuple):
roi = (roi,)

req_shape = self.shape[:1] + roi_shape(self.entry_shape, roi)

if out is None:
            if read_procs == 1:
                out = np.empty(req_shape, dtype=self.dtype)
            else:
                out = zeros_shared(req_shape, self.dtype)
elif out is not None and out.shape != req_shape:
raise ValueError(f'requires output array of shape {req_shape}')

        if decomp_threads != 1 and self.ndim >= 3:
            # Frame-wise gzip+shuffle compressed data (e.g. mask/gain from
            # corrected 2D detectors) can take the parallel decompression path
            dset = self.files[0].file[self.hdf5_data_path]
            if (dset.chunks == (1,) + self.entry_shape
                    and dset.compression == 'gzip'
                    and dset.shuffle
                    and not dset.fletcher32
                    and not dset.scaleoffset
                    ):
                return self._read_compressed_frames(out, roi, read_procs, decomp_threads)

# Read the data from each chunk into the result array
if read_procs == 1:
for chunk, dest_slice in self._read_tasks():
chunk.dataset.read_direct(
out[dest_slice], source_sel=(chunk.slice,) + roi
)
else:
from functools import partial
from multiprocessing import Pool
if read_procs < 0:
read_procs = 10 # Seems to be a decent guess on GPFS

with Pool(processes=read_procs, initializer=self._set_out_arr, initargs=(out,)) as pool:
pool.starmap(
partial(self._read_chunk, roi=roi),
self._read_tasks(parts=read_procs)
)

return out
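
For context, a usage sketch for the new parameters; the run path, source and key below are illustrative, not taken from this change:

from extra_data import RunDirectory

run = RunDirectory('/path/to/run')  # illustrative
kd = run['SPB_DET_AGIPD1M-1/CORR/0CH0:output', 'image.mask']  # illustrative

arr = kd.ndarray()  # single-process read, as before

# Read with 8 worker processes; if this key holds frame-wise gzip+shuffle
# compressed data (e.g. corrected detector mask/gain), decompress it with
# one thread per CPU core as well.
arr = kd.ndarray(read_procs=8, decomp_threads=-1)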

@@ -268,7 +365,7 @@ def train_id_coordinates(self):
]
return np.concatenate(chunks_trainids)

    def xarray(self, extra_dims=None, roi=(), name=None, read_procs=1, decomp_threads=1):
"""Load this data as a labelled xarray array or dataset.

The first dimension is labelled with train IDs. Other dimensions
Expand Down Expand Up @@ -298,10 +395,20 @@ def xarray(self, extra_dims=None, roi=(), name=None):
name: str
Name the array itself. The default is the source and key joined
by a dot. Ignored for structured data when a dataset is returned.
read_procs: int
Use this many processes to read data. Using up to 8-10 processes
seems to accelerate data access.
decomp_threads: int
Use this many threads to decompress certain data, or -1 for one
thread per CPU core. This only applies to compressed gain/mask
data from corrected 2D detectors, and will be ignored for other
keys.
"""
import xarray

        ndarr = self.ndarray(
            roi=roi, read_procs=read_procs, decomp_threads=decomp_threads
        )

# Dimension labels
if extra_dims is None:
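
The same options simply pass through from xarray(); continuing the illustrative sketch above:

import numpy as np

# Labelled array, first axis indexed by train ID, read with 8 processes
da = kd.xarray(roi=np.s_[:256], read_procs=8, decomp_threads=-1)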
19 changes: 19 additions & 0 deletions extra_data/read_machinery.py
@@ -271,3 +271,22 @@ def same_run(*args) -> bool:


glob_wildcards_re = re.compile(r'([*?[])')


# Based on _alloc function in pasha
def zeros_shared(shape, dtype):
Review comment (Member): Could we expose this? Or add something like KeyData.allocate_out(shared=True)?

# Allocate shared memory via mmap.
import mmap

n_elements = 1
for _s in shape:
n_elements *= _s

n_bytes = n_elements * np.dtype(dtype).itemsize
n_pages = n_bytes // mmap.PAGESIZE + 1

buf = mmap.mmap(-1, n_pages * mmap.PAGESIZE,
flags=mmap.MAP_SHARED | mmap.MAP_ANONYMOUS,
prot=mmap.PROT_READ | mmap.PROT_WRITE)

return np.frombuffer(memoryview(buf)[:n_bytes], dtype=dtype).reshape(shape)
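
To see why the MAP_SHARED mapping matters: with the default 'fork' start method on Linux, worker processes inherit the buffer, and their writes are visible to the parent without any copying. A standalone toy sketch (illustration only, not part of this change):

import numpy as np
from multiprocessing import Pool

from extra_data.read_machinery import zeros_shared

arr = zeros_shared((4,), np.int64)

def fill(i):
    arr[i] = i * i  # lands in the shared mmap buffer, seen by the parent

if __name__ == '__main__':
    with Pool(2) as pool:
        pool.map(fill, range(4))
    print(arr)  # [0 1 4 9]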
9 changes: 9 additions & 0 deletions extra_data/tests/test_keydata.py
@@ -298,6 +298,15 @@ def test_ndarray_out(mock_spb_raw_run):
assert buf_in is buf_out


def test_ndarray_read_multiproc(mock_spb_raw_run):
f = RunDirectory(mock_spb_raw_run)
cam = f['SPB_IRU_CAM/CAM/SIDEMIC:daqOutput', 'data.image.dims']

arr = cam.ndarray()
arr_p = cam.ndarray(read_procs=4)
np.testing.assert_array_equal(arr_p, arr)


def test_xarray_structured_data(mock_remi_run):
run = RunDirectory(mock_remi_run)
dset = run['SQS_REMI_DLD6/DET/TOP:output', 'rec.hits'].xarray()