From a3cbea600bb9fbdd97be3f6dc9090f3df23e6643 Mon Sep 17 00:00:00 2001 From: jonas Date: Thu, 27 Feb 2025 15:28:09 +0100 Subject: [PATCH 01/13] :muscle: add FPCS downsampling algorithm --- downsample_rs/Cargo.toml | 8 +- downsample_rs/benches/bench_fpcs.rs | 92 ++++++ downsample_rs/src/fpcs.rs | 496 ++++++++++++++++++++++++++++ downsample_rs/src/lib.rs | 2 + src/lib.rs | 47 +++ tsdownsample/__init__.py | 4 + tsdownsample/downsamplers.py | 30 ++ 7 files changed, 678 insertions(+), 1 deletion(-) create mode 100644 downsample_rs/benches/bench_fpcs.rs create mode 100644 downsample_rs/src/fpcs.rs diff --git a/downsample_rs/Cargo.toml b/downsample_rs/Cargo.toml index 107469b..0b3b117 100644 --- a/downsample_rs/Cargo.toml +++ b/downsample_rs/Cargo.toml @@ -9,7 +9,9 @@ license = "MIT" [dependencies] # TODO: perhaps use polars? argminmax = { version = "0.6.1", features = ["half"] } -half = { version = "2.3.1", default-features = false , features=["num-traits"], optional = true} +half = { version = "2.3.1", default-features = false, features = [ + "num-traits", +], optional = true } num-traits = { version = "0.2.17", default-features = false } once_cell = "1" rayon = { version = "1.8.0", default-features = false } @@ -35,3 +37,7 @@ harness = false [[bench]] name = "bench_minmaxlttb" harness = false + +[[bench]] +name = "bench_fpcs" +harness = false diff --git a/downsample_rs/benches/bench_fpcs.rs b/downsample_rs/benches/bench_fpcs.rs new file mode 100644 index 0000000..6940c87 --- /dev/null +++ b/downsample_rs/benches/bench_fpcs.rs @@ -0,0 +1,92 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use dev_utils::{config, utils}; +use downsample_rs::fpcs as fpcs_mod; + +fn fpcs_f32_random_array_long(c: &mut Criterion) { + let n = config::ARRAY_LENGTH_LONG; + let y = utils::get_random_array::(n, f32::MIN, f32::MAX); + let x = (0..n).map(|i| i as i32).collect::>(); + + // Non parallel + // --- without x + c.bench_function("fpcs_vanilla_f32", |b| { + b.iter(|| fpcs_mod::vanilla_fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + c.bench_function("fpcs_mm_f32", |b| { + b.iter(|| fpcs_mod::fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_f32_x", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); + + // Parallel + // --- without x + c.bench_function("fpcs_mm_f32_par", |b| { + b.iter(|| fpcs_mod::fpcs_without_x_parallel(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_f32_x_par", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x_parallel( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); +} + +fn fpcs_f32_random_array_50m(c: &mut Criterion) { + let n = 50_000_000; + let y = utils::get_random_array::(n, f32::MIN, f32::MAX); + let x = (0..n).map(|i| i as i32).collect::>(); + + // Non parallel + // --- without x + c.bench_function("fpcs_vanilla_50M_f32", |b| { + b.iter(|| fpcs_mod::vanilla_fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + c.bench_function("fpcs_mm_50M_f32", |b| { + b.iter(|| fpcs_mod::fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_50M_f32_x", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); + // Parallel + // --- without x + c.bench_function("fpcs_mm_50M_f32_par", |b| { + b.iter(|| fpcs_mod::fpcs_without_x_parallel(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_50M_f32_x_par", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x_parallel( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); +} + +criterion_group!( + benches, + // + fpcs_f32_random_array_long, + fpcs_f32_random_array_50m, +); +criterion_main!(benches); diff --git a/downsample_rs/src/fpcs.rs b/downsample_rs/src/fpcs.rs new file mode 100644 index 0000000..0bff663 --- /dev/null +++ b/downsample_rs/src/fpcs.rs @@ -0,0 +1,496 @@ +// use rayon::iter::IndexedParallelIterator; +use rayon::prelude::*; + +use argminmax::{ArgMinMax, NaNArgMinMax}; +use num_traits::{AsPrimitive, FromPrimitive}; + +// use crate::minmax; + +use super::searchsorted::{ + get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, +}; +use super::types::Num; +use super::POOL; + +// ----------------------------------- Helper datastructures ------------------------------------ +struct Point { + x: usize, + y: Ty, +} + +impl Point { + fn update(&mut self, x: usize, y: Ty) { + self.x = x; + self.y = y; + } +} + +#[derive(PartialEq, Eq)] +enum Flag { + Min, + Max, + None, +} + +// ----------------------------------- NON-PARALLEL ------------------------------------ +// ----------- WITH X +macro_rules! fpcs_with_x { + ($func_name:ident, $trait:ident, $f_fpcs:expr) => { + pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + AsPrimitive + FromPrimitive, + Ty: Num + AsPrimitive, + { + let bin_idx_iterator = get_equidistant_bin_idx_iterator(x, n_out - 2); + fpcs_generic(y, bin_idx_iterator, n_out, $f_fpcs) + } + }; +} + +fpcs_with_x!(fpcs_with_x, ArgMinMax, |arr| arr.argminmax()); +fpcs_with_x!(fpcs_with_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); + +// ----------- WITHOUT X +macro_rules! fpcs_without_x { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + fpcs_generic_without_x(arr, n_out, $f_argminmax) + } + }; +} + +fpcs_without_x!(fpcs_without_x, ArgMinMax, |arr| arr.argminmax()); +fpcs_without_x!(fpcs_without_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); + +// ------------------------------------- PARALLEL -------------------------------------- +// ----------- WITH X +macro_rules! fpcs_with_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + AsPrimitive + FromPrimitive + Send + Sync, + Ty: Num + AsPrimitive + Send + Sync, + { + // collect the parrallel iterator to a vector + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out - 2); + fpcs_generic_parallel(y, bin_idx_iterator, n_out, $f_argminmax) + } + }; +} + +fpcs_with_x_parallel!(fpcs_with_x_parallel, ArgMinMax, |arr| arr.argminmax()); +fpcs_with_x_parallel!(fpcs_with_x_parallel_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + +// ----------- WITHOUT X +macro_rules! fpcs_without_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name( + arr: &[Ty], + n_out: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + // Ty: Num + AsPrimitive + Send + Sync, + { + fpcs_generic_without_x_parallel(arr, n_out, $f_argminmax) + } + }; +} + +fpcs_without_x_parallel!(fpcs_without_x_parallel, ArgMinMax, |arr| arr.argminmax()); +fpcs_without_x_parallel!(fpcs_without_x_parallel_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + +// ----------- WITHOUT X + +// ------------------------------------- GENERICS -------------------------------------- +// move the inner loop to a separate function to reduce code duplication +fn fpcs_inner_loop( + y: &[Ty], + min_idx: usize, + max_idx: usize, + min_point: &mut Point, + max_point: &mut Point, + potential_point: &mut Point, + previous_min_flag: &mut Flag, + sampled_indices: &mut Vec, +) { + // only update when the min/max is more extreme than the current min/max + if y[max_idx] > max_point.y { + max_point.update(max_idx, y[max_idx]); + } + if y[min_idx] < min_point.y { + min_point.update(min_idx, y[min_idx]); + } + + // if the min is to the left of the max + if min_point.x < max_point.x { + // if the min was selected in the previos bin + if *previous_min_flag == Flag::Min && min_point.x != potential_point.x { + sampled_indices.push(potential_point.x); + } + sampled_indices.push(min_point.x); + + potential_point.update(max_point.x, max_point.y); + min_point.update(max_point.x, max_point.y); + *previous_min_flag = Flag::Min; + } else { + if *previous_min_flag == Flag::Max && max_point.x != potential_point.x { + sampled_indices.push(potential_point.x as usize); + } + sampled_indices.push(max_point.x as usize); + + potential_point.update(min_point.x, min_point.y); + max_point.update(min_point.x, min_point.y); + *previous_min_flag = Flag::Max; + } +} + +// ----------- WITH X +#[inline(always)] +pub(crate) fn fpcs_generic( + y: &[T], + bin_idx_iterator: impl Iterator>, + n_out: usize, + f_argminmax: fn(&[T]) -> (usize, usize), +) -> Vec { + let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); + + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: y[0] }; + let mut max_point: Point = Point { x: 0, y: y[0] }; + let mut min_point: Point = Point { x: 0, y: y[0] }; + + bin_idx_iterator.for_each(|bin| { + if let Some((start, end)) = bin { + if end <= start + 2 { + // if the bin has <= 2 elements, we just add them all + for i in start..end { + sampled_indices.push(i); + } + } else { + // if the bin has at least two elements, we find the min and max + let (min_idx, max_idx) = f_argminmax(&y[start..end]); + + fpcs_inner_loop( + y, + min_idx, + max_idx, + &mut min_point, + &mut max_point, + &mut potential_point, + &mut previous_min_flag, + &mut sampled_indices, + ); + } + } + }); + // add the last sample + sampled_indices.push(y.len() as usize - 1); + sampled_indices +} + +#[inline(always)] +pub(crate) fn fpcs_generic_parallel( + arr: &[T], + bin_idx_iterator: impl IndexedParallelIterator>>, + n_out: usize, + f_argminmax: fn(&[T]) -> (usize, usize), +) -> Vec { + // Pre-allocate the vector with the right capacity - this will store min-max + // from each bin to be used in the FPCS algorithm + let mut minmax_idxs: Vec = Vec::with_capacity((n_out - 2) * 2); + + POOL.install(|| { + // Process bins in parallel to find min-max index pairs from each bin + // Each result contains (bin_position, [min_idx, max_idx]) where: + // - bin_position ensures results maintain correct order when collected + // - min_idx is the index of minimum value in the bin + // - max_idx is the index of maximum value in the bin + let results: Vec<(usize, [usize; 2])> = bin_idx_iterator + .enumerate() + .flat_map(|(bin_index, bin_idx_iterator)| { + bin_idx_iterator + .filter_map(|bin| { + bin.and_then(|(start, end)| { + match end - start { + 0 => None, // Empty bin + 1 => { + // Single element - add it to the result + Some((bin_index * 2, [start, start])) + } + 2 => { + // Two elements - determine min and max + let (min_idx, max_idx) = if arr[start] <= arr[start + 1] { + (start, start + 1) + } else { + (start + 1, start) + }; + Some((bin_index * 2, [min_idx, max_idx])) + } + _ => { + // Larger bins - find min and max + let (min_idx, max_idx) = f_argminmax(&arr[start..end]); + Some((bin_index * 2, [min_idx + start, max_idx + start])) + } + } + }) + }) + .collect::>() + }) + .collect(); + + // Only process results if we have any valid bins + if !results.is_empty() { + // Ensure we have enough capacity to avoid reallocations + minmax_idxs.reserve(results.len() * 2); + + // Populate minmax_idxs with alternating min-max indices + // These will be processed in pairs by fpcs_inner_loop + for (_, [min_idx, max_idx]) in results { + minmax_idxs.push(min_idx); + minmax_idxs.push(max_idx); + } + } + }); + + let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); + + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: arr[0] }; + let mut max_point: Point = Point { x: 0, y: arr[0] }; + let mut min_point: Point = Point { x: 0, y: arr[0] }; + + // Prepend first and last point + sampled_indices.push(0); + + // Process the min/max pairs + for chunk in minmax_idxs.chunks(2) { + if chunk.len() == 2 { + let min_idx = chunk[0]; + let max_idx = chunk[1]; + + fpcs_inner_loop( + arr, + min_idx, + max_idx, + &mut min_point, + &mut max_point, + &mut potential_point, + &mut previous_min_flag, + &mut sampled_indices, + ); + } else if chunk.len() == 1 { + // Handle any remaining single element + sampled_indices.push(chunk[0]); + } + } + + // add the last sample + sampled_indices.push(arr.len() as usize - 1); + sampled_indices +} + +// ----------- WITHOUT X +#[inline(always)] +pub(crate) fn fpcs_generic_without_x( + y: &[Ty], + n_out: usize, // handle n_out behavior on a higher level + f_argminmax: fn(&[Ty]) -> (usize, usize), +) -> Vec { + if n_out >= y.len() { + return (0..y.len()).collect::>(); + } + assert!(n_out >= 3); // avoid division by 0 + // let f_argminmax = y.argminmax(); + + let block_size: f64 = y.len() as f64 / (n_out - 2) as f64; + + let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); + + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: y[0] }; + let mut max_point: Point = Point { x: 0, y: y[0] }; + let mut min_point: Point = Point { x: 0, y: y[0] }; + + let mut lower: usize = 1; + let mut upper: usize = (block_size as usize) + 1; + while upper < y.len() { + let (mut min_idx, mut max_idx) = f_argminmax(&y[lower..upper]); + min_idx += lower; + max_idx += lower; + + fpcs_inner_loop( + y, + min_idx, + max_idx, + &mut min_point, + &mut max_point, + &mut potential_point, + &mut previous_min_flag, + &mut sampled_indices, + ); + + lower = upper; + upper = (upper as f64 + block_size) as usize; + } + + // add the last sample + sampled_indices.push(y.len() as usize - 1); + sampled_indices +} + +#[inline(always)] +pub(crate) fn fpcs_generic_without_x_parallel( + arr: &[T], + n_out: usize, + f_argminmax: fn(&[T]) -> (usize, usize), +) -> Vec { + // --------- 1. parallelize the min-max search + // arr.len() -1 is used to match the delta of a range-index (0..arr.len()-1) + let block_size: f64 = (arr.len() - 1) as f64 / (n_out - 2) as f64; + + // let mut minmax_idxs: Vec = Vec::with_capacity((n_out - 2) * 2); + let mut minmax_idxs: Vec = vec![0; (n_out - 2) * 2]; + + POOL.install(|| { + minmax_idxs + .par_chunks_exact_mut(2) + .for_each(|min_max_index_chunk| { + let i: f64 = unsafe { *min_max_index_chunk.get_unchecked(0) >> 1 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; + + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); + + // Add the indexes in min-max order + min_max_index_chunk[0] = min_index + start_idx; + min_max_index_chunk[1] = max_index + start_idx; + }) + }); + + let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); + + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: arr[0] }; + let mut max_point: Point = Point { x: 0, y: arr[0] }; + let mut min_point: Point = Point { x: 0, y: arr[0] }; + + // Prepend first and last point + sampled_indices.push(0); + + // Process the min/max pairs + for chunk in minmax_idxs.chunks(2) { + if chunk.len() == 2 { + let min_idx = chunk[0]; + let max_idx = chunk[1]; + + fpcs_inner_loop( + arr, + min_idx, + max_idx, + &mut min_point, + &mut max_point, + &mut potential_point, + &mut previous_min_flag, + &mut sampled_indices, + ); + } else if chunk.len() == 1 { + // Handle any remaining single element + sampled_indices.push(chunk[0]); + } + } + + // add the last sample + sampled_indices.push(arr.len() as usize - 1); + sampled_indices +} + +pub fn vanilla_fpcs_without_x( + y: &[Ty], + n_out: usize, // NOTE: there will be between [n_out, 2*n_out] output points +) -> Vec { + if n_out >= y.len() { + return (0..y.len()).collect::>(); + } + assert!(n_out >= 3); // avoid division by 0 + + // TODO -> we don't know the size of the output vector + // TODO -> make this more concise in a future version + let mut retain_idxs: Vec = Vec::with_capacity(n_out * 2); + + let mut meta_counter: u32 = 1; + // TODO -> check + let threshold: f64 = y.len() as f64 / (n_out - 2) as f64; + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: y[0] }; + let mut max_point: Point = Point { x: 0, y: y[0] }; + let mut min_point: Point = Point { x: 0, y: y[0] }; + + for idx in 1..y.len() { + let val = y[idx]; + + if val > max_point.y { + max_point.update(idx, val); + } + if val < min_point.y { + min_point.update(idx, val); + } + + if (idx > ((meta_counter as f64) * threshold) as usize) || (idx == y.len() - 1) { + // edge case -> previous flag is None + meta_counter += 1; + + // if the min is to the left of the max + if min_point.x < max_point.x { + // if the min was selected in the previos bin + if previous_min_flag == Flag::Min && min_point.x != potential_point.x { + retain_idxs.push(potential_point.x as usize); + } + retain_idxs.push(min_point.x as usize); + + potential_point.update(max_point.x, max_point.y); + min_point.update(max_point.x, max_point.y); + previous_min_flag = Flag::Min; + } else { + if previous_min_flag == Flag::Max && max_point.x != potential_point.x { + retain_idxs.push(potential_point.x as usize); + } + retain_idxs.push(max_point.x as usize); + + potential_point.update(min_point.x, min_point.y); + max_point.update(min_point.x, min_point.y); + previous_min_flag = Flag::Max; + } + } + } + + // add the last sample + retain_idxs.push(y.len() as usize - 1); + return retain_idxs; +} + +// --------------------------------------- TESTS --------------------------------------- +#[cfg(test)] +mod tests { + use dev_utils::utils; + + use super::fpcs_without_x; + + #[test] + fn test_fpcs_with_x() { + let x = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let y = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let n_out = 3; + // let sampled_indices = fpcs_without_x(&x, &y, n_out); + // TODO -> fix after we have a library + // assert eq(sampled_indices, vec![0, 9]) + // utils::print_vec(&res); + } +} diff --git a/downsample_rs/src/lib.rs b/downsample_rs/src/lib.rs index 409294d..1255bd1 100644 --- a/downsample_rs/src/lib.rs +++ b/downsample_rs/src/lib.rs @@ -11,6 +11,8 @@ pub mod minmaxlttb; pub use minmaxlttb::*; pub mod m4; pub use m4::*; +pub mod fpcs; +pub use fpcs::*; pub(crate) mod helpers; pub(crate) mod searchsorted; pub(crate) mod types; diff --git a/src/lib.rs b/src/lib.rs index 9cd30c2..a84fe06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -423,6 +423,52 @@ fn minmaxlttb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } +// --------------------------------------- FPCS --------------------------------------- + +use downsample_rs::fpcs as fpcs_mod; + +// Create a sub module for the FPCS algorithm +#[pymodule] +fn fpcs(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { + // ----------------- SEQUENTIAL + + let sequential_mod = PyModule::new_bound(_py, "sequential")?; + + // ----- WITHOUT X + { + create_pyfuncs_without_x!(fpcs_mod, fpcs_without_x, sequential_mod); + create_pyfuncs_without_x!(@nan fpcs_mod, fpcs_without_x_nan, sequential_mod); + } + + // ----- WITH X + { + create_pyfuncs_with_x!(fpcs_mod, fpcs_with_x, sequential_mod); + create_pyfuncs_with_x!(@nan fpcs_mod, fpcs_with_x_nan, sequential_mod); + } + + // ----------------- PARALLEL + + let parallel_mod = PyModule::new_bound(_py, "parallel")?; + + // ----- WITHOUT X + { + create_pyfuncs_without_x!(fpcs_mod, fpcs_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(@nan fpcs_mod, fpcs_without_x_parallel_nan, parallel_mod); + } + + // ----- WITH X + { + create_pyfuncs_with_x!(fpcs_mod, fpcs_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(@nan fpcs_mod, fpcs_with_x_parallel_nan, parallel_mod); + } + + // Add the sub modules to the module + m.add_submodule(&sequential_mod)?; + m.add_submodule(¶llel_mod)?; + + Ok(()) +} + // ------------------------------- DOWNSAMPLING MODULE ------------------------------ // #[pymodule] // The super module @@ -432,6 +478,7 @@ fn tsdownsample(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pymodule!(m4))?; m.add_wrapped(wrap_pymodule!(lttb))?; m.add_wrapped(wrap_pymodule!(minmaxlttb))?; + m.add_wrapped(wrap_pymodule!(fpcs))?; Ok(()) } diff --git a/tsdownsample/__init__.py b/tsdownsample/__init__.py index bfed60a..73a17c2 100644 --- a/tsdownsample/__init__.py +++ b/tsdownsample/__init__.py @@ -2,10 +2,12 @@ from .downsamplers import ( EveryNthDownsampler, + FPCSDownsampler, LTTBDownsampler, M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, NaNMinMaxLTTBDownsampler, @@ -16,10 +18,12 @@ __all__ = [ "EveryNthDownsampler", + "FPCSDownsampler", "MinMaxDownsampler", "M4Downsampler", "LTTBDownsampler", "MinMaxLTTBDownsampler", + "NaNFPCSDownsampler", "NaNMinMaxDownsampler", "NaNM4Downsampler", "NaNMinMaxLTTBDownsampler", diff --git a/tsdownsample/downsamplers.py b/tsdownsample/downsamplers.py index 93519b7..4c31db0 100644 --- a/tsdownsample/downsamplers.py +++ b/tsdownsample/downsamplers.py @@ -136,6 +136,36 @@ def downsample( ) +class FPCSDownsampler(AbstractRustDownsampler): + """Downsampler that uses the Feature Preserving Compensated Sampling (FPCS) + algorithm. + + FPCS paper: https://doi.org/10.1109/TVCG.2024.3456375 + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.fpcs + + def downsample(self, *args, n_out: int, parallel: bool = False, **_): + return super().downsample(*args, n_out=n_out, parallel=parallel) + + +class NaNFPCSDownsampler(AbstractRustNaNDownsampler): + """Downsampler that uses the Feature Preserving Compensated Sampling (FPCS) + algorithm. If the y data contains NaNs, the indices of these NaNs are returned. + + FPCS paper: https://doi.org/10.1109/TVCG.2024.3456375 + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.fpcs + + def downsample(self, *args, n_out: int, parallel: bool = False, **_): + return super().downsample(*args, n_out=n_out, parallel=parallel) + + # ------------------ EveryNth Downsampler ------------------ From f8663355c88ee4bdd810cd02feb443f6d72d1dd4 Mon Sep 17 00:00:00 2001 From: jonas Date: Mon, 3 Mar 2025 11:32:19 +0100 Subject: [PATCH 02/13] leverage minmax downsamplers and reduce code duplication --- downsample_rs/src/fpcs.rs | 433 ++++++++++++-------------------------- 1 file changed, 134 insertions(+), 299 deletions(-) diff --git a/downsample_rs/src/fpcs.rs b/downsample_rs/src/fpcs.rs index 0bff663..742fbc6 100644 --- a/downsample_rs/src/fpcs.rs +++ b/downsample_rs/src/fpcs.rs @@ -1,18 +1,9 @@ -// use rayon::iter::IndexedParallelIterator; -use rayon::prelude::*; - +use super::minmax; +use super::types::Num; use argminmax::{ArgMinMax, NaNArgMinMax}; use num_traits::{AsPrimitive, FromPrimitive}; -// use crate::minmax; - -use super::searchsorted::{ - get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, -}; -use super::types::Num; -use super::POOL; - -// ----------------------------------- Helper datastructures ------------------------------------ +// ------------------------------- Helper datastructures ------------------------------- struct Point { x: usize, y: Ty, @@ -32,85 +23,8 @@ enum Flag { None, } -// ----------------------------------- NON-PARALLEL ------------------------------------ -// ----------- WITH X -macro_rules! fpcs_with_x { - ($func_name:ident, $trait:ident, $f_fpcs:expr) => { - pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec - where - for<'a> &'a [Ty]: $trait, - Tx: Num + AsPrimitive + FromPrimitive, - Ty: Num + AsPrimitive, - { - let bin_idx_iterator = get_equidistant_bin_idx_iterator(x, n_out - 2); - fpcs_generic(y, bin_idx_iterator, n_out, $f_fpcs) - } - }; -} - -fpcs_with_x!(fpcs_with_x, ArgMinMax, |arr| arr.argminmax()); -fpcs_with_x!(fpcs_with_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); - -// ----------- WITHOUT X -macro_rules! fpcs_without_x { - ($func_name:ident, $trait:path, $f_argminmax:expr) => { - pub fn $func_name(arr: &[T], n_out: usize) -> Vec - where - for<'a> &'a [T]: $trait, - { - fpcs_generic_without_x(arr, n_out, $f_argminmax) - } - }; -} - -fpcs_without_x!(fpcs_without_x, ArgMinMax, |arr| arr.argminmax()); -fpcs_without_x!(fpcs_without_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); - -// ------------------------------------- PARALLEL -------------------------------------- -// ----------- WITH X -macro_rules! fpcs_with_x_parallel { - ($func_name:ident, $trait:path, $f_argminmax:expr) => { - pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec - where - for<'a> &'a [Ty]: $trait, - Tx: Num + AsPrimitive + FromPrimitive + Send + Sync, - Ty: Num + AsPrimitive + Send + Sync, - { - // collect the parrallel iterator to a vector - let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out - 2); - fpcs_generic_parallel(y, bin_idx_iterator, n_out, $f_argminmax) - } - }; -} - -fpcs_with_x_parallel!(fpcs_with_x_parallel, ArgMinMax, |arr| arr.argminmax()); -fpcs_with_x_parallel!(fpcs_with_x_parallel_nan, NaNArgMinMax, |arr| arr - .nanargminmax()); - -// ----------- WITHOUT X -macro_rules! fpcs_without_x_parallel { - ($func_name:ident, $trait:path, $f_argminmax:expr) => { - pub fn $func_name( - arr: &[Ty], - n_out: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - // Ty: Num + AsPrimitive + Send + Sync, - { - fpcs_generic_without_x_parallel(arr, n_out, $f_argminmax) - } - }; -} - -fpcs_without_x_parallel!(fpcs_without_x_parallel, ArgMinMax, |arr| arr.argminmax()); -fpcs_without_x_parallel!(fpcs_without_x_parallel_nan, NaNArgMinMax, |arr| arr - .nanargminmax()); - -// ----------- WITHOUT X - -// ------------------------------------- GENERICS -------------------------------------- -// move the inner loop to a separate function to reduce code duplication +// --------------------------------- helper functions ---------------------------------- +#[inline(always)] fn fpcs_inner_loop( y: &[Ty], min_idx: usize, @@ -152,129 +66,33 @@ fn fpcs_inner_loop( } } -// ----------- WITH X #[inline(always)] -pub(crate) fn fpcs_generic( - y: &[T], - bin_idx_iterator: impl Iterator>, +fn fpcs_outer_loop( + arr: &[Ty], + minmax_idxs: Vec, n_out: usize, - f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { - let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); - let mut previous_min_flag: Flag = Flag::None; - let mut potential_point: Point = Point { x: 0, y: y[0] }; - let mut max_point: Point = Point { x: 0, y: y[0] }; - let mut min_point: Point = Point { x: 0, y: y[0] }; - - bin_idx_iterator.for_each(|bin| { - if let Some((start, end)) = bin { - if end <= start + 2 { - // if the bin has <= 2 elements, we just add them all - for i in start..end { - sampled_indices.push(i); - } - } else { - // if the bin has at least two elements, we find the min and max - let (min_idx, max_idx) = f_argminmax(&y[start..end]); - - fpcs_inner_loop( - y, - min_idx, - max_idx, - &mut min_point, - &mut max_point, - &mut potential_point, - &mut previous_min_flag, - &mut sampled_indices, - ); - } - } - }); - // add the last sample - sampled_indices.push(y.len() as usize - 1); - sampled_indices -} - -#[inline(always)] -pub(crate) fn fpcs_generic_parallel( - arr: &[T], - bin_idx_iterator: impl IndexedParallelIterator>>, - n_out: usize, - f_argminmax: fn(&[T]) -> (usize, usize), -) -> Vec { - // Pre-allocate the vector with the right capacity - this will store min-max - // from each bin to be used in the FPCS algorithm - let mut minmax_idxs: Vec = Vec::with_capacity((n_out - 2) * 2); - - POOL.install(|| { - // Process bins in parallel to find min-max index pairs from each bin - // Each result contains (bin_position, [min_idx, max_idx]) where: - // - bin_position ensures results maintain correct order when collected - // - min_idx is the index of minimum value in the bin - // - max_idx is the index of maximum value in the bin - let results: Vec<(usize, [usize; 2])> = bin_idx_iterator - .enumerate() - .flat_map(|(bin_index, bin_idx_iterator)| { - bin_idx_iterator - .filter_map(|bin| { - bin.and_then(|(start, end)| { - match end - start { - 0 => None, // Empty bin - 1 => { - // Single element - add it to the result - Some((bin_index * 2, [start, start])) - } - 2 => { - // Two elements - determine min and max - let (min_idx, max_idx) = if arr[start] <= arr[start + 1] { - (start, start + 1) - } else { - (start + 1, start) - }; - Some((bin_index * 2, [min_idx, max_idx])) - } - _ => { - // Larger bins - find min and max - let (min_idx, max_idx) = f_argminmax(&arr[start..end]); - Some((bin_index * 2, [min_idx + start, max_idx + start])) - } - } - }) - }) - .collect::>() - }) - .collect(); - - // Only process results if we have any valid bins - if !results.is_empty() { - // Ensure we have enough capacity to avoid reallocations - minmax_idxs.reserve(results.len() * 2); - - // Populate minmax_idxs with alternating min-max indices - // These will be processed in pairs by fpcs_inner_loop - for (_, [min_idx, max_idx]) in results { - minmax_idxs.push(min_idx); - minmax_idxs.push(max_idx); - } - } - }); + let mut potential_point: Point = Point { x: 0, y: arr[0] }; + let mut max_point: Point = Point { x: 0, y: arr[0] }; + let mut min_point: Point = Point { x: 0, y: arr[0] }; let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); - - let mut previous_min_flag: Flag = Flag::None; - let mut potential_point: Point = Point { x: 0, y: arr[0] }; - let mut max_point: Point = Point { x: 0, y: arr[0] }; - let mut min_point: Point = Point { x: 0, y: arr[0] }; - - // Prepend first and last point + // Prepend first point sampled_indices.push(0); // Process the min/max pairs for chunk in minmax_idxs.chunks(2) { if chunk.len() == 2 { - let min_idx = chunk[0]; - let max_idx = chunk[1]; + let mut min_idx = chunk[0]; + let mut max_idx = chunk[1]; + + // NOTE: the minmax_idxs are not ordered!! + // So we need to check if the min is actually the min + if arr[min_idx] >= arr[max_idx] { + min_idx = chunk[1]; + max_idx = chunk[0]; + } fpcs_inner_loop( arr, @@ -297,121 +115,138 @@ pub(crate) fn fpcs_generic_parallel( sampled_indices } -// ----------- WITHOUT X -#[inline(always)] -pub(crate) fn fpcs_generic_without_x( - y: &[Ty], - n_out: usize, // handle n_out behavior on a higher level - f_argminmax: fn(&[Ty]) -> (usize, usize), -) -> Vec { - if n_out >= y.len() { - return (0..y.len()).collect::>(); - } - assert!(n_out >= 3); // avoid division by 0 - // let f_argminmax = y.argminmax(); +// -------------------------------------- MACROs --------------------------------------- +// ----------------------------------- NON-PARALLEL ------------------------------------ - let block_size: f64 = y.len() as f64 / (n_out - 2) as f64; +// ----------- WITH X - let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); +macro_rules! fpcs_with_x { + ($func_name:ident, $trait:ident, $f_minmax:expr) => { + pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive, + Ty: Copy + PartialOrd, + { + fpcs_generic(x, y, n_out, $f_minmax) + } + }; +} - let mut previous_min_flag: Flag = Flag::None; - let mut potential_point: Point = Point { x: 0, y: y[0] }; - let mut max_point: Point = Point { x: 0, y: y[0] }; - let mut min_point: Point = Point { x: 0, y: y[0] }; +fpcs_with_x!(fpcs_with_x, ArgMinMax, minmax::min_max_with_x); +fpcs_with_x!(fpcs_with_x_nan, NaNArgMinMax, minmax::min_max_with_x_nan); - let mut lower: usize = 1; - let mut upper: usize = (block_size as usize) + 1; - while upper < y.len() { - let (mut min_idx, mut max_idx) = f_argminmax(&y[lower..upper]); - min_idx += lower; - max_idx += lower; - - fpcs_inner_loop( - y, - min_idx, - max_idx, - &mut min_point, - &mut max_point, - &mut potential_point, - &mut previous_min_flag, - &mut sampled_indices, - ); - - lower = upper; - upper = (upper as f64 + block_size) as usize; - } +// ----------- WITHOUT X - // add the last sample - sampled_indices.push(y.len() as usize - 1); - sampled_indices +macro_rules! fpcs_without_x { + ($func_name:ident, $trait:path, $f_minmax:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + fpcs_generic_without_x(arr, n_out, $f_minmax) + } + }; } -#[inline(always)] -pub(crate) fn fpcs_generic_without_x_parallel( - arr: &[T], - n_out: usize, - f_argminmax: fn(&[T]) -> (usize, usize), -) -> Vec { - // --------- 1. parallelize the min-max search - // arr.len() -1 is used to match the delta of a range-index (0..arr.len()-1) - let block_size: f64 = (arr.len() - 1) as f64 / (n_out - 2) as f64; +fpcs_without_x!(fpcs_without_x, ArgMinMax, minmax::min_max_without_x); +fpcs_without_x!( + fpcs_without_x_nan, + NaNArgMinMax, + minmax::min_max_without_x_nan +); - // let mut minmax_idxs: Vec = Vec::with_capacity((n_out - 2) * 2); - let mut minmax_idxs: Vec = vec![0; (n_out - 2) * 2]; +// ------------------------------------- PARALLEL -------------------------------------- - POOL.install(|| { - minmax_idxs - .par_chunks_exact_mut(2) - .for_each(|min_max_index_chunk| { - let i: f64 = unsafe { *min_max_index_chunk.get_unchecked(0) >> 1 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; +// ----------- WITH X - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); +macro_rules! fpcs_with_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, + Ty: Num + Copy + PartialOrd + Send + Sync, + { + // collect the parrallel iterator to a vector + fpcs_generic(x, y, n_out, $f_argminmax) + } + }; +} - // Add the indexes in min-max order - min_max_index_chunk[0] = min_index + start_idx; - min_max_index_chunk[1] = max_index + start_idx; - }) - }); +fpcs_with_x_parallel!( + fpcs_with_x_parallel, + ArgMinMax, + minmax::min_max_with_x_parallel +); +fpcs_with_x_parallel!( + fpcs_with_x_parallel_nan, + NaNArgMinMax, + minmax::min_max_with_x_parallel_nan +); - let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); +// ----------- WITHOUT X - let mut previous_min_flag: Flag = Flag::None; - let mut potential_point: Point = Point { x: 0, y: arr[0] }; - let mut max_point: Point = Point { x: 0, y: arr[0] }; - let mut min_point: Point = Point { x: 0, y: arr[0] }; +macro_rules! fpcs_without_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name( + arr: &[Ty], + n_out: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + // Ty: Num + AsPrimitive + Send + Sync, + { + fpcs_generic_without_x(arr, n_out, $f_argminmax) + } + }; +} - // Prepend first and last point - sampled_indices.push(0); +fpcs_without_x_parallel!( + fpcs_without_x_parallel, + ArgMinMax, + minmax::min_max_without_x_parallel +); +fpcs_without_x_parallel!( + fpcs_without_x_parallel_nan, + NaNArgMinMax, + minmax::min_max_without_x_parallel_nan +); - // Process the min/max pairs - for chunk in minmax_idxs.chunks(2) { - if chunk.len() == 2 { - let min_idx = chunk[0]; - let max_idx = chunk[1]; +// ------------------------------------- GENERICS -------------------------------------- - fpcs_inner_loop( - arr, - min_idx, - max_idx, - &mut min_point, - &mut max_point, - &mut potential_point, - &mut previous_min_flag, - &mut sampled_indices, - ); - } else if chunk.len() == 1 { - // Handle any remaining single element - sampled_indices.push(chunk[0]); - } - } +// ----------- WITH X - // add the last sample - sampled_indices.push(arr.len() as usize - 1); - sampled_indices +#[inline(always)] +pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy>( + x: &[Tx], + y: &[Ty], + n_out: usize, + f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, +) -> Vec { + assert_eq!(x.len(), y.len()); + let mut minmax_idxs = f_minmax(&x[1..(x.len() - 1)], &y[1..(x.len() - 1)], n_out * 2); + minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 + return fpcs_outer_loop(y, minmax_idxs, n_out); } +// ----------- WITHOUT X + +#[inline(always)] +pub(crate) fn fpcs_generic_without_x( + arr: &[T], + n_out: usize, + f_minmax: fn(&[T], usize) -> Vec, +) -> Vec { + let mut minmax_idxs: Vec = f_minmax(&arr[1..(arr.len() - 1)], n_out * 2); + minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 + return fpcs_outer_loop(arr, minmax_idxs, n_out); +} + +// - +// ------------------------------------- LEGACY -------------------------------------- +// - + pub fn vanilla_fpcs_without_x( y: &[Ty], n_out: usize, // NOTE: there will be between [n_out, 2*n_out] output points From 90c18e5e6253014b3f0ec08080a4facc5c1d3d64 Mon Sep 17 00:00:00 2001 From: jonas Date: Mon, 3 Mar 2025 11:32:40 +0100 Subject: [PATCH 03/13] :mag: improve clean target in Makefile to exclude virtual environment files --- Makefile | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 1c83953..29625ff 100644 --- a/Makefile +++ b/Makefile @@ -49,11 +49,12 @@ all: lint mypy test .PHONY: clean clean: - rm -rf `find . -name __pycache__` - rm -f `find . -type f -name '*.py[co]' ` - rm -f `find . -type f -name '*~' ` - rm -f `find . -type f -name '.*~' ` - rm -f `find . -type f -name '*.cpython-*' ` + # TODO -> this kinda wipes a lot of stuff in the python environment, maybe we should be more specific + rm -rf `find . -name __pycache__ -not -path "./.venv/*"` + rm -f `find . -type f -name '*.py[co]' -not -path "./.venv/*"` + rm -f `find . -type f -name '*~' -not -path "./.venv/*"` + rm -f `find . -type f -name '.*~' -not -path "./.venv/*"` + rm -f `find . -type f -name '*.cpython-*' -not -path "./.venv/*"` rm -rf dist rm -rf build rm -rf target From fbf546c98a3e10548fddf8b8d04c4378822d7d62 Mon Sep 17 00:00:00 2001 From: jonas Date: Mon, 3 Mar 2025 14:49:35 +0100 Subject: [PATCH 04/13] :goat: adding first tests --- tests/test_algos_python_compliance.py | 3 ++ tsdownsample/_python/downsamplers.py | 76 ++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/tests/test_algos_python_compliance.py b/tests/test_algos_python_compliance.py index 8a64163..0cd08f2 100644 --- a/tests/test_algos_python_compliance.py +++ b/tests/test_algos_python_compliance.py @@ -2,6 +2,7 @@ import pytest from tsdownsample import ( + FPCSDownsampler, # TODO -> include MinMaxLTTB? LTTBDownsampler, M4Downsampler, MinMaxDownsampler, @@ -9,6 +10,7 @@ NaNMinMaxDownsampler, ) from tsdownsample._python.downsamplers import ( + FPCS_py, LTTB_py, M4_py, MinMax_py, @@ -23,6 +25,7 @@ (MinMaxDownsampler(), MinMax_py()), (M4Downsampler(), M4_py()), (LTTBDownsampler(), LTTB_py()), + (FPCSDownsampler(), FPCS_py()), # Include NaN downsamplers (NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py()), diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 5343f9f..1d711c6 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -1,4 +1,5 @@ -from typing import Union +from enum import Enum +from typing import NamedTuple, Union import numpy as np @@ -255,3 +256,76 @@ def _downsample( # NOTE: we do not use the np.unique so that all indices are retained return np.array(sorted(rel_idxs)) + + +class FPCS_py(AbstractDownsampler): + def _downsample( + self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **kwargs + ) -> np.ndarray: + # fmt: off + # ------------------------- Helper datastructures ------------------------- + class Flag(Enum): + NONE = -1 # -1: no data points have been retained + MAX = 0 # 0: a max has been retained + MIN = 1 # 1: a min has been retained + + Point = NamedTuple("point", [("x", int), ("val", y.dtype)]) + # ------------------------------------------------------------------------ + + # 0. Downsample the data using the MinMax algorithm + MINMAX_FACTOR = 2 + n_1 = len(x) - 1 + # NOTE: as we include the first and last point, we reduce the number of points + downsampled_idxs = MinMax_py().downsample( + x[1:n_1], y[1:n_1], n_out=(n_out - 2) * MINMAX_FACTOR + ) + downsampled_idxs += 1 + + previous_min_flag: Flag = Flag.NONE + potential_point = Point(0, 0) + max_point = Point(0, y[0]) + min_point = Point(0, y[0]) + + sampled_indices = [] + sampled_indices.append(0) # prepend the first point + for i in range(0, len(downsampled_idxs), 2): + # get the min and max indices and convert them to the correct order + min_idx, max_idxs = downsampled_idxs[i], downsampled_idxs[i + 1] + if y[min_idx] > y[max_idxs]: + min_idx, max_idxs = max_idxs, min_idx + bin_min = Point(min_idx, y[min_idx]) + bin_max = Point(max_idxs, y[max_idxs]) + + # update the max and min points based on the extrema of the current bin + if max_point.val < bin_max.val: + max_point = bin_max + if min_point.val > bin_min.val: + min_point = bin_min + + # if the min is to the left of the max + if min_point.x < max_point.x: + # if the min was not selected in the previous bin + if previous_min_flag == Flag.MIN and min_point.x != potential_point.x: + # Both adjacent samplings retain MinPoint, and PotentialPoint and + # MinPoint are not the same point + sampled_indices.append(potential_point.x) + + sampled_indices.append(min_point.x) # receiving min_point b4 max_point -> retain min_point + potential_point = (max_point) # update potential point to unselected max_point + min_point = max_point # update min_point to unselected max_point + previous_min_flag = Flag.MIN # min_point has been selected + + else: + if previous_min_flag == Flag.MAX and max_point.x != potential_point.x: + # # Both adjacent samplings retain MaxPoint, and PotentialPoint and + # MaxPoint are not the same point + sampled_indices.append(potential_point.x) + + sampled_indices.append(max_point.x) # receiving max_point b4 min_point -> retain max_point + potential_point = (min_point) # update potential point to unselected min_point + max_point = min_point # update max_point to unselected min_point + previous_min_flag = Flag.MAX # max_point has been selected + + sampled_indices.append(len(y) - 1) # append the last point + # fmt: on + return np.array(sampled_indices, dtype=np.int64) From 73aea169026a684c7a3bb56a6f3bfbfdf50fb922 Mon Sep 17 00:00:00 2001 From: jonas Date: Mon, 3 Mar 2025 16:38:45 +0100 Subject: [PATCH 05/13] :mag: fix correct minmax index calculations in FPCS algorithm --- downsample_rs/src/fpcs.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/downsample_rs/src/fpcs.rs b/downsample_rs/src/fpcs.rs index 742fbc6..e8f8436 100644 --- a/downsample_rs/src/fpcs.rs +++ b/downsample_rs/src/fpcs.rs @@ -89,7 +89,7 @@ fn fpcs_outer_loop( // NOTE: the minmax_idxs are not ordered!! // So we need to check if the min is actually the min - if arr[min_idx] >= arr[max_idx] { + if arr[min_idx] > arr[max_idx] { min_idx = chunk[1]; max_idx = chunk[0]; } @@ -225,7 +225,7 @@ pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy>( f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, ) -> Vec { assert_eq!(x.len(), y.len()); - let mut minmax_idxs = f_minmax(&x[1..(x.len() - 1)], &y[1..(x.len() - 1)], n_out * 2); + let mut minmax_idxs = f_minmax(&x[1..(x.len() - 1)], &y[1..(x.len() - 1)], (n_out - 2) * 2); minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 return fpcs_outer_loop(y, minmax_idxs, n_out); } @@ -238,7 +238,7 @@ pub(crate) fn fpcs_generic_without_x( n_out: usize, f_minmax: fn(&[T], usize) -> Vec, ) -> Vec { - let mut minmax_idxs: Vec = f_minmax(&arr[1..(arr.len() - 1)], n_out * 2); + let mut minmax_idxs: Vec = f_minmax(&arr[1..(arr.len() - 1)], (n_out - 2) * 2); minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 return fpcs_outer_loop(arr, minmax_idxs, n_out); } From 73208b24354bd069196533c7935961f64384533e Mon Sep 17 00:00:00 2001 From: jonas Date: Mon, 3 Mar 2025 16:45:18 +0100 Subject: [PATCH 06/13] :construction: WiP include minmaxlttb tests --- tests/test_algos_python_compliance.py | 18 ++++++- tsdownsample/_python/downsamplers.py | 78 ++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/tests/test_algos_python_compliance.py b/tests/test_algos_python_compliance.py index 0cd08f2..90f6bc3 100644 --- a/tests/test_algos_python_compliance.py +++ b/tests/test_algos_python_compliance.py @@ -2,20 +2,26 @@ import pytest from tsdownsample import ( - FPCSDownsampler, # TODO -> include MinMaxLTTB? + FPCSDownsampler, LTTBDownsampler, M4Downsampler, MinMaxDownsampler, + # MinMaxLTTBDownsampler, + NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, + # NaNMinMaxLTTBDownsampler, ) from tsdownsample._python.downsamplers import ( FPCS_py, LTTB_py, M4_py, MinMax_py, + # MinMaxLTTB_py, + NaNFPCS_py, NaNM4_py, NaNMinMax_py, + # NaNMinMaxLTTB_py, ) @@ -25,10 +31,13 @@ (MinMaxDownsampler(), MinMax_py()), (M4Downsampler(), M4_py()), (LTTBDownsampler(), LTTB_py()), + # (MinMaxLTTBDownsampler(), MinMaxLTTB_py()), (FPCSDownsampler(), FPCS_py()), # Include NaN downsamplers (NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py()), + # (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), + (NaNFPCSDownsampler(), NaNFPCS_py()), ], ) @pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) @@ -51,7 +60,12 @@ def test_resampler_accordance(rust_python_pair, n, n_out): @pytest.mark.parametrize( "rust_python_pair", - [(NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py())], + [ + (NaNMinMaxDownsampler(), NaNMinMax_py()), + (NaNM4Downsampler(), NaNM4_py()), + # (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), + (NaNFPCSDownsampler(), NaNFPCS_py()), + ], ) @pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) @pytest.mark.parametrize("n_random_nans", [100, 200, 500, 2000, 5000]) diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 1d711c6..0076ab0 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -4,6 +4,7 @@ import numpy as np from ..downsampling_interface import AbstractDownsampler +from abc import ABC def _get_bin_idxs(x: np.ndarray, nb_bins: int) -> np.ndarray: @@ -258,7 +259,59 @@ def _downsample( return np.array(sorted(rel_idxs)) -class FPCS_py(AbstractDownsampler): +class _MinMaxLTTB_py(AbstractDownsampler, ABC): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler: AbstractDownsampler = None + self.lttb_downsampler: AbstractDownsampler = None + + def _downsample(self, x, y, n_out, **kwargs): + minmax_ratio = kwargs.get("minmax_ratio", 4) + kwargs.pop("minmax_ratio", None) # remove the minmax_ratio from kwargs + + # Is fine for this implementation as this is only used for testing + if x is None: + x = np.arange(y.shape[0]) + + n_1 = len(x) - 1 + idxs = self.minmax_downsampler.downsample( + x[1:n_1], y[1:n_1], n_out=n_out * minmax_ratio, **kwargs + ) + idxs += 1 + idxs = np.concat(([0], idxs, [len(y) - 1])).ravel() + print("idxs", idxs) + return idxs[ + self.lttb_downsampler.downsample(x[idxs], y[idxs], n_out=n_out, **kwargs) + ] + + +class MinMaxLTTB_py(_MinMaxLTTB_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = MinMax_py() + self.lttb_downsampler = LTTB_py() + + +class NaNMinMaxLTTB_py(_MinMaxLTTB_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = NaNMinMax_py() + self.lttb_downsampler = LTTB_py() + + +class _FPCS_py(AbstractDownsampler, ABC): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler: AbstractDownsampler = None + def _downsample( self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **kwargs ) -> np.ndarray: @@ -272,11 +325,16 @@ class Flag(Enum): Point = NamedTuple("point", [("x", int), ("val", y.dtype)]) # ------------------------------------------------------------------------ + # NOTE: is fine for this implementation as this is only used for testing + if x is None: + # Is fine for this implementation as this is only used for testing + x = np.arange(y.shape[0]) + # 0. Downsample the data using the MinMax algorithm MINMAX_FACTOR = 2 n_1 = len(x) - 1 # NOTE: as we include the first and last point, we reduce the number of points - downsampled_idxs = MinMax_py().downsample( + downsampled_idxs = self.minmax_downsampler.downsample( x[1:n_1], y[1:n_1], n_out=(n_out - 2) * MINMAX_FACTOR ) downsampled_idxs += 1 @@ -329,3 +387,19 @@ class Flag(Enum): sampled_indices.append(len(y) - 1) # append the last point # fmt: on return np.array(sampled_indices, dtype=np.int64) + + +class FPCS_py(_FPCS_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = MinMax_py() + + +class NaNFPCS_py(_FPCS_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = NaNMinMax_py() From 2bc1c3133f1eeb5bead06fc2dd305b9be5dd314e Mon Sep 17 00:00:00 2001 From: jonas Date: Mon, 3 Mar 2025 16:54:43 +0100 Subject: [PATCH 07/13] :see_no_evil: organize imports --- tsdownsample/_python/downsamplers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 0076ab0..76299d6 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -1,10 +1,10 @@ +from abc import ABC from enum import Enum from typing import NamedTuple, Union import numpy as np from ..downsampling_interface import AbstractDownsampler -from abc import ABC def _get_bin_idxs(x: np.ndarray, nb_bins: int) -> np.ndarray: From 0339deb6f1d3a576e1df47da034d85a08f51c03b Mon Sep 17 00:00:00 2001 From: jonas Date: Tue, 4 Mar 2025 13:15:11 +0100 Subject: [PATCH 08/13] :goat: identify non-alignment between LTTB_py and LTTB_rs :mag: --- tsdownsample/_python/downsamplers.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 76299d6..4390dc8 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -78,8 +78,8 @@ def _downsample( # Construct the output array sampled_x = np.empty(n_out, dtype="int64") + # Add the first point sampled_x[0] = 0 - sampled_x[-1] = x.shape[0] - 1 # Convert x & y to int if it is boolean if x.dtype == np.bool_: @@ -93,7 +93,17 @@ def _downsample( LTTB_py._argmax_area( prev_x=x[a], prev_y=y[a], - avg_next_x=np.mean(x[offset[i + 1] : offset[i + 2]]), + # NOTE: In a 100% correct implementation of LTTB the next x average + # should be implemented as the following: + # avg_next_x=np.mean(x[offset[i + 1] : offset[i + 2]]), + # To improve performance we use the following approximation + # which is the average of the first and last point of the next bucket + # NOTE: this is not as accurate when x is not sampled equidistant + # or when the buckets do not contain tht much data points, but it: + # (1) aligns with visual perception (visual middle) + # (2) is much faster + # (3) is how the LTTB rust implementation works + avg_next_x=(x[offset[i + 1]] + x[offset[i + 2] - 1]) / 2.0, avg_next_y=y[offset[i + 1] : offset[i + 2]].mean(), x_bucket=x[offset[i] : offset[i + 1]], y_bucket=y[offset[i] : offset[i + 1]], @@ -115,6 +125,8 @@ def _downsample( ) + offset[-2] ) + # Always include the last point + sampled_x[-1] = x.shape[0] - 1 return sampled_x From 95b29afe84cbbe0f7e65424c1363a6f5c2fb6dc5 Mon Sep 17 00:00:00 2001 From: jonas Date: Tue, 4 Mar 2025 13:27:53 +0100 Subject: [PATCH 09/13] :dash: formatting + adding more tests --- tests/test_algos_python_compliance.py | 14 +++++++------- tests/test_tsdownsample.py | 12 ++++++++++-- tsdownsample/_python/downsamplers.py | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/test_algos_python_compliance.py b/tests/test_algos_python_compliance.py index 90f6bc3..3bcc507 100644 --- a/tests/test_algos_python_compliance.py +++ b/tests/test_algos_python_compliance.py @@ -6,22 +6,22 @@ LTTBDownsampler, M4Downsampler, MinMaxDownsampler, - # MinMaxLTTBDownsampler, + MinMaxLTTBDownsampler, NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, - # NaNMinMaxLTTBDownsampler, + NaNMinMaxLTTBDownsampler, ) from tsdownsample._python.downsamplers import ( FPCS_py, LTTB_py, M4_py, MinMax_py, - # MinMaxLTTB_py, + MinMaxLTTB_py, NaNFPCS_py, NaNM4_py, NaNMinMax_py, - # NaNMinMaxLTTB_py, + NaNMinMaxLTTB_py, ) @@ -31,12 +31,12 @@ (MinMaxDownsampler(), MinMax_py()), (M4Downsampler(), M4_py()), (LTTBDownsampler(), LTTB_py()), - # (MinMaxLTTBDownsampler(), MinMaxLTTB_py()), + (MinMaxLTTBDownsampler(), MinMaxLTTB_py()), (FPCSDownsampler(), FPCS_py()), # Include NaN downsamplers (NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py()), - # (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), + (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), (NaNFPCSDownsampler(), NaNFPCS_py()), ], ) @@ -63,7 +63,7 @@ def test_resampler_accordance(rust_python_pair, n, n_out): [ (NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py()), - # (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), + (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), (NaNFPCSDownsampler(), NaNFPCS_py()), ], ) diff --git a/tests/test_tsdownsample.py b/tests/test_tsdownsample.py index 993faa6..dcc1c58 100644 --- a/tests/test_tsdownsample.py +++ b/tests/test_tsdownsample.py @@ -6,10 +6,12 @@ from tsdownsample import ( # MeanDownsampler,; MedianDownsampler, EveryNthDownsampler, + FPCSDownsampler, LTTBDownsampler, M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, NaNMinMaxLTTBDownsampler, @@ -28,12 +30,14 @@ M4Downsampler(), LTTBDownsampler(), MinMaxLTTBDownsampler(), + FPCSDownsampler(), ] RUST_NAN_DOWNSAMPLERS = [ NaNMinMaxDownsampler(), NaNM4Downsampler(), NaNMinMaxLTTBDownsampler(), + NaNFPCSDownsampler(), ] OTHER_DOWNSAMPLERS = [EveryNthDownsampler()] @@ -167,8 +171,12 @@ def test_downsampling_with_gaps_in_x(downsampler: AbstractDownsampler): idx = np.arange(len(arr)) idx[: len(idx) // 2] += len(idx) // 2 # add large gap in x s_downsampled = downsampler.downsample(idx, arr, n_out=100) - assert len(s_downsampled) <= 100 - assert len(s_downsampled) >= 66 + if "FPCS" in downsampler.__class__.__name__: + assert len(s_downsampled) >= 100 + assert len(s_downsampled) <= 200 + else: + assert len(s_downsampled) <= 100 + assert len(s_downsampled) >= 66 @pytest.mark.parametrize("downsampler", generate_rust_downsamplers()) diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 4390dc8..b1a9c2a 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -93,7 +93,7 @@ def _downsample( LTTB_py._argmax_area( prev_x=x[a], prev_y=y[a], - # NOTE: In a 100% correct implementation of LTTB the next x average + # NOTE: In a 100% correct implementation of LTTB the next x average # should be implemented as the following: # avg_next_x=np.mean(x[offset[i + 1] : offset[i + 2]]), # To improve performance we use the following approximation From d40f5a48858b292d34855c3dba3f2c6346700d9b Mon Sep 17 00:00:00 2001 From: jonas Date: Tue, 4 Mar 2025 13:28:37 +0100 Subject: [PATCH 10/13] :mag: add per-file ignores for line length in downsamplers.py --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 876db6e..83afd94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,10 @@ select = ["E", "F", "I"] line-length = 88 extend-select = ["Q"] ignore = ["E402", "F403"] + +[tool.ruff.per-file-ignores] +"tsdownsample/_python/downsamplers.py" = ["E501"] # NOTE "downsamplers.py" also works + # Formatting [tool.black] From 590fa1d782ce1857d6a0ee0d084af58fce207b89 Mon Sep 17 00:00:00 2001 From: jonas Date: Wed, 5 Mar 2025 14:18:49 +0100 Subject: [PATCH 11/13] :triangular_ruler: align python-rust (NaN) FPCS algorithm implementation --- downsample_rs/src/fpcs.rs | 120 +++++++++++++++++---------- tsdownsample/_python/downsamplers.py | 17 ++-- 2 files changed, 85 insertions(+), 52 deletions(-) diff --git a/downsample_rs/src/fpcs.rs b/downsample_rs/src/fpcs.rs index e8f8436..8252ff2 100644 --- a/downsample_rs/src/fpcs.rs +++ b/downsample_rs/src/fpcs.rs @@ -2,9 +2,11 @@ use super::minmax; use super::types::Num; use argminmax::{ArgMinMax, NaNArgMinMax}; use num_traits::{AsPrimitive, FromPrimitive}; +use std::{fmt::Debug, ops::Not}; // ------------------------------- Helper datastructures ------------------------------- -struct Point { +// NOTE: the pub(crate) is added to match the visibility of the fpcs_generic function +pub(crate) struct Point { x: usize, y: Ty, } @@ -25,24 +27,13 @@ enum Flag { // --------------------------------- helper functions ---------------------------------- #[inline(always)] -fn fpcs_inner_loop( - y: &[Ty], - min_idx: usize, - max_idx: usize, +fn fpcs_inner_core( min_point: &mut Point, max_point: &mut Point, potential_point: &mut Point, previous_min_flag: &mut Flag, sampled_indices: &mut Vec, ) { - // only update when the min/max is more extreme than the current min/max - if y[max_idx] > max_point.y { - max_point.update(max_idx, y[max_idx]); - } - if y[min_idx] < min_point.y { - min_point.update(min_idx, y[min_idx]); - } - // if the min is to the left of the max if min_point.x < max_point.x { // if the min was selected in the previos bin @@ -67,10 +58,29 @@ fn fpcs_inner_loop( } #[inline(always)] -fn fpcs_outer_loop( +fn fpcs_inner_comp( + y: &[Ty], + min_idx: usize, + max_idx: usize, + min_point: &mut Point, + max_point: &mut Point, +) { + // NOTE: the > and >= stem from the pseudocode details of the FPCS algorithm + // NOTE: this comparison is inverted as nan comparisons always return false + if (max_point.y > y[max_idx]).not() { + max_point.update(max_idx, y[max_idx]); + } + if (min_point.y <= y[min_idx]).not() { + min_point.update(min_idx, y[min_idx]); + } +} + +#[inline(always)] +fn fpcs_outer_loop( arr: &[Ty], minmax_idxs: Vec, n_out: usize, + inner_comp_fn: fn(&[Ty], usize, usize, &mut Point, &mut Point), ) -> Vec { let mut previous_min_flag: Flag = Flag::None; let mut potential_point: Point = Point { x: 0, y: arr[0] }; @@ -81,6 +91,7 @@ fn fpcs_outer_loop( // Prepend first point sampled_indices.push(0); + println!("minmax_idxs: {:?}", minmax_idxs); // Process the min/max pairs for chunk in minmax_idxs.chunks(2) { if chunk.len() == 2 { @@ -94,10 +105,9 @@ fn fpcs_outer_loop( max_idx = chunk[0]; } - fpcs_inner_loop( - arr, - min_idx, - max_idx, + inner_comp_fn(arr, min_idx, max_idx, &mut min_point, &mut max_point); + + fpcs_inner_core( &mut min_point, &mut max_point, &mut potential_point, @@ -121,39 +131,55 @@ fn fpcs_outer_loop( // ----------- WITH X macro_rules! fpcs_with_x { - ($func_name:ident, $trait:ident, $f_minmax:expr) => { + ($func_name:ident, $trait:ident, $f_minmax:expr, $f_fpcs_inner_comp:expr) => { pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec where for<'a> &'a [Ty]: $trait, Tx: Num + FromPrimitive + AsPrimitive, - Ty: Copy + PartialOrd, + Ty: Copy + PartialOrd + Debug, { - fpcs_generic(x, y, n_out, $f_minmax) + fpcs_generic(x, y, n_out, $f_minmax, $f_fpcs_inner_comp) } }; } -fpcs_with_x!(fpcs_with_x, ArgMinMax, minmax::min_max_with_x); -fpcs_with_x!(fpcs_with_x_nan, NaNArgMinMax, minmax::min_max_with_x_nan); +fpcs_with_x!( + fpcs_with_x, + ArgMinMax, + minmax::min_max_with_x, + fpcs_inner_comp +); +fpcs_with_x!( + fpcs_with_x_nan, + NaNArgMinMax, + minmax::min_max_with_x_nan, + fpcs_inner_comp +); // ----------- WITHOUT X macro_rules! fpcs_without_x { - ($func_name:ident, $trait:path, $f_minmax:expr) => { - pub fn $func_name(arr: &[T], n_out: usize) -> Vec + ($func_name:ident, $trait:path, $f_minmax:expr, $f_fpcs_inner_comp:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec where for<'a> &'a [T]: $trait, { - fpcs_generic_without_x(arr, n_out, $f_minmax) + fpcs_generic_without_x(arr, n_out, $f_minmax, $f_fpcs_inner_comp) } }; } -fpcs_without_x!(fpcs_without_x, ArgMinMax, minmax::min_max_without_x); +fpcs_without_x!( + fpcs_without_x, + ArgMinMax, + minmax::min_max_without_x, + fpcs_inner_comp +); fpcs_without_x!( fpcs_without_x_nan, NaNArgMinMax, - minmax::min_max_without_x_nan + minmax::min_max_without_x_nan, + fpcs_inner_comp ); // ------------------------------------- PARALLEL -------------------------------------- @@ -161,15 +187,15 @@ fpcs_without_x!( // ----------- WITH X macro_rules! fpcs_with_x_parallel { - ($func_name:ident, $trait:path, $f_argminmax:expr) => { + ($func_name:ident, $trait:path, $f_argminmax:expr, $f_fpcs_inner_comp:expr) => { pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec where for<'a> &'a [Ty]: $trait, Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, - Ty: Num + Copy + PartialOrd + Send + Sync, + Ty: Num + Copy + PartialOrd + Send + Sync + Debug, { // collect the parrallel iterator to a vector - fpcs_generic(x, y, n_out, $f_argminmax) + fpcs_generic(x, y, n_out, $f_argminmax, $f_fpcs_inner_comp) } }; } @@ -177,19 +203,21 @@ macro_rules! fpcs_with_x_parallel { fpcs_with_x_parallel!( fpcs_with_x_parallel, ArgMinMax, - minmax::min_max_with_x_parallel + minmax::min_max_with_x_parallel, + fpcs_inner_comp ); fpcs_with_x_parallel!( fpcs_with_x_parallel_nan, NaNArgMinMax, - minmax::min_max_with_x_parallel_nan + minmax::min_max_with_x_parallel_nan, + fpcs_inner_comp ); // ----------- WITHOUT X macro_rules! fpcs_without_x_parallel { - ($func_name:ident, $trait:path, $f_argminmax:expr) => { - pub fn $func_name( + ($func_name:ident, $trait:path, $f_argminmax:expr, $f_fpcs_inner_comp:expr) => { + pub fn $func_name( arr: &[Ty], n_out: usize, ) -> Vec @@ -197,7 +225,7 @@ macro_rules! fpcs_without_x_parallel { for<'a> &'a [Ty]: $trait, // Ty: Num + AsPrimitive + Send + Sync, { - fpcs_generic_without_x(arr, n_out, $f_argminmax) + fpcs_generic_without_x(arr, n_out, $f_argminmax, $f_fpcs_inner_comp) } }; } @@ -205,12 +233,14 @@ macro_rules! fpcs_without_x_parallel { fpcs_without_x_parallel!( fpcs_without_x_parallel, ArgMinMax, - minmax::min_max_without_x_parallel + minmax::min_max_without_x_parallel, + fpcs_inner_comp ); fpcs_without_x_parallel!( fpcs_without_x_parallel_nan, NaNArgMinMax, - minmax::min_max_without_x_parallel_nan + minmax::min_max_without_x_parallel_nan, + fpcs_inner_comp ); // ------------------------------------- GENERICS -------------------------------------- @@ -218,29 +248,31 @@ fpcs_without_x_parallel!( // ----------- WITH X #[inline(always)] -pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy>( +pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy + Debug>( x: &[Tx], y: &[Ty], n_out: usize, f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, + fpcs_inner_comp: fn(&[Ty], usize, usize, &mut Point, &mut Point), ) -> Vec { assert_eq!(x.len(), y.len()); let mut minmax_idxs = f_minmax(&x[1..(x.len() - 1)], &y[1..(x.len() - 1)], (n_out - 2) * 2); minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 - return fpcs_outer_loop(y, minmax_idxs, n_out); + return fpcs_outer_loop(y, minmax_idxs, n_out, fpcs_inner_comp); } // ----------- WITHOUT X #[inline(always)] -pub(crate) fn fpcs_generic_without_x( - arr: &[T], +pub(crate) fn fpcs_generic_without_x( + arr: &[Ty], n_out: usize, - f_minmax: fn(&[T], usize) -> Vec, + f_minmax: fn(&[Ty], usize) -> Vec, + fpcs_inner_comp: fn(&[Ty], usize, usize, &mut Point, &mut Point), ) -> Vec { let mut minmax_idxs: Vec = f_minmax(&arr[1..(arr.len() - 1)], (n_out - 2) * 2); minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 - return fpcs_outer_loop(arr, minmax_idxs, n_out); + return fpcs_outer_loop(arr, minmax_idxs, n_out, fpcs_inner_comp); } // - diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index b1a9c2a..1e8c68d 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -293,7 +293,6 @@ def _downsample(self, x, y, n_out, **kwargs): ) idxs += 1 idxs = np.concat(([0], idxs, [len(y) - 1])).ravel() - print("idxs", idxs) return idxs[ self.lttb_downsampler.downsample(x[idxs], y[idxs], n_out=n_out, **kwargs) ] @@ -334,7 +333,7 @@ class Flag(Enum): MAX = 0 # 0: a max has been retained MIN = 1 # 1: a min has been retained - Point = NamedTuple("point", [("x", int), ("val", y.dtype)]) + Point = NamedTuple("point", [("x", int), ("y", y.dtype)]) # ------------------------------------------------------------------------ # NOTE: is fine for this implementation as this is only used for testing @@ -366,10 +365,12 @@ class Flag(Enum): bin_min = Point(min_idx, y[min_idx]) bin_max = Point(max_idxs, y[max_idxs]) - # update the max and min points based on the extrema of the current bin - if max_point.val < bin_max.val: + # Use the (nan-aware) comparison function to update the min and max points + # As comparisons with NaN always return False, we inverted the comparison + # the (inverted) > and <= stem from the pseudo code details in the paper + if not (max_point.y > bin_max.y): max_point = bin_max - if min_point.val > bin_min.val: + if not (min_point.y <= bin_min.y): min_point = bin_min # if the min is to the left of the max @@ -381,7 +382,7 @@ class Flag(Enum): sampled_indices.append(potential_point.x) sampled_indices.append(min_point.x) # receiving min_point b4 max_point -> retain min_point - potential_point = (max_point) # update potential point to unselected max_point + potential_point = max_point # update potential point to unselected max_point min_point = max_point # update min_point to unselected max_point previous_min_flag = Flag.MIN # min_point has been selected @@ -392,11 +393,11 @@ class Flag(Enum): sampled_indices.append(potential_point.x) sampled_indices.append(max_point.x) # receiving max_point b4 min_point -> retain max_point - potential_point = (min_point) # update potential point to unselected min_point + potential_point = min_point # update potential point to unselected min_point max_point = min_point # update max_point to unselected min_point previous_min_flag = Flag.MAX # max_point has been selected - sampled_indices.append(len(y) - 1) # append the last point + sampled_indices.append(len(y) - 1) # append the last point # fmt: on return np.array(sampled_indices, dtype=np.int64) From 5fcc477a4d619927dddcd0bab6ceb9452f219b25 Mon Sep 17 00:00:00 2001 From: jonas Date: Wed, 5 Mar 2025 14:19:09 +0100 Subject: [PATCH 12/13] :pushpin: add tests for NaN and non-NaN resampler accordance --- tests/test_algos_python_compliance.py | 30 +++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/test_algos_python_compliance.py b/tests/test_algos_python_compliance.py index 3bcc507..8749c93 100644 --- a/tests/test_algos_python_compliance.py +++ b/tests/test_algos_python_compliance.py @@ -84,3 +84,33 @@ def test_nan_resampler_accordance(rust_python_pair, n, n_random_nans, n_out): rust_downsampler.downsample(x, y, n_out=n_out), python_downsampler.downsample(x, y, n_out=n_out), ) + + +@pytest.mark.parametrize( + "nan_no_nan_pair", + [ + (NaNFPCS_py(), FPCS_py()), + (NaNM4_py(), M4_py()), + (NaNMinMax_py(), MinMax_py()), + (NaNMinMaxLTTB_py(), MinMaxLTTB_py()), + (NaNFPCSDownsampler(), FPCSDownsampler()), + (NaNM4Downsampler(), M4Downsampler()), + (NaNMinMaxDownsampler(), MinMaxDownsampler()), + (NaNMinMaxLTTBDownsampler(), MinMaxLTTBDownsampler()), + ], +) +@pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) +@pytest.mark.parametrize("n_out", [100, 200, 252]) +def test_nan_no_nan_resampler_accordance(nan_no_nan_pair, n, n_out): + nan_downsampler, no_nan_downsampler = nan_no_nan_pair + x = np.arange(n) + y = np.random.randn(n) + # Wihtout x passed to the downsamplers + nan_result = nan_downsampler.downsample(y, n_out=n_out) + no_nan_result = no_nan_downsampler.downsample(y, n_out=n_out) + assert np.allclose(nan_result, no_nan_result) + + # With x passed to the downsamplers + nan_result = nan_downsampler.downsample(x, y, n_out=n_out) + no_nan_result = no_nan_downsampler.downsample(x, y, n_out=n_out) + assert np.allclose(nan_result, no_nan_result) From 499d53816e71db31885797d6d8e5341232e353eb Mon Sep 17 00:00:00 2001 From: jonas Date: Wed, 5 Mar 2025 14:29:44 +0100 Subject: [PATCH 13/13] :see_no_evil: remove debug trait --- downsample_rs/src/fpcs.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/downsample_rs/src/fpcs.rs b/downsample_rs/src/fpcs.rs index 8252ff2..2ea4c37 100644 --- a/downsample_rs/src/fpcs.rs +++ b/downsample_rs/src/fpcs.rs @@ -2,7 +2,7 @@ use super::minmax; use super::types::Num; use argminmax::{ArgMinMax, NaNArgMinMax}; use num_traits::{AsPrimitive, FromPrimitive}; -use std::{fmt::Debug, ops::Not}; +use std::ops::Not; // ------------------------------- Helper datastructures ------------------------------- // NOTE: the pub(crate) is added to match the visibility of the fpcs_generic function @@ -76,7 +76,7 @@ fn fpcs_inner_comp( } #[inline(always)] -fn fpcs_outer_loop( +fn fpcs_outer_loop( arr: &[Ty], minmax_idxs: Vec, n_out: usize, @@ -91,7 +91,6 @@ fn fpcs_outer_loop( // Prepend first point sampled_indices.push(0); - println!("minmax_idxs: {:?}", minmax_idxs); // Process the min/max pairs for chunk in minmax_idxs.chunks(2) { if chunk.len() == 2 { @@ -136,7 +135,7 @@ macro_rules! fpcs_with_x { where for<'a> &'a [Ty]: $trait, Tx: Num + FromPrimitive + AsPrimitive, - Ty: Copy + PartialOrd + Debug, + Ty: Copy + PartialOrd, { fpcs_generic(x, y, n_out, $f_minmax, $f_fpcs_inner_comp) } @@ -160,7 +159,7 @@ fpcs_with_x!( macro_rules! fpcs_without_x { ($func_name:ident, $trait:path, $f_minmax:expr, $f_fpcs_inner_comp:expr) => { - pub fn $func_name(arr: &[T], n_out: usize) -> Vec + pub fn $func_name(arr: &[T], n_out: usize) -> Vec where for<'a> &'a [T]: $trait, { @@ -192,7 +191,7 @@ macro_rules! fpcs_with_x_parallel { where for<'a> &'a [Ty]: $trait, Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, - Ty: Num + Copy + PartialOrd + Send + Sync + Debug, + Ty: Num + Copy + PartialOrd + Send + Sync, { // collect the parrallel iterator to a vector fpcs_generic(x, y, n_out, $f_argminmax, $f_fpcs_inner_comp) @@ -217,7 +216,7 @@ fpcs_with_x_parallel!( macro_rules! fpcs_without_x_parallel { ($func_name:ident, $trait:path, $f_argminmax:expr, $f_fpcs_inner_comp:expr) => { - pub fn $func_name( + pub fn $func_name( arr: &[Ty], n_out: usize, ) -> Vec @@ -248,7 +247,7 @@ fpcs_without_x_parallel!( // ----------- WITH X #[inline(always)] -pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy + Debug>( +pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy>( x: &[Tx], y: &[Ty], n_out: usize, @@ -264,7 +263,7 @@ pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy + D // ----------- WITHOUT X #[inline(always)] -pub(crate) fn fpcs_generic_without_x( +pub(crate) fn fpcs_generic_without_x( arr: &[Ty], n_out: usize, f_minmax: fn(&[Ty], usize) -> Vec,