Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion timely-experiments/scripts/run_stl_online.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ do
--threads=1 \
--num_processes=$NUM_PROCESSES \
--process_index=$process_index \
--seasonality=168 &
--seasonality=168 \
--prioritization=lifo &
done

python ../stl/stl_online_client.py \
Expand Down
21 changes: 16 additions & 5 deletions timely-experiments/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn run_experiment(
global_slide_size: usize,
per_key_slide_size: Option<HashMap<usize, usize>>,
seasonality: usize,
prioritization: &str,
num_keys: usize,
timesteps: usize,
send_rate_hz: f32,
Expand All @@ -31,10 +32,7 @@ fn run_experiment(
num_processes: usize,
) {
let source_str = source_str.to_string();
// TODO: don't execute timely from args.

// if num_processes == 1 {
// }
let prioritization_str = prioritization.to_string();

let config = if num_processes == 1 {
timely::Config::process(threads)
Expand Down Expand Up @@ -69,7 +67,11 @@ fn run_experiment(
} else {
source.sliding_window(global_window_size, global_slide_size)
};
let models = windows.stl_fit(seasonality);
let models = match prioritization_str.as_str() {
"fifo" => windows.stl_fit(seasonality),
"lifo" => windows.stl_fit_lifo(seasonality),
_ => panic!("Unsupported prioritization strategy."),
};
models.to_redis();

// models.inspect(|((k, v), t, count)| {
Expand Down Expand Up @@ -125,6 +127,13 @@ fn main() {
.takes_value(true)
.default_value("4"),
)
.arg(
Arg::with_name("prioritization")
.long("prioritization")
.takes_value(true)
.default_value("fifo")
.help("STL prioritization strategy. Either 'lifo' or 'fifo'"),
)
.arg(
Arg::with_name("threads")
.long("threads")
Expand Down Expand Up @@ -179,6 +188,7 @@ fn main() {
.parse()
.unwrap();
let seasonality = matches.value_of("seasonality").unwrap().parse().unwrap();
let prioritization: String = matches.value_of("prioritization").unwrap().parse().unwrap();
let threads = matches.value_of("threads").unwrap().parse().unwrap();
let num_processes: usize = matches.value_of("num_processes").unwrap().parse().unwrap();
let process_index: usize = matches.value_of("process_index").unwrap().parse().unwrap();
Expand All @@ -198,6 +208,7 @@ fn main() {
global_slide_size,
per_key_slide_size,
seasonality,
&prioritization,
num_keys,
timesteps,
send_rate,
Expand Down
133 changes: 109 additions & 24 deletions timely-experiments/src/operators/stl.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use std::{
collections::{BinaryHeap, HashMap},
sync::mpsc,
thread,
};

use timely::dataflow::{operators::Map, Scope};

use differential_dataflow::{AsCollection, Collection};
Expand All @@ -7,8 +13,37 @@ use pyo3::types::{IntoPyDict, PyDict};

use crate::{Record, RecordKey, RecordValue};

/// Fits an STL model to one window of records by calling into Python.
///
/// The window is converted to a list of Python dicts and handed to
/// `stl.fit_window(window, seasonality)`, where `stl` is imported after adding
/// `<cwd>/python/` to `sys.path`. The value extracted from that call becomes
/// the payload of the returned record.
///
/// The returned record takes its key from the first record of the window and
/// its timestamp and creation time from the last one.
///
/// # Panics
///
/// Panics if `window` is empty, if the `stl` module cannot be imported, or if
/// `stl.fit_window` raises or returns something that cannot be extracted into
/// a `Vec<u8>`.
fn run_stl<K, V>(seasonality: usize, window: Vec<Record<K, V>>) -> Record<K, Vec<u8>>
where
    K: RecordKey + ToPyObject,
    V: 'static + RecordValue + ToPyObject,
{
    // The search-path entry is only appended when it is missing; appending it
    // unconditionally would grow `sys.path` by one element per fitted window.
    const IMPORT_STL: &str = "import sys; import os; \
        sys.path.append(os.getcwd() + '/python/') \
        if os.getcwd() + '/python/' not in sys.path else None; \
        import stl";

    let key = window
        .first()
        .expect("run_stl requires a non-empty window")
        .key
        .clone();
    let newest = window
        .last()
        .expect("run_stl requires a non-empty window");
    let timestamp = newest.timestamp;
    let create_time_ns = newest.create_time_ns;

    let model: Vec<u8> = Python::with_gil(move |py| {
        let dicts: Vec<&PyDict> = window.into_iter().map(|r| r.into_py_dict(py)).collect();

        py.run(IMPORT_STL, None, None)
            .expect("failed to import the Python `stl` module from ./python/");

        let locals = [("window", dicts)].into_py_dict(py);
        locals
            .set_item("seasonality", seasonality)
            .expect("failed to pass `seasonality` to Python");
        py.eval("stl.fit_window(window, seasonality)", None, Some(locals))
            .expect("stl.fit_window raised a Python exception")
            .extract()
            .expect("stl.fit_window did not return a bytes-like model")
    });
    Record::new_with_create_time(timestamp, key, model, create_time_ns)
}

/// STL model fitting over a collection of keyed windows.
///
/// Both methods take the `seasonality` value that is forwarded to the fitting
/// routine and return a collection of `(key, record)` pairs whose record
/// payload is the fitted model as a `Vec<u8>`.
pub trait STLFit<S: Scope, K: RecordKey> {
/// Fits a model for each window that flows through the collection.
/// The implementation in this file maps every `(key, window)` pair through `run_stl`.
fn stl_fit(&self, seasonality: usize) -> Collection<S, (K, Record<K, Vec<u8>>)>;
/// Fits models with last-in-first-out prioritization.
/// The implementation in this file offloads fitting to a worker thread, queues
/// windows per key while the worker is busy, and discards a key's older queued
/// windows once its newest one has been handed to the worker.
fn stl_fit_lifo(&self, seasonality: usize) -> Collection<S, (K, Record<K, Vec<u8>>)>;
}

impl<S, K, V> STLFit<S, K> for Collection<S, (K, Vec<Record<K, V>>)>
Expand All @@ -18,30 +53,80 @@ where
V: 'static + RecordValue + ToPyObject,
{
fn stl_fit(&self, seasonality: usize) -> Collection<S, (K, Record<K, Vec<u8>>)> {
self.map(move |(k, window)| {
let timestamp = window.last().unwrap().timestamp;
let create_time_ns = window.last().unwrap().create_time_ns;
let model: Vec<u8> = Python::with_gil(move |py| {
let dicts: Vec<&PyDict> = window.into_iter().map(|r| r.into_py_dict(py)).collect();

py.run(
"import sys; import os; sys.path.append(os.getcwd() + '/python/'); import stl",
None,
None,
)
.unwrap();

let locals = [("window", dicts)].into_py_dict(py);
locals.set_item("seasonality", seasonality).unwrap();
py.eval("stl.fit_window(window, seasonality)", None, Some(locals))
.unwrap()
.extract()
.unwrap()
});
(
k.clone(),
Record::new_with_create_time(timestamp, k, model, create_time_ns),
)
self.map(move |(k, window)| (k, run_stl(seasonality, window)))
}

// TODO: find a better way to do LIFO than offloading work in a thread,
// and sending once the thread has completed and a new window arrives.
fn stl_fit_lifo(&self, seasonality: usize) -> Collection<S, (K, Record<K, Vec<u8>>)> {
// Spawn worker thread which pulls new records to compute and returns values.
let (operator_tx, worker_rx) = mpsc::channel();
let (worker_tx, operator_rx) = mpsc::channel();
thread::Builder::new()
.name("python-worker".to_string())
.spawn(move || {
while let Ok(window) = worker_rx.recv() {
let model = run_stl(seasonality, window);
if let Err(e) = worker_tx.send(model) {
eprintln!("python-worker errored with {:?}", e);
return;
}
}
})
.unwrap();

// A hashmap of windows in LIFO ordering.
let mut queues: HashMap<K, BinaryHeap<(usize, Vec<Record<K, V>>)>> = HashMap::new();
// Tracks the inserted keys for round-robin.
let mut keys = vec![];
// Used for round-robin processing of keys.
let mut round_robin_idx = 0;
// Tracks the total number of items inserted.
let mut num_items = 0;

self.flat_map(move |(k, window)| {
num_items += 1;
println!("total items: {}", num_items);
if num_items == 1 {
operator_tx.send(window).unwrap();
return vec![];
}

// Add to queue.
if !queues.contains_key(&k) {
keys.push(k.clone());
}
let entry = queues.entry(k).or_default();
entry.push((window[0].timestamp, window));

if let Ok(record) = operator_rx.try_recv() {
// Choose the next heap containing a window.
let mut heap_option = None;
for _ in 0..keys.len() {
round_robin_idx = (round_robin_idx + 1) % keys.len();
heap_option = queues.get_mut(&keys[round_robin_idx]);
if let Some(heap) = heap_option.as_ref() {
if !heap.is_empty() {
break;
}
}
}
if let Some(heap) = heap_option {
// Get the window.
let (_t, window_to_process) = heap.pop().unwrap();

// Remove all older windows.
heap.clear();

// Spawn a new thread to process a window.
operator_tx.send(window_to_process).unwrap();
}

let key = record.key.clone();
vec![(key, record)]
} else {
vec![]
}
})
}
}
Expand Down
12 changes: 6 additions & 6 deletions timely-experiments/src/record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, hash::Hash};

use abomonation::Abomonation;

Expand All @@ -7,11 +7,11 @@ use differential_dataflow::Data;
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyDict};

pub trait RecordKey: Data + Clone + Abomonation {}
impl<T: Data + Clone + Abomonation> RecordKey for T {}
pub trait RecordKey: Data + Clone + Abomonation + Hash + Send {}
impl<T: Data + Clone + Abomonation + Hash + Send> RecordKey for T {}

pub trait RecordValue: PartialEq + Clone + Abomonation {}
impl<T: PartialEq + Clone + Abomonation> RecordValue for T {}
pub trait RecordValue: PartialEq + Clone + Abomonation + Send {}
impl<T: PartialEq + Clone + Abomonation + Send> RecordValue for T {}

#[derive(Clone, abomonation_derive::Abomonation, Hash)]
pub struct Record<K: RecordKey, V: RecordValue> {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl<K: RecordKey, V: RecordValue> Eq for Record<K, V> {
fn assert_receiver_is_total_eq(&self) {}
}

impl<K: RecordKey, V: PartialEq + Debug + Clone + Abomonation> Debug for Record<K, V> {
impl<K: RecordKey, V: RecordValue + Debug> Debug for Record<K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand Down