From dcd7756abe7412392bafbc2cd33cf822a1199433 Mon Sep 17 00:00:00 2001 From: Yossi Mosbacher Date: Sat, 28 Aug 2021 18:16:10 +0300 Subject: [PATCH 1/4] inclusive split exclusive concat --- strax/chunk.py | 68 +++++++++++++++++++++++++++----- tests/test_general_processing.py | 19 +++++---- 2 files changed, 71 insertions(+), 16 deletions(-) diff --git a/strax/chunk.py b/strax/chunk.py index bb6bf84a3..52146763f 100644 --- a/strax/chunk.py +++ b/strax/chunk.py @@ -38,7 +38,8 @@ def __init__(self, end, data, subruns=None, - target_size_mb=default_chunk_size_mb): + target_size_mb=default_chunk_size_mb, + strict_bounds=True): self.data_type = data_type self.data_kind = data_kind self.dtype = np.dtype(dtype) @@ -72,7 +73,7 @@ def __init__(self, raise ValueError(f"Attempt to create chunk {self} " f"with negative length") - if len(self.data): + if len(self.data)and strict_bounds: data_starts_at = self.data[0]['time'] # Check the last 500 samples (arbitrary number) as sanity check data_ends_at = strax.endtime(self.data[-500:]).max() @@ -151,16 +152,25 @@ def _mbs(self): def split(self, t: ty.Union[int, None], - allow_early_split=False): + allow_early_split=False, + allow_overlap=False): """Return (chunk_left, chunk_right) split at time t. :param t: Time at which to split the data. - All data in the left chunk will have their (exclusive) end <= t, - all data in the right chunk will have (inclusive) start >=t. + :param allow_early_split: If False, raise CannotSplit if the requirements above cannot be met. If True, split at the closest possible time before t. + :param allow_overlap: + Whether to allow the split chunks to overlap. + if True, data will be included in a given interval (before/after t) + based on whether it overlaps the interval or not. + if False data will be included based on containment within a given interval: + All data in the left chunk will have their (exclusive) end <= t, + all data in the right chunk will have (inclusive) start >=t. + """ + t = max(min(t, self.end), self.start) if t == self.end: data1, data2 = self.data, self.data[:0] @@ -170,7 +180,8 @@ def split(self, data1, data2, t = split_array( data=self.data, t=t, - allow_early_split=allow_early_split) + allow_early_split=allow_early_split, + allow_overlap=allow_overlap) common_kwargs = dict( run_id=self.run_id, @@ -190,6 +201,37 @@ def split(self, data=data2, **common_kwargs) return c1, c2 + + def split_inclusive(self, t: int): + """Return (chunk_left, chunk_right) split at time t. + + :param t: Time at which to split the data. + All data in the left chunk will have their start <= t, + all data in the right chunk will have their end >=t. + This results in overlapping intervals being included in both chunks. + """ + data1 = self.data[self.data['time']<=t] + data2 = self.data[strax.endtime(self.data)>=t] + + common_kwargs = dict( + run_id=self.run_id, + dtype=self.dtype, + data_type=self.data_type, + data_kind=self.data_kind, + target_size_mb=self.target_size_mb, + strict_bounds=False) + + c1 = strax.Chunk( + start=self.start, + end=max(self.start, t), + data=data1, + **common_kwargs) + c2 = strax.Chunk( + start=max(self.start, t), + end=max(t, self.end), + data=data2, + **common_kwargs) + return c1, c2 @classmethod def merge(cls, chunks, data_type=''): @@ -247,7 +289,7 @@ def merge(cls, chunks, data_type=''): target_size_mb=max([c.target_size_mb for c in chunks])) @classmethod - def concatenate(cls, chunks): + def concatenate(cls, chunks, trim_overlap=False): """Create chunk by concatenating chunks of same data type You can pass None's, they will be ignored """ @@ -272,6 +314,9 @@ def concatenate(cls, chunks): run_id = run_ids[0] subruns = _update_subruns_in_chunk(chunks) + if trim_overlap: + for c in chunks: + c.data = c.data[c.data['time']>=c.start] prev_end = 0 for c in chunks: @@ -292,7 +337,6 @@ def concatenate(cls, chunks): data=np.concatenate([c.data for c in chunks]), target_size_mb=max([c.target_size_mb for c in chunks])) - @export def continuity_check(chunk_iter): """Check continuity of chunks yielded by chunk_iter as they are yielded""" @@ -329,7 +373,7 @@ class CannotSplit(Exception): @export @numba.njit(cache=True, nogil=True) -def split_array(data, t, allow_early_split=False): +def split_array(data, t, allow_early_split=False, allow_overlap=False): """Return (data left of t, data right of t, t), or raise CannotSplit if that would split a data element in two. @@ -348,6 +392,12 @@ def split_array(data, t, allow_early_split=False): if data[0]['time'] >= t: return data[:0], data, t + # Overlaps allowed, split is trivial. + # All data starting before t go in first part + # all data ending after t goes in second part + if allow_overlap: + return data[data['time']<=t], data[strax.endtime(data)>=t], t + # Find: # i_first_beyond: the first element starting after t # splittable_i: nearest index left of t where we can safely split BEFORE diff --git a/tests/test_general_processing.py b/tests/test_general_processing.py index 726bcc790..00d9c887c 100644 --- a/tests/test_general_processing.py +++ b/tests/test_general_processing.py @@ -169,17 +169,18 @@ def test_split(things, split_indices): @hypothesis.settings(deadline=None) @hypothesis.given(strax.testutils.several_fake_records, hypothesis.strategies.integers(0, 50), + hypothesis.strategies.booleans(), hypothesis.strategies.booleans()) -def test_split_array(data, t, allow_early_split): +def test_split_array(data, t, allow_early_split, allow_overlap): print(f"\nCalled with {np.transpose([data['time'], strax.endtime(data)]).tolist()}, " f"{t}, {allow_early_split}") try: data1, data2, tsplit = strax.split_array( - data, t, allow_early_split=allow_early_split) + data, t, allow_early_split=allow_early_split, allow_overlap=allow_overlap) except strax.CannotSplit: - assert not allow_early_split + assert not allow_early_split and not allow_overlap # There must be data straddling t for d in data: if d['time'] < t < strax.endtime(d): @@ -191,10 +192,14 @@ def test_split_array(data, t, allow_early_split): if allow_early_split: assert tsplit <= t t = tsplit - - assert len(data1) + len(data2) == len(data) - assert np.all(strax.endtime(data1) <= t) - assert np.all(data2['time'] >= t) + if allow_overlap: + assert tsplit == t + assert np.all(strax.endtime(data2) >= t) + assert np.all(data1['time'] <= t) + else: + assert len(data1) + len(data2) == len(data) + assert np.all(strax.endtime(data1) <= t) + assert np.all(data2['time'] >= t) @hypothesis.settings(deadline=None) From 4f221aeba69dbd7092d89ffe662942da24b3fabb Mon Sep 17 00:00:00 2001 From: Yossi Mosbacher Date: Sat, 28 Aug 2021 18:19:06 +0300 Subject: [PATCH 2/4] fix typo --- strax/chunk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/chunk.py b/strax/chunk.py index 52146763f..f5b4ad792 100644 --- a/strax/chunk.py +++ b/strax/chunk.py @@ -73,7 +73,7 @@ def __init__(self, raise ValueError(f"Attempt to create chunk {self} " f"with negative length") - if len(self.data)and strict_bounds: + if len(self.data) and strict_bounds: data_starts_at = self.data[0]['time'] # Check the last 500 samples (arbitrary number) as sanity check data_ends_at = strax.endtime(self.data[-500:]).max() From cc5e36137fcb478a77c9f747e38950f44f0c3318 Mon Sep 17 00:00:00 2001 From: Yossi Mosbacher Date: Sat, 28 Aug 2021 18:19:55 +0300 Subject: [PATCH 3/4] remove redundent method --- strax/chunk.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/strax/chunk.py b/strax/chunk.py index f5b4ad792..58f2e1fc3 100644 --- a/strax/chunk.py +++ b/strax/chunk.py @@ -202,37 +202,6 @@ def split(self, **common_kwargs) return c1, c2 - def split_inclusive(self, t: int): - """Return (chunk_left, chunk_right) split at time t. - - :param t: Time at which to split the data. - All data in the left chunk will have their start <= t, - all data in the right chunk will have their end >=t. - This results in overlapping intervals being included in both chunks. - """ - data1 = self.data[self.data['time']<=t] - data2 = self.data[strax.endtime(self.data)>=t] - - common_kwargs = dict( - run_id=self.run_id, - dtype=self.dtype, - data_type=self.data_type, - data_kind=self.data_kind, - target_size_mb=self.target_size_mb, - strict_bounds=False) - - c1 = strax.Chunk( - start=self.start, - end=max(self.start, t), - data=data1, - **common_kwargs) - c2 = strax.Chunk( - start=max(self.start, t), - end=max(t, self.end), - data=data2, - **common_kwargs) - return c1, c2 - @classmethod def merge(cls, chunks, data_type=''): """Create chunk by merging columns of chunks of same data kind From 4914fa5b57fc795ba8089674310b37908d94fad4 Mon Sep 17 00:00:00 2001 From: Yossi Mosbacher Date: Sat, 28 Aug 2021 18:27:52 +0300 Subject: [PATCH 4/4] auto trim chunks on concat --- strax/chunk.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/strax/chunk.py b/strax/chunk.py index 58f2e1fc3..d192ace1a 100644 --- a/strax/chunk.py +++ b/strax/chunk.py @@ -47,6 +47,7 @@ def __init__(self, self.start = start self.end = end self.subruns = subruns + self.strict_bounds = strict_bounds if data is None: data = np.empty(0, dtype) self.data = data @@ -188,7 +189,8 @@ def split(self, dtype=self.dtype, data_type=self.data_type, data_kind=self.data_kind, - target_size_mb=self.target_size_mb) + target_size_mb=self.target_size_mb, + strict_bounds=not allow_overlap) c1 = strax.Chunk( start=self.start, @@ -258,7 +260,7 @@ def merge(cls, chunks, data_type=''): target_size_mb=max([c.target_size_mb for c in chunks])) @classmethod - def concatenate(cls, chunks, trim_overlap=False): + def concatenate(cls, chunks): """Create chunk by concatenating chunks of same data type You can pass None's, they will be ignored """ @@ -283,8 +285,8 @@ def concatenate(cls, chunks, trim_overlap=False): run_id = run_ids[0] subruns = _update_subruns_in_chunk(chunks) - if trim_overlap: - for c in chunks: + for c in chunks: + if not c.strict_bounds: c.data = c.data[c.data['time']>=c.start] prev_end = 0 @@ -304,7 +306,8 @@ def concatenate(cls, chunks, trim_overlap=False): run_id=run_id, subruns=subruns, data=np.concatenate([c.data for c in chunks]), - target_size_mb=max([c.target_size_mb for c in chunks])) + target_size_mb=max([c.target_size_mb for c in chunks]), + strict_bounds=all([c.strict_bounds for c in chunks])) @export def continuity_check(chunk_iter):