Changes from all commits (27 commits)
1aa0eb9
anyof
sarahwooders Oct 7, 2021
ce03916
fix
simon-mo Oct 7, 2021
cf0ece1
add aws confg
simon-mo Oct 7, 2021
25046b0
revert
simon-mo Oct 7, 2021
2b1b2c0
add initial key policies
sarahwooders Oct 7, 2021
63c5223
add key selection policy options
sarahwooders Oct 7, 2021
8331e81
Merge branch 'main' of github.com:feature-store/experiments into key-…
sarahwooders Oct 7, 2021
60c75f5
only return key
sarahwooders Oct 7, 2021
1220cdd
start implementing time-specific weights
sarahwooders Oct 8, 2021
754ccc8
plan-level simulation
sarahwooders Oct 8, 2021
9de4f9a
add policy evaluation script
sarahwooders Oct 9, 2021
b897fa4
add wandb logging + wikipedia plot notebook
sarahwooders Oct 9, 2021
01dc41b
change to scripts
sarahwooders Oct 12, 2021
7728c35
parallelize preprocessing
sarahwooders Oct 12, 2021
7422f94
add benchmark script to get realistic inference numbers
sarahwooders Oct 13, 2021
16a5dca
semi wokring simulator - about to implement new policies and more rep…
sarahwooders Oct 14, 2021
b342494
add notbeook
sarahwooders Oct 14, 2021
f28a429
stash
sarahwooders Oct 15, 2021
3042b0c
try lp 500
sarahwooders Oct 14, 2021
6fed898
notebook
sarahwooders Oct 15, 2021
3a9669b
stash
sarahwooders Oct 15, 2021
a9a65ca
stash
sarahwooders Oct 15, 2021
2b2193b
wiki graph
sarahwooders Oct 15, 2021
7f38ccb
add rght filepath
sarahwooders Oct 15, 2021
5e0263e
reorganize wiki dir
sarahwooders Oct 19, 2021
1fdb907
rename file
sarahwooders Oct 19, 2021
6d97aa6
update README
sarahwooders Oct 19, 2021
925 changes: 925 additions & 0 deletions stl/notebooks/STL Offline Plots.ipynb

Large diffs are not rendered by default.

33 changes: 28 additions & 5 deletions stl/offline/config_gen.py
@@ -6,7 +6,6 @@
import numpy as np
import pandas as pd
from absl import app, flags
from ortools.linear_solver import pywraplp
from sktime.performance_metrics.forecasting import mean_squared_scaled_error

FLAGS = flags.FLAGS
@@ -23,8 +22,30 @@
required=True,
)

# TODO(simon): add flags for lp solver constraint
flags.DEFINE_integer(
"max_n_fits",
default=None,
help="Max fits for LP",
required=False,
)

flags.DEFINE_integer(
"max_loss",
default=None,
help="Max loss for LP",
required=False,
)

flags.DEFINE_string(
"objective",
default="min_loss",
help="LP optimization goal",
required=False,
)

def run_lp(df: pd.DataFrame, max_n_fits=None, max_loss=None, objective="min_loss"):
def run_lp(df: pd.DataFrame, objective="min_loss"):
from ortools.linear_solver import pywraplp
"""Run through mixed integer program to generate the best plan.

Input:
@@ -35,6 +56,8 @@ def run_lp(df: pd.DataFrame, max_n_fits=None, max_loss=None, objective="min_loss
Output:
plan(Dict[str, int]): a dictionary mapping key -> optimal n_fits such that loss is minimal.
"""
max_n_fits = FLAGS.max_n_fits
max_loss = FLAGS.max_loss
assert all(df.columns == ["key", "n_fits", "loss"])
assert objective in {"min_loss", "min_fits"}

@@ -96,17 +119,17 @@ def run_lp(df: pd.DataFrame, max_n_fits=None, max_loss=None, objective="min_loss


def get_loss_per_key(key: int, csv_dir):
key_one = glob(f"{csv_dir}/slide_*_key_A4Benchmark-TS{key}.csv")
key_one = glob(f"{csv_dir}/fifo_slide_*_key_{key}.csv")
assert len(key_one) > 0

oracle_residual = pd.read_csv(f"{csv_dir}/oracle_key_A4Benchmark-TS{key}.csv")[
oracle_residual = pd.read_csv(f"./oracle/{key}.csv")[
"pred_residual"
]

losses = []
for path in key_one:
slide_size = int(
os.path.basename(path).split("_key_A4")[0].replace("slide_", "")
os.path.basename(path).split("_key_")[0].replace("fifo_slide_", "")
)
df = pd.read_csv(path)
residual = df["pred_residual"]
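The diff moves the pywraplp import inside run_lp and drives the solver limits from the new flags. As a rough sketch of the kind of mixed integer program run_lp describes (pick one n_fits value per key to minimize loss under a fit budget), assuming a SCIP backend and a candidate table with columns key, n_fits, loss; the function name, constraint form, and budget argument here are illustrative, not the PR's exact model:

import pandas as pd
from ortools.linear_solver import pywraplp


def sketch_plan_lp(df: pd.DataFrame, max_total_fits: int) -> dict:
    """df has columns ["key", "n_fits", "loss"]; returns key -> chosen n_fits."""
    solver = pywraplp.Solver.CreateSolver("SCIP")
    # one boolean per candidate row: "use this (key, n_fits) option"
    choose = {i: solver.BoolVar(f"choose_{i}") for i in df.index}

    # exactly one candidate per key
    for _, group in df.groupby("key"):
        solver.Add(sum(choose[i] for i in group.index) == 1)

    # budget on the total number of refits across all keys
    solver.Add(sum(int(df.loc[i, "n_fits"]) * choose[i] for i in df.index) <= max_total_fits)

    # minimize the summed loss of the selected candidates
    solver.Minimize(sum(float(df.loc[i, "loss"]) * choose[i] for i in df.index))

    assert solver.Solve() == pywraplp.Solver.OPTIMAL
    return {
        int(row["key"]): int(row["n_fits"])
        for i, row in df.iterrows()
        if choose[i].solution_value() > 0.5
    }

Presumably the "min_fits" objective flips this around: minimize the total number of fits subject to the max_loss bound.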
8 changes: 8 additions & 0 deletions stl/offline/default_plans.py
@@ -0,0 +1,8 @@
import json

plan_dir = "/data/wooders/stl/results"
slides = [1, 6, 12, 18, 24, 48, 96, 168, 192, 336, 672]

for slide in slides:
weights = {i: slide for i in range(1, 101, 1)}
open(f"{plan_dir}/plan_baseline_{slide}.json", "w").write(json.dumps(weights))
48 changes: 48 additions & 0 deletions stl/offline/evaluate_loss.py
@@ -0,0 +1,48 @@
from sktime.performance_metrics.forecasting import mean_squared_scaled_error
import numpy as np
import pandas as pd
from tqdm import tqdm
import argparse

def get_loss_per_key(key: int, csv_dir, oracle_dir):
path = f"{csv_dir}/{key}.csv"

oracle_residual = pd.read_csv(f"{oracle_dir}/oracle_key_A4Benchmark-TS{key}.csv")[
"pred_residual"
]

df = pd.read_csv(path)
print(path)
residual = df["pred_residual"]
print("residual", len(residual.tolist()))
mask = ~np.isnan(residual)
print("residual", len(residual[mask].tolist()))
loss = mean_squared_scaled_error(
y_true=oracle_residual[mask], y_pred=residual[mask], y_train=df["value"]
)
loss = {
"loss": loss,
"n_fits": df["model_version"].dropna().nunique(),
}
return loss



if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Specify experiment config")
parser.add_argument("--csv-path", type=str)
parser.add_argument("--oracle-path", type=str)
args = parser.parse_args()

raw_data = []
for key in tqdm(range(1, 101)):
entry = get_loss_per_key(key, csv_dir=args.csv_path, oracle_dir=args.oracle_path)
raw_data.append({"key": key, **entry})

df = pd.DataFrame(raw_data)
print("loss per n_fits")
print(df.groupby("n_fits")["loss"].describe())
print(f"loss per key (sample of 10 out of {len(df)})")
print(df.groupby("key")["loss"].describe().sample(10))
df.to_csv("final_results.csv")

85 changes: 67 additions & 18 deletions stl/offline/evaluation.py
@@ -1,16 +1,22 @@
import argparse
import time
from multiprocessing import Pool
import json
import os
import bisect

from tqdm import tqdm
import numpy as np
import pandas as pd
import time
from statsmodels.tsa.seasonal import STL


def train(data, window_size, seasonality):
window = data[-window_size:]
values = [r["value"] for r in window]
st = time.time()
stl_result = STL(values, period=seasonality, robust=True).fit()
print(time.time() - st)
timestamp = data[-1]["timestamp"]
return {
"timestamp": timestamp,
@@ -37,27 +43,43 @@ def predict(event, model):
SEASONALITY = 24 * 7


def offline_eval(yahoo_csv_path, plan_json_path):
df = pd.read_csv(yahoo_csv_path)
df["timestamp"] = list(range(len(df)))
def offline_eval(yahoo_csv_path, plan_json_path, key, output_path):

# Headers
# processing_time window_start_seq_id window_end_seq_id key
print(output_path)

# get plan DF for key
plan_df = pd.read_json(plan_json_path)
if key is not None:
plan_df_key = plan_df[plan_df["key"] == int(key)]
else:
plan_df_key = plan_df
plan_df_key.index = pd.RangeIndex(start=0, stop=len(plan_df_key.index))

# get original data
df = pd.read_csv(yahoo_csv_path)
df["timestamp"] = list(range(len(df)))

# Given our model versions from offline plan, run training on corresponding
# events.
offline_stl = {}
for _, row in plan_df.iterrows():
records = df.iloc[row.window_start_seq_id : row.window_end_seq_id + 1].to_dict(
print(plan_df_key)
for _, row in tqdm(plan_df_key.iterrows()): # note: doesn't preserve types
st = time.time()
records = df.iloc[int(row.window_start_seq_id) : int(row.window_end_seq_id) + 1].to_dict(
orient="records"
)
#print("find time", time.time() - st)

# The yahoo dataset seasonaly can be 12hr, daily, and weekly.
# Each record is an hourly record. Here we chose weekly seasonality.
st = time.time()
trained = train(records, window_size=len(records), seasonality=SEASONALITY)
#print("fit time", time.time() - st)
offline_stl[row.processing_time] = trained

print(offline_stl.keys())


# Assign the trained model with every events in the source file.
def find_freshest_model_version(event_time, model_versions):
model_loc = bisect.bisect_left(model_versions, event_time) - 1
@@ -66,12 +88,13 @@ def find_freshest_model_version(event_time, model_versions):
return model_versions[model_loc]

df["model_version"] = [
find_freshest_model_version(et, plan_df["processing_time"])
find_freshest_model_version(et, plan_df_key["processing_time"])
for et in df["timestamp"]
]

# Run prediction!
predicted = []
print("running prediction")
for _, row in df.iterrows():
model_version = row["model_version"]
if np.isnan(model_version):
@@ -96,10 +119,27 @@ def find_freshest_model_version(event_time, model_versions):
add_df = pd.DataFrame(predicted)
for new_col in add_df.columns:
df[new_col] = add_df[new_col]
return df
print("writing", output_path)
df.to_csv(output_path, index=None)

def offline_eval_all(yahoo_path, plan_json_path, output_path, param_path):

policy_params = json.load(open(param_path))

# loop through each key
inputs = []
for key in policy_params.keys():
key_output_path = f"{output_path}/{key}.csv"
inputs.append((f"{yahoo_path}/{key}.csv", plan_json_path, key, key_output_path))

p = Pool(100)
p.starmap(offline_eval, inputs)
p.close()
return

def offline_oracle(yahoo_csv_path):


def offline_oracle(yahoo_csv_path, output_path):
df = pd.read_csv(yahoo_csv_path)
df["timestamp"] = list(range(len(df)))
df["model_version"] = "oracle"
@@ -111,15 +151,20 @@ def offline_oracle(yahoo_csv_path):
df["pred_seasonality"] = oracle_model["stl_result"].seasonal
df["pred_staleness"] = 0

return df
df.to_csv(output_path)


def run_exp(csv_path, plan_path, output_path, run_oracle=False):
def run_exp(csv_path, plan_path, output_path, run_policy=False, run_oracle=False, param_path=None):
if run_oracle:
df = offline_oracle(csv_path)
df = offline_oracle(csv_path, output_path)
elif run_policy:
offline_eval_all(csv_path, plan_path, output_path, param_path)
else:
df = offline_eval(csv_path, plan_path)
df.to_csv(output_path, index=None)

# Headers
# processing_time window_start_seq_id window_end_seq_id key
#plan_df = pd.read_json(plan_path)
offline_eval(csv_path, plan_path, None, output_path)


def _ensure_dir(path):
@@ -132,19 +177,23 @@ def main():
parser.add_argument("--offline-yahoo-csv-path", type=str)
parser.add_argument("--offline-plan-path", type=str)
parser.add_argument("--output-path", type=str)
parser.add_argument("--offline-run-oracle", type=bool, default=False)
parser.add_argument("--offline-run-oracle", default=False, action='store_true')
parser.add_argument("--run-policy", default=False, action='store_true')
parser.add_argument("--param-path", type=str, default=None)
args = parser.parse_args()

assert args.offline_yahoo_csv_path
if not args.offline_run_oracle:
assert args.offline_plan_path
_ensure_dir(args.output_path)
#_ensure_dir(args.output_path)

run_exp(
csv_path=args.offline_yahoo_csv_path,
plan_path=args.offline_plan_path,
output_path=args.output_path,
run_oracle=args.offline_run_oracle,
run_policy=args.run_policy,
param_path=args.param_path,
)


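The evaluation assigns each event the freshest model trained before it by bisecting the sorted processing times of the plan's models. A minimal sketch of that lookup (names are illustrative; the real function, only partly shown in the diff, appears to return NaN when no model exists yet, and the prediction loop skips those events):

import bisect


def find_freshest(event_time, model_times):
    """model_times: processing times of trained models, sorted ascending."""
    loc = bisect.bisect_left(model_times, event_time) - 1
    return None if loc < 0 else model_times[loc]


assert find_freshest(10, [3, 7, 12]) == 7    # model trained at t=7 serves t=10
assert find_freshest(2, [3, 7, 12]) is None  # no model trained before t=2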
57 changes: 57 additions & 0 deletions stl/offline/extend_data.py
@@ -0,0 +1,57 @@
import numpy as np
import pandas as pd
import random
import statistics
import glob
import os

max_length = 1680 # double length
noise = 2
max_seasonality = 24*7
# over_sampling_rate = 1
path = "yahoo_train_data/"
output_path = "yahoo_eval_data/"
input_path = "yahoo_train_data/*"
files = glob.glob(input_path)
print(files)
for filename in files:
df = pd.read_csv(filename)

max_outlier_value, min_outlier_value = max(df['noise']), min(df['noise'])
mean, stddev = statistics.mean(df['noise']), statistics.stdev(df['noise'])

initial_trend = df['trend'][0]
last_trend = df['trend'].iloc[-1]
trend_subtracted_series = df['trend'] - initial_trend
# trend_subtracted_series = np.repeat(trend_subtracted_series, over_sampling_rate)

seasonality = df['seasonality1'] + df['seasonality2'] + df['seasonality3']
# seasonality = np.repeat(seasonality, over_sampling_rate)

repeat_length = (len(trend_subtracted_series) // max_seasonality) * max_seasonality

count = 0
generated_trend = [last_trend] * max_length
generated_noise = [0] * max_length
generated_outlier = [0] * max_length
generated_seasonality = [0] * max_length

for i in range(max_length):
if count >= repeat_length:
count = 0
last_trend = generated_trend[i-1]
generated_trend[i] = last_trend + trend_subtracted_series[count]
generated_seasonality[i] = seasonality[count]
generated_noise[i] = random.gauss(mean, stddev)
generated_outlier[i] = 0
if random.randint(0, 100) > 100 - noise:
if random.randint(0, 100) > 50:
generated_outlier[i] = max_outlier_value * random.randint(70,100) // 100
else:
generated_outlier[i] = min_outlier_value * random.randint(70,100) // 100
count += 1

new_df = pd.DataFrame({"trend": generated_trend, "noise": generated_noise, "outlier": generated_outlier, "seasonality": generated_seasonality })
new_df['value'] = new_df['trend'] + new_df['noise'] + new_df['outlier'] + new_df['seasonality']
print(os.path.basename(filename))
new_df.to_csv(os.path.join(output_path, os.path.basename(filename)))
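In short, this script doubles each Yahoo series to 1680 points by replaying the original trend increments and combined seasonality, redrawing noise from a Gaussian fit to the source noise column, and injecting occasional outliers (roughly `noise` percent of points, scaled to 70-100% of the observed noise extremes), then writes the extended series to yahoo_eval_data/.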