diff --git a/step_detect.py b/step_detect.py index 9a417b6..e1b1fc7 100644 --- a/step_detect.py +++ b/step_detect.py @@ -188,14 +188,52 @@ def find_steps(array, threshold): List of indices of the detected steps """ - steps = [] - array = np.abs(array) - above_points = np.where(array > threshold, 1, 0) - ap_dif = np.diff(above_points) - cross_ups = np.where(ap_dif == 1)[0] - cross_dns = np.where(ap_dif == -1)[0] - for upi, dni in zip(cross_ups,cross_dns): - steps.append(np.argmax(array[upi:dni]) + upi) + steps = list() + # First create array ap_dif which is 1 where data in series crossed from below threshold to above threshold + # (upward), -1 where it crosses downward, and 0 elsewhere. + array_abs = np.abs(array) + above_points = np.where(array_abs > threshold, 1, 0) + ap_dif = np.diff(above_points) + + # To handle the edge case where data skips from being greater than the threshold on one side (positive or negative) + # to the opposite side without any transition points, manually insert -1 and 1 values into ap_diff so that these + # transitions will be accounted for. + pos_to_neg_diff = np.diff(np.where(array > threshold, 1, 0) + np.where(array < -threshold, -1, 0)) + pos_to_neg_skips = np.where(np.abs(pos_to_neg_diff) == 2)[0] + for ix in pos_to_neg_skips: + if ap_dif[ix - 1] != 1: # only change if there's not an overlapping jump to avoid making ap_dif inconsistent + ap_dif[ix - 1] = -1 + ap_dif[ix] = 1 + + + # Compute list of indices in series where the differences / "first derivative" cross 0. + # This allows us to handle the edge case where multiple step changes occur in the source data while series remains + # above the threshold the entire time (otherwise only one step would be returned for this region). By checking + # the number of zero crossings that lie within the region bounded by the two threshold crossings + # we can iterate through the odd zero crossing indices and return the maximum value of series between each + # subset, each representing a different step event. deriv_zero_crossings_cnt should normally be 1, however if there + # are two step changes while above threshold it should be 3, three step changes -> 5, and so on. + + series_first_deriv_zero_crossings = np.where(np.diff(np.sign(np.diff(array))))[0] + cross_ups = np.where(ap_dif == 1)[0] + cross_dns = np.where(ap_dif == -1)[0] + + # In case the first identified threshold crossing is down instead of an up, add 0 to array of ups to pair it up + if len(cross_ups) > 0 and len(cross_dns) > 0 and cross_ups[0] > cross_dns[0]: + cross_ups = np.insert(cross_ups, 0, 0) + + for upi, dni in zip(cross_ups, cross_dns): + deriv_zero_crossings = series_first_deriv_zero_crossings[ + (series_first_deriv_zero_crossings > upi) & (series_first_deriv_zero_crossings < dni)] + deriv_zero_crossings_cnt = len(deriv_zero_crossings) + if deriv_zero_crossings_cnt % 2 == 1: + for i, cross in enumerate(deriv_zero_crossings): + if i % 2 == 1: + steps.append(np.argmax(array_abs[upi:cross + 1]) + upi) + upi = cross + steps.append(np.argmax(array_abs[upi:cross + 1]) + upi) + else: + steps.append(np.argmax(array_abs[upi:dni+1]) + upi) return steps diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e787b41 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +from . import test_step_detect diff --git a/tests/test_step_detect.py b/tests/test_step_detect.py new file mode 100644 index 0000000..cc242a9 --- /dev/null +++ b/tests/test_step_detect.py @@ -0,0 +1,72 @@ +""" +To run (from root directory): +python -m unittest discover tests +""" +import unittest +import numpy as np +from scipy.ndimage.filters import gaussian_filter1d +import step_detect + + +def step_detect_test_data(amplitude, sd): + """ + Function to generate time series data with defined step changes (from 0 to ``A``) and noise with standard deviation + of ``sd`` (using fixed random seed so that code will be consistent in either failing or passing the unit test + functions). + :param float amplitude: Amplitude of step changes + :param float sd: Standard deviation of the normal distribution that noise is drawn from to add noise + to data (deterministic since using fixed random seed) + :return: tuple of x and y data + :rtype: (numpy.ndarray, numpy.ndarray) + """ + x = np.linspace(0, 500, 1000) + y = np.zeros_like(x) + y[np.logical_and(x > 50, x < 150)] = amplitude + y[np.logical_and(x > 300, x < 350)] = amplitude + y[np.logical_and(x > 357, x < 437.5)] = amplitude + prng = np.random.RandomState(0) + y += prng.normal(scale=sd, size=x.shape) + return y + + +class TestStepDetectFunctions(unittest.TestCase): + def setUp(self): + """ + Method that initializes data before running the tests. + In this case we generate signals with known analytical properties + such that it's easier to identify the correctness of the + functions. + """ + self.jump_magnitude = 1000 + self.series1 = step_detect_test_data(self.jump_magnitude, self.jump_magnitude / 10) + self.series1_filtered = gaussian_filter1d(self.series1.flatten(), 2, order=1) + self.series2 = np.array([1704.70739332, 1687.10112437, 1681.68248344, 1678.11859602, + 1683.25987459, 1676.34517265, 1683.75263377, 1670.12514163, + 1694.67019815, 1675.8585799, 1698.97665797, 1681.66520782, + 1700.68003068, 1671.82624614, 1666.37968373, 3211.59480037, + 3303.86229941, 3289.33136964, 3299.01543621, 3286.30224878, + 3292.78710246, 3297.47755896, 2574.27847639, 1665.98144833, + 1682.55691805, 1681.6211562, 1671.7071523, 1670.22268554, + 1656.43500242, 1661.8255372]) + self.series2_filtered = gaussian_filter1d(self.series2.flatten(), 2, order=1) + self.series3 = np.array([1883.87443401, 1888.4688974, 1893.1127929, 1892.13823514, + 1891.87245162, 1882.15734314, 1909.32780929, 1890.95291424, + 1891.69441096, 1895.73540552, 1879.59040827, 2207.4511012, + 3594.28900568, 3539.73771894, 3512.77316641, 3474.76579655, + 3445.91259433, 3449.58999, 3538.07678893, 5071.54845559, + 5023.52196979, 5008.00727191, 4997.53409712, 4973.39290886, + 4944.09166901, 4943.28154164]) + self.series3 = np.tile(self.series3, 2) + self.series3[int(len(self.series3) / 2):] += 4500 + self.series3_filtered = gaussian_filter1d(self.series3.flatten(), 2, order=1) + + def test_find_steps(self): + steps = step_detect.find_steps(self.series1_filtered, 50) + self.assertTrue(len(steps) == 6, "length of jumps for test series1 ({}) != 6".format(len(steps))) + for i, true_ix in enumerate([100, 300, 600, 700, 714, 875]): + self.assertTrue(np.abs(steps[i] - true_ix) < 3, + "jump location {} more than 2 off from truth for test series 1".format(steps[i])) + steps = step_detect.find_steps(self.series2_filtered, 10) + self.assertTrue(len(steps) == 2, "length of jumps for test series2 ({} != 2)".format(len(steps))) + steps = step_detect.find_steps(self.series3_filtered, 10) + self.assertTrue(len(steps) == 5, "incorrect number of jumps for test series 3 ({} != 5)".format(len(steps))) diff --git a/tests/test_step_detect.pyc b/tests/test_step_detect.pyc new file mode 100644 index 0000000..bf19902 Binary files /dev/null and b/tests/test_step_detect.pyc differ