diff --git a/timely-experiments/scripts/run_stl_online.sh b/timely-experiments/scripts/run_stl_online.sh index 4e880a2..d552bb6 100644 --- a/timely-experiments/scripts/run_stl_online.sh +++ b/timely-experiments/scripts/run_stl_online.sh @@ -21,7 +21,8 @@ do --threads=1 \ --num_processes=$NUM_PROCESSES \ --process_index=$process_index \ - --seasonality=168 & + --seasonality=168 + --prioritization=lifo & done python ../stl/stl_online_client.py \ diff --git a/timely-experiments/src/main.rs b/timely-experiments/src/main.rs index 5330e90..506e6e5 100644 --- a/timely-experiments/src/main.rs +++ b/timely-experiments/src/main.rs @@ -23,6 +23,7 @@ fn run_experiment( global_slide_size: usize, per_key_slide_size: Option>, seasonality: usize, + prioritization: &str, num_keys: usize, timesteps: usize, send_rate_hz: f32, @@ -31,10 +32,7 @@ fn run_experiment( num_processes: usize, ) { let source_str = source_str.to_string(); - // TODO: don't execute timely from args. - - // if num_processes == 1 { - // } + let prioritization_str = prioritization.to_string(); let config = if num_processes == 1 { timely::Config::process(threads) @@ -69,7 +67,11 @@ fn run_experiment( } else { source.sliding_window(global_window_size, global_slide_size) }; - let models = windows.stl_fit(seasonality); + let models = match prioritization_str.as_str() { + "fifo" => windows.stl_fit(seasonality), + "lifo" => windows.stl_fit_lifo(seasonality), + _ => panic!("Unsupported prioritization strategy."), + }; models.to_redis(); // models.inspect(|((k, v), t, count)| { @@ -125,6 +127,13 @@ fn main() { .takes_value(true) .default_value("4"), ) + .arg( + Arg::with_name("prioritization") + .long("prioritization") + .takes_value(true) + .default_value("fifo") + .help("STL prioritization strategy. Either 'lifo' or 'fifo'"), + ) .arg( Arg::with_name("threads") .long("threads") @@ -179,6 +188,7 @@ fn main() { .parse() .unwrap(); let seasonality = matches.value_of("seasonality").unwrap().parse().unwrap(); + let prioritization: String = matches.value_of("prioritization").unwrap().parse().unwrap(); let threads = matches.value_of("threads").unwrap().parse().unwrap(); let num_processes: usize = matches.value_of("num_processes").unwrap().parse().unwrap(); let process_index: usize = matches.value_of("process_index").unwrap().parse().unwrap(); @@ -198,6 +208,7 @@ fn main() { global_slide_size, per_key_slide_size, seasonality, + &prioritization, num_keys, timesteps, send_rate, diff --git a/timely-experiments/src/operators/stl.rs b/timely-experiments/src/operators/stl.rs index 35be6ae..fc35f92 100644 --- a/timely-experiments/src/operators/stl.rs +++ b/timely-experiments/src/operators/stl.rs @@ -1,3 +1,9 @@ +use std::{ + collections::{BinaryHeap, HashMap}, + sync::mpsc, + thread, +}; + use timely::dataflow::{operators::Map, Scope}; use differential_dataflow::{AsCollection, Collection}; @@ -7,8 +13,37 @@ use pyo3::types::{IntoPyDict, PyDict}; use crate::{Record, RecordKey, RecordValue}; +fn run_stl(seasonality: usize, window: Vec>) -> Record> +where + K: RecordKey + ToPyObject, + V: 'static + RecordValue + ToPyObject, +{ + let key = window[0].key.clone(); + let timestamp = window.last().unwrap().timestamp; + let create_time_ns = window.last().unwrap().create_time_ns; + let model: Vec = Python::with_gil(move |py| { + let dicts: Vec<&PyDict> = window.into_iter().map(|r| r.into_py_dict(py)).collect(); + + py.run( + "import sys; import os; sys.path.append(os.getcwd() + '/python/'); import stl", + None, + None, + ) + .unwrap(); + + let locals = [("window", dicts)].into_py_dict(py); + locals.set_item("seasonality", seasonality).unwrap(); + py.eval("stl.fit_window(window, seasonality)", None, Some(locals)) + .unwrap() + .extract() + .unwrap() + }); + Record::new_with_create_time(timestamp, key, model, create_time_ns) +} + pub trait STLFit { fn stl_fit(&self, seasonality: usize) -> Collection>)>; + fn stl_fit_lifo(&self, seasonality: usize) -> Collection>)>; } impl STLFit for Collection>)> @@ -18,30 +53,80 @@ where V: 'static + RecordValue + ToPyObject, { fn stl_fit(&self, seasonality: usize) -> Collection>)> { - self.map(move |(k, window)| { - let timestamp = window.last().unwrap().timestamp; - let create_time_ns = window.last().unwrap().create_time_ns; - let model: Vec = Python::with_gil(move |py| { - let dicts: Vec<&PyDict> = window.into_iter().map(|r| r.into_py_dict(py)).collect(); - - py.run( - "import sys; import os; sys.path.append(os.getcwd() + '/python/'); import stl", - None, - None, - ) - .unwrap(); - - let locals = [("window", dicts)].into_py_dict(py); - locals.set_item("seasonality", seasonality).unwrap(); - py.eval("stl.fit_window(window, seasonality)", None, Some(locals)) - .unwrap() - .extract() - .unwrap() - }); - ( - k.clone(), - Record::new_with_create_time(timestamp, k, model, create_time_ns), - ) + self.map(move |(k, window)| (k, run_stl(seasonality, window))) + } + + // TODO: find a better way to do LIFO than offloading work in a thread, + // and sending once the thread has completed and a new window arrives. + fn stl_fit_lifo(&self, seasonality: usize) -> Collection>)> { + // Spawn worker thread which pulls new records to compute and returns values. + let (operator_tx, worker_rx) = mpsc::channel(); + let (worker_tx, operator_rx) = mpsc::channel(); + thread::Builder::new() + .name("python-worker".to_string()) + .spawn(move || { + while let Ok(window) = worker_rx.recv() { + let model = run_stl(seasonality, window); + if let Err(e) = worker_tx.send(model) { + eprintln!("python-worker errored with {:?}", e); + return; + } + } + }) + .unwrap(); + + // A hashmap of windows in LIFO ordering. + let mut queues: HashMap>)>> = HashMap::new(); + // Tracks the inserted keys for round-robin. + let mut keys = vec![]; + // Used for round-robin processing of keys. + let mut round_robin_idx = 0; + // Tracks the total number of items inserted. + let mut num_items = 0; + + self.flat_map(move |(k, window)| { + num_items += 1; + println!("total items: {}", num_items); + if num_items == 1 { + operator_tx.send(window).unwrap(); + return vec![]; + } + + // Add to queue. + if !queues.contains_key(&k) { + keys.push(k.clone()); + } + let entry = queues.entry(k).or_default(); + entry.push((window[0].timestamp, window)); + + if let Ok(record) = operator_rx.try_recv() { + // Choose the next heap containing a window. + let mut heap_option = None; + for _ in 0..keys.len() { + round_robin_idx = (round_robin_idx + 1) % keys.len(); + heap_option = queues.get_mut(&keys[round_robin_idx]); + if let Some(heap) = heap_option.as_ref() { + if !heap.is_empty() { + break; + } + } + } + if let Some(heap) = heap_option { + // Get the window. + let (_t, window_to_process) = heap.pop().unwrap(); + + // Remove all older windows. + heap.clear(); + + // Spawn a new thread to process a window. + operator_tx.send(window_to_process).unwrap(); + } + + let key = record.key.clone(); + vec![(key, record)] + } else { + vec![] + } }) } } diff --git a/timely-experiments/src/record.rs b/timely-experiments/src/record.rs index 57b705d..3c4deba 100644 --- a/timely-experiments/src/record.rs +++ b/timely-experiments/src/record.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{fmt::Debug, hash::Hash}; use abomonation::Abomonation; @@ -7,11 +7,11 @@ use differential_dataflow::Data; use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyDict}; -pub trait RecordKey: Data + Clone + Abomonation {} -impl RecordKey for T {} +pub trait RecordKey: Data + Clone + Abomonation + Hash + Send {} +impl RecordKey for T {} -pub trait RecordValue: PartialEq + Clone + Abomonation {} -impl RecordValue for T {} +pub trait RecordValue: PartialEq + Clone + Abomonation + Send {} +impl RecordValue for T {} #[derive(Clone, abomonation_derive::Abomonation, Hash)] pub struct Record { @@ -58,7 +58,7 @@ impl Eq for Record { fn assert_receiver_is_total_eq(&self) {} } -impl Debug for Record { +impl Debug for Record { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f,