Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions step_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,52 @@ def find_steps(array, threshold):
List of indices of the detected steps

"""
steps = []
array = np.abs(array)
above_points = np.where(array > threshold, 1, 0)
ap_dif = np.diff(above_points)
cross_ups = np.where(ap_dif == 1)[0]
cross_dns = np.where(ap_dif == -1)[0]
for upi, dni in zip(cross_ups,cross_dns):
steps.append(np.argmax(array[upi:dni]) + upi)
steps = list()
# First create array ap_dif which is 1 where data in series crossed from below threshold to above threshold
# (upward), -1 where it crosses downward, and 0 elsewhere.
array_abs = np.abs(array)
above_points = np.where(array_abs > threshold, 1, 0)
ap_dif = np.diff(above_points)

# To handle the edge case where data skips from being greater than the threshold on one side (positive or negative)
# to the opposite side without any transition points, manually insert -1 and 1 values into ap_diff so that these
# transitions will be accounted for.
pos_to_neg_diff = np.diff(np.where(array > threshold, 1, 0) + np.where(array < -threshold, -1, 0))
pos_to_neg_skips = np.where(np.abs(pos_to_neg_diff) == 2)[0]
for ix in pos_to_neg_skips:
if ap_dif[ix - 1] != 1: # only change if there's not an overlapping jump to avoid making ap_dif inconsistent
ap_dif[ix - 1] = -1
ap_dif[ix] = 1


# Compute list of indices in series where the differences / "first derivative" cross 0.
# This allows us to handle the edge case where multiple step changes occur in the source data while series remains
# above the threshold the entire time (otherwise only one step would be returned for this region). By checking
# the number of zero crossings that lie within the region bounded by the two threshold crossings
# we can iterate through the odd zero crossing indices and return the maximum value of series between each
# subset, each representing a different step event. deriv_zero_crossings_cnt should normally be 1, however if there
# are two step changes while above threshold it should be 3, three step changes -> 5, and so on.

series_first_deriv_zero_crossings = np.where(np.diff(np.sign(np.diff(array))))[0]
cross_ups = np.where(ap_dif == 1)[0]
cross_dns = np.where(ap_dif == -1)[0]

# In case the first identified threshold crossing is down instead of an up, add 0 to array of ups to pair it up
if len(cross_ups) > 0 and len(cross_dns) > 0 and cross_ups[0] > cross_dns[0]:
cross_ups = np.insert(cross_ups, 0, 0)

for upi, dni in zip(cross_ups, cross_dns):
deriv_zero_crossings = series_first_deriv_zero_crossings[
(series_first_deriv_zero_crossings > upi) & (series_first_deriv_zero_crossings < dni)]
deriv_zero_crossings_cnt = len(deriv_zero_crossings)
if deriv_zero_crossings_cnt % 2 == 1:
for i, cross in enumerate(deriv_zero_crossings):
if i % 2 == 1:
steps.append(np.argmax(array_abs[upi:cross + 1]) + upi)
upi = cross
steps.append(np.argmax(array_abs[upi:cross + 1]) + upi)
else:
steps.append(np.argmax(array_abs[upi:dni+1]) + upi)
return steps


Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import test_step_detect
72 changes: 72 additions & 0 deletions tests/test_step_detect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
To run (from root directory):
python -m unittest discover tests
"""
import unittest
import numpy as np
from scipy.ndimage.filters import gaussian_filter1d
import step_detect


def step_detect_test_data(amplitude, sd):
"""
Function to generate time series data with defined step changes (from 0 to ``A``) and noise with standard deviation
of ``sd`` (using fixed random seed so that code will be consistent in either failing or passing the unit test
functions).
:param float amplitude: Amplitude of step changes
:param float sd: Standard deviation of the normal distribution that noise is drawn from to add noise
to data (deterministic since using fixed random seed)
:return: tuple of x and y data
:rtype: (numpy.ndarray, numpy.ndarray)
"""
x = np.linspace(0, 500, 1000)
y = np.zeros_like(x)
y[np.logical_and(x > 50, x < 150)] = amplitude
y[np.logical_and(x > 300, x < 350)] = amplitude
y[np.logical_and(x > 357, x < 437.5)] = amplitude
prng = np.random.RandomState(0)
y += prng.normal(scale=sd, size=x.shape)
return y


class TestStepDetectFunctions(unittest.TestCase):
def setUp(self):
"""
Method that initializes data before running the tests.
In this case we generate signals with known analytical properties
such that it's easier to identify the correctness of the
functions.
"""
self.jump_magnitude = 1000
self.series1 = step_detect_test_data(self.jump_magnitude, self.jump_magnitude / 10)
self.series1_filtered = gaussian_filter1d(self.series1.flatten(), 2, order=1)
self.series2 = np.array([1704.70739332, 1687.10112437, 1681.68248344, 1678.11859602,
1683.25987459, 1676.34517265, 1683.75263377, 1670.12514163,
1694.67019815, 1675.8585799, 1698.97665797, 1681.66520782,
1700.68003068, 1671.82624614, 1666.37968373, 3211.59480037,
3303.86229941, 3289.33136964, 3299.01543621, 3286.30224878,
3292.78710246, 3297.47755896, 2574.27847639, 1665.98144833,
1682.55691805, 1681.6211562, 1671.7071523, 1670.22268554,
1656.43500242, 1661.8255372])
self.series2_filtered = gaussian_filter1d(self.series2.flatten(), 2, order=1)
self.series3 = np.array([1883.87443401, 1888.4688974, 1893.1127929, 1892.13823514,
1891.87245162, 1882.15734314, 1909.32780929, 1890.95291424,
1891.69441096, 1895.73540552, 1879.59040827, 2207.4511012,
3594.28900568, 3539.73771894, 3512.77316641, 3474.76579655,
3445.91259433, 3449.58999, 3538.07678893, 5071.54845559,
5023.52196979, 5008.00727191, 4997.53409712, 4973.39290886,
4944.09166901, 4943.28154164])
self.series3 = np.tile(self.series3, 2)
self.series3[int(len(self.series3) / 2):] += 4500
self.series3_filtered = gaussian_filter1d(self.series3.flatten(), 2, order=1)

def test_find_steps(self):
steps = step_detect.find_steps(self.series1_filtered, 50)
self.assertTrue(len(steps) == 6, "length of jumps for test series1 ({}) != 6".format(len(steps)))
for i, true_ix in enumerate([100, 300, 600, 700, 714, 875]):
self.assertTrue(np.abs(steps[i] - true_ix) < 3,
"jump location {} more than 2 off from truth for test series 1".format(steps[i]))
steps = step_detect.find_steps(self.series2_filtered, 10)
self.assertTrue(len(steps) == 2, "length of jumps for test series2 ({} != 2)".format(len(steps)))
steps = step_detect.find_steps(self.series3_filtered, 10)
self.assertTrue(len(steps) == 5, "incorrect number of jumps for test series 3 ({} != 5)".format(len(steps)))
Binary file added tests/test_step_detect.pyc
Binary file not shown.