diff --git a/.example.env b/.example.env
index 367e0396..45789838 100644
--- a/.example.env
+++ b/.example.env
@@ -65,6 +65,9 @@ ANOMSTACK_OPENAI_KEY=
 ANOMSTACK_OPENAI_MODEL=gpt-3.5-turbo
 # ANOMSTACK_OPENAI_MODEL=gpt-4-1106-preview
 
+# TimeGPT Token
+# ANOMSTACK_TIMEGPT_TOKEN=XXX
+
 # some dagster env vars
 DAGSTER_LOG_LEVEL=DEBUG
 
diff --git a/anomstack/jobs/llmalert.py b/anomstack/jobs/llmalert.py
index 375a0edd..c0adca88 100644
--- a/anomstack/jobs/llmalert.py
+++ b/anomstack/jobs/llmalert.py
@@ -44,7 +44,7 @@ def build_llmalert_job(spec) -> JobDefinition:
             tags={MAX_RUNTIME_SECONDS_TAG: ANOMSTACK_MAX_RUNTIME_SECONDS_TAG},
         )
         def _dummy_job():
-            @op(name=f'{spec["metric_batch"]}_noop')
+            @op(name=f'{spec["metric_batch"]}_llmalert_noop')
             def noop():
                 pass
 
diff --git a/anomstack/jobs/timegptalert.py b/anomstack/jobs/timegptalert.py
new file mode 100644
index 00000000..787de193
--- /dev/null
+++ b/anomstack/jobs/timegptalert.py
@@ -0,0 +1,143 @@
+"""
+Generate timegptalert jobs and schedules.
+"""
+
+import os
+import pandas as pd
+from dagster import (
+    job,
+    op,
+    ScheduleDefinition,
+    JobDefinition,
+    DefaultScheduleStatus,
+    get_dagster_logger,
+)
+from nixtlats import TimeGPT
+from anomstack.config import specs
+from anomstack.df.resample import resample
+from anomstack.jinja.render import render
+from anomstack.sql.read import read_sql
+
+
+def build_timegptalert_job(spec) -> JobDefinition:
+    """Builds a job definition for the TimeGPT Alert job.
+
+    Args:
+        spec (dict): A dictionary containing the specifications for the job.
+
+    Returns:
+        JobDefinition: A job definition for the TimeGPT Alert job.
+    """
+
+    if spec.get("disable_timegptalert"):
+
+        @job(name=f'{spec["metric_batch"]}_timegptalert_disabled')
+        def _dummy_job():
+            @op(name=f'{spec["metric_batch"]}_timegptalert_noop')
+            def noop():
+                pass
+
+            noop()
+
+        return _dummy_job
+
+    # Create the client only for enabled jobs so disabled specs need no token.
+    timegpt = TimeGPT(token=os.getenv("ANOMSTACK_TIMEGPT_TOKEN"))
+    logger = get_dagster_logger()
+
+    metric_batch = spec["metric_batch"]
+    db = spec["db"]
+    threshold = spec["alert_threshold"]
+    alert_methods = spec["alert_methods"]
+    timegptalert_recent_n = spec["timegptalert_recent_n"]
+    timegptalert_smooth_n = spec["timegptalert_smooth_n"]
+    timegptalert_metric_rounding = spec.get("timegptalert_metric_rounding", 4)
+    timegptalert_freq = spec.get("timegptalert_freq", "1D")
+    timegptalert_freq_agg = spec.get("timegptalert_freq_agg", "mean")
+
+    @job(name=f"{metric_batch}_timegptalert_job")
+    def _job():
+        """A job that runs the TimeGPT Alert.
+
+        Returns:
+            None
+        """
+
+        @op(name=f"{metric_batch}_get_timegptalert_data")
+        def get_timegptalert_data() -> pd.DataFrame:
+            """An operation that retrieves the data for the TimeGPT Alert.
+
+            Returns:
+                pd.DataFrame: A pandas DataFrame containing the data for the TimeGPT Alert.
+            """
+
+            df = read_sql(render("timegpt_sql", spec), db)
+
+            return df
+
+        @op(name=f"{metric_batch}_timegptalert")
+        def timegptalert(context, df: pd.DataFrame) -> None:
+            """An operation that runs the TimeGPT Alert.
+
+            Args:
+                context: The context of the operation.
+                df (pd.DataFrame): A pandas DataFrame containing the data for the TimeGPT Alert.
+
+            Returns:
+                None
+            """
+
+            for metric_name in df["metric_name"].unique():
+                df_metric = (
+                    df[df.metric_name == metric_name]
+                    .sort_values(by="metric_timestamp", ascending=True)
+                    .reset_index(drop=True)
+                ).dropna()
+                df_metric["metric_timestamp"] = pd.to_datetime(
+                    df_metric["metric_timestamp"]
+                )
+
+                if timegptalert_smooth_n > 0:
+                    df_metric["metric_value"] = (
+                        df_metric["metric_value"]
+                        .rolling(timegptalert_smooth_n)
+                        .mean()
+                    )
+
+                # Resample only this metric's series, not the whole batch.
+                df_metric = resample(
+                    df_metric,
+                    timegptalert_freq,
+                    timegptalert_freq_agg
+                )
+
+                timegpt_anomalies_df = timegpt.detect_anomalies(
+                    df=df_metric[['metric_timestamp', 'metric_value']].reset_index(drop=True),
+                    time_col="metric_timestamp",
+                    target_col="metric_value",
+                    freq=timegptalert_freq
+                )
+
+                logger.info(f"timegpt_anomalies_df: \n{timegpt_anomalies_df}")
+
+        timegptalert(get_timegptalert_data())
+
+    return _job
+
+
+# Build timegptalert jobs and schedules.
+timegptalert_jobs = []
+timegptalert_schedules = []
+for spec_name, spec in specs.items():
+    timegptalert_job = build_timegptalert_job(spec)
+    timegptalert_jobs.append(timegptalert_job)
+    if spec["timegptalert_default_schedule_status"] == "RUNNING":
+        timegptalert_default_schedule_status = DefaultScheduleStatus.RUNNING
+    else:
+        timegptalert_default_schedule_status = DefaultScheduleStatus.STOPPED
+    timegptalert_schedule = ScheduleDefinition(
+        job=timegptalert_job,
+        cron_schedule=spec["timegptalert_cron_schedule"],
+        default_status=timegptalert_default_schedule_status,
+    )
+    timegptalert_schedules.append(timegptalert_schedule)
diff --git a/anomstack/main.py b/anomstack/main.py
index a5b276f5..c576fa34 100644
--- a/anomstack/main.py
+++ b/anomstack/main.py
@@ -8,6 +8,7 @@
 from anomstack.jobs.change import change_jobs, change_schedules
 from anomstack.jobs.ingest import ingest_jobs, ingest_schedules
 from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules
+from anomstack.jobs.timegptalert import timegptalert_jobs, timegptalert_schedules
 from anomstack.jobs.plot import plot_jobs, plot_schedules
 from anomstack.jobs.score import score_jobs, score_schedules
 from anomstack.jobs.train import train_jobs, train_schedules
@@ -20,6 +21,7 @@
     + score_jobs
     + alert_jobs
     + llmalert_jobs
+    + timegptalert_jobs
     + plot_jobs
     + change_jobs
 )
@@ -32,6 +34,7 @@
     + llmalert_schedules
     + plot_schedules
     + change_schedules
+    + timegptalert_schedules
 )
 
 defs = Definitions(
diff --git a/metrics/defaults/defaults.yaml b/metrics/defaults/defaults.yaml
index 15556f3e..a5999145 100644
--- a/metrics/defaults/defaults.yaml
+++ b/metrics/defaults/defaults.yaml
@@ -36,6 +36,12 @@ alert_methods: "email" # comma separated list of alert methods to use eg "email,
 llmalert_recent_n: 5 # only llmalert on recent n so as to avoid continually alerting.
 llmalert_smooth_n: 3 # smooth metric value prior to sending to llm.
 llmalert_metric_rounding: 4 # round metric values to this number of decimal places.
+timegptalert_recent_n: 5 # only timegptalert on recent n so as to avoid continually alerting.
+timegptalert_smooth_n: 3 # smooth metric value prior to sending to timegpt.
+timegptalert_metric_rounding: 4 # round metric values to this number of decimal places.
+timegptalert_max_n: 180 # max n to include for timegpt in an alert.
+timegptalert_freq: '1D' # how often to resample for timegpt.
+timegptalert_freq_agg: 'mean' # default aggregation function to use for resampling for timegpt.
 preprocess_fn: >
   {% include "./defaults/python/preprocess.py" %}
 preprocess_params:
@@ -56,6 +62,8 @@ change_cron_schedule: "*/5 * * * *" # cron schedule for change detection jobs
 change_default_schedule_status: 'STOPPED' # default schedule status for alert jobs (RUNNING or STOPPED)
 llmalert_cron_schedule: "*/5 * * * *" # cron schedule for llmalerting jobs
 llmalert_default_schedule_status: 'STOPPED' # default schedule status for llmalert jobs (RUNNING or STOPPED)
+timegptalert_cron_schedule: "*/5 * * * *" # cron schedule for timegptalerting jobs
+timegptalert_default_schedule_status: 'STOPPED' # default schedule status for timegptalert jobs (RUNNING or STOPPED)
 plot_cron_schedule: "*/5 * * * *" # cron schedule for plot jobs
 plot_default_schedule_status: 'STOPPED' # default schedule status for plot jobs (RUNNING or STOPPED)
 # default templated train sql
@@ -72,6 +80,9 @@ change_sql: >
 # default templated plot sql
 plot_sql: >
   {% include "./defaults/sql/plot.sql" %}
+# default templated timegpt sql
+timegpt_sql: >
+  {% include "./defaults/sql/timegpt.sql" %}
 # default templated dashboard sql
 dashboard_sql: >
   {% include "./defaults/sql/dashboard.sql" %}
@@ -81,6 +92,7 @@ disable_score: False # if you want to disable score job for some reason.
 disable_alert: False # if you want to disable alert job for some reason.
 disable_change: False # if you want to disable change detection job for some reason.
 disable_llmalert: True # if you want to disable llmalert job for some reason.
+disable_timegptalert: True # if you want to disable timegptalert job for some reason.
 disable_plot: False # if you want to disable plot job for some reason.
 prompt_fn: >
   {% include "./defaults/python/prompt.py" %}
diff --git a/metrics/defaults/sql/timegpt.sql b/metrics/defaults/sql/timegpt.sql
new file mode 100644
index 00000000..51e455dc
--- /dev/null
+++ b/metrics/defaults/sql/timegpt.sql
@@ -0,0 +1,71 @@
+/*
+Template for generating the input data for the timegptalert job.
+*/
+
+with
+
+metric_value_data as
+(
+select distinct
+  metric_timestamp,
+  metric_batch,
+  metric_name,
+  avg(metric_value) as metric_value
+from
+  {{ table_key }}
+where
+  metric_batch = '{{ metric_batch }}'
+  and
+  metric_type = 'metric'
+  and
+  -- limit to the last {{ alert_metric_timestamp_max_days_ago }} days
+  cast(metric_timestamp as datetime) >= CURRENT_DATE - INTERVAL '{{ alert_metric_timestamp_max_days_ago }}' DAY
+group by 1,2,3
+),
+
+metric_value_recency_ranked as
+(
+select distinct
+  metric_timestamp,
+  metric_batch,
+  metric_name,
+  metric_value,
+  rank() over (partition by metric_name order by metric_timestamp desc) as metric_value_recency_rank
+from
+  metric_value_data
+),
+
+data_ranked as
+(
+select
+  m.metric_timestamp,
+  m.metric_batch,
+  m.metric_name,
+  m.metric_value,
+  m.metric_value_recency_rank
+from
+  metric_value_recency_ranked m
+),
+
+data_timegpt as
+(
+select
+  metric_timestamp,
+  metric_batch,
+  metric_name,
+  metric_value
+from
+  data_ranked
+where
+  -- only keep the most recent {{ timegptalert_max_n }} values
+  metric_value_recency_rank <= {{ timegptalert_max_n }}
+)
+
+select
+  metric_timestamp,
+  metric_batch,
+  metric_name,
+  metric_value
+from
+  data_timegpt
+;
diff --git a/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml b/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml
index 04892edb..4667d5d7 100644
--- a/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml
+++ b/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml
@@ -8,10 +8,14 @@ ingest_cron_schedule: "*/10 * * * *" # ingest every 10 minutes
 train_cron_schedule: "15 */1 * * *" # train every 1 hour
 score_cron_schedule: "*/15 * * * *" # score every 15 minutes
 alert_cron_schedule: "*/20 * * * *" # alert every 20 minutes
+timegptalert_cron_schedule: "*/20 * * * *" # timegptalert every 20 minutes
 change_cron_schedule: "*/20 * * * *" # change detection every 20 minutes
 llmalert_cron_schedule: "*/20 * * * *" # alert every 20 minutes
 disable_llmalert: True
 plot_cron_schedule: "*/25 * * * *" # plot every 25 minutes
+timegptalert_freq: '1H'
+timegptalert_freq_agg: 'mean'
+disable_timegptalert: False
 alert_always: False
 alert_methods: "email"
 # metric_tags is a map of metric key value tags to metric names
diff --git a/requirements.txt b/requirements.txt
index b54bb174..f55899ea 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,6 +12,7 @@ google-auth
 google-cloud-bigquery
 Jinja2
 matplotlib
+nixtlats
 numpy
 oscrypto
 openai>=1.2.4