From 77a5abda0cf0745a9b8cd3adcde01b35d78ff265 Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Thu, 9 Nov 2023 14:01:56 +0000 Subject: [PATCH] Add TimeGPT token and job, enable TimeGPT alerting --- .example.env | 3 + anomstack/jobs/llmalert.py | 2 +- anomstack/jobs/timegptalert.py | 142 ++++++++++++++++++ anomstack/main.py | 14 +- docker-compose.yaml | 9 ++ metrics/defaults/defaults.yaml | 12 ++ metrics/defaults/sql/timegpt.sql | 71 +++++++++ .../bigquery_example_simple.yaml | 8 +- requirements.txt | 1 + 9 files changed, 257 insertions(+), 5 deletions(-) create mode 100644 anomstack/jobs/timegptalert.py create mode 100644 metrics/defaults/sql/timegpt.sql diff --git a/.example.env b/.example.env index f982ce90..cb7f2335 100644 --- a/.example.env +++ b/.example.env @@ -64,5 +64,8 @@ # ANOMSTACK_OPENAI_KEY=XXX # ANOMSTACK_OPENAI_MODEL=gpt-3.5-turbo +# TimeGPT Token +# ANOMSTACK_TIMEGPT_TOKEN=XXX + # some dagster env vars DAGSTER_LOG_LEVEL=INFO diff --git a/anomstack/jobs/llmalert.py b/anomstack/jobs/llmalert.py index 3bbf670c..bf44d059 100644 --- a/anomstack/jobs/llmalert.py +++ b/anomstack/jobs/llmalert.py @@ -40,7 +40,7 @@ def build_llmalert_job(spec) -> JobDefinition: @job(name=f'{spec["metric_batch"]}_llmalert_disabled') def _dummy_job(): - @op(name=f'{spec["metric_batch"]}_noop') + @op(name=f'{spec["metric_batch"]}_llmalert_noop') def noop(): pass diff --git a/anomstack/jobs/timegptalert.py b/anomstack/jobs/timegptalert.py new file mode 100644 index 00000000..787de193 --- /dev/null +++ b/anomstack/jobs/timegptalert.py @@ -0,0 +1,142 @@ +""" +Generate timegptalert jobs and schedules. +""" + +import os +import pandas as pd +from dagster import ( + job, + op, + ScheduleDefinition, + JobDefinition, + DefaultScheduleStatus, + get_dagster_logger, +) +from nixtlats import TimeGPT +from anomstack.config import specs +from anomstack.df.resample import resample +from anomstack.jinja.render import render +from anomstack.sql.read import read_sql + + +def build_timegptalert_job(spec) -> JobDefinition: + """Builds a job definition for the TimeGPT Alert job. + + Args: + spec (dict): A dictionary containing the specifications for the job. + + Returns: + JobDefinition: A job definition for the TimeGPT Alert job. + """ + + timegpt = TimeGPT(token=os.getenv("ANOMSTACK_TIMEGPT_TOKEN")) + + logger = get_dagster_logger() + + if spec.get("disable_timegptalert"): + + @job(name=f'{spec["metric_batch"]}_timegptalert_disabled') + def _dummy_job(): + @op(name=f'{spec["metric_batch"]}_timegptalert_noop') + def noop(): + pass + + noop() + + return _dummy_job + + metric_batch = spec["metric_batch"] + db = spec["db"] + threshold = spec["alert_threshold"] + alert_methods = spec["alert_methods"] + timegptalert_recent_n = spec["timegptalert_recent_n"] + timegptalert_smooth_n = spec["timegptalert_smooth_n"] + timegptalert_metric_rounding = spec.get("timegptalert_metric_rounding", 4) + timegptalert_freq = spec.get("timegptalert_freq", "1D") + timegptalert_freq_agg = spec.get("timegptalert_freq_agg", "mean") + + @job(name=f"{metric_batch}_timegptalert_job") + def _job(): + """A job that runs the TimeGPT Alert. + + Returns: + None + """ + + @op(name=f"{metric_batch}_get_timegptalert_data") + def get_timegptalert_data() -> pd.DataFrame: + """An operation that retrieves the data for the TimeGPT Alert. + + Returns: + pd.DataFrame: A pandas DataFrame containing the data for the TimeGPT Alert. + """ + + df = read_sql(render("timegpt_sql", spec), db) + + return df + + @op(name=f"{metric_batch}_timegptalert") + def timegptalert(context, df: pd.DataFrame) -> None: + """An operation that runs the TimeGPT Alert. + + Args: + context: The context of the operation. + df (pd.DataFrame): A pandas DataFrame containing the data for the TimeGPT Alert. + + Returns: + None + """ + + for metric_name in df["metric_name"].unique(): + df_metric = ( + df[df.metric_name == metric_name] + .sort_values(by="metric_timestamp", ascending=True) + .reset_index(drop=True) + ).dropna() + df_metric["metric_timestamp"] = pd.to_datetime( + df_metric["metric_timestamp"] + ) + + if timegptalert_smooth_n > 0: + df_metric["metric_value"] = ( + df_metric["metric_value"] + .rolling(timegptalert_smooth_n) + .mean() + ) + + df_metric = resample( + df, + timegptalert_freq, + timegptalert_freq_agg + ) + + timegpt_anomalies_df = timegpt.detect_anomalies( + df=df_metric[['metric_timestamp', 'metric_value']].reset_index(drop=True), + time_col="metric_timestamp", + target_col="metric_value", + freq=timegptalert_freq + ) + + logger.info(f"timegpt_anomalies_df: \n{timegpt_anomalies_df}") + + timegptalert(get_timegptalert_data()) + + return _job + + +# Build timegptalert jobs and schedules. +timegptalert_jobs = [] +timegptalert_schedules = [] +for spec_name, spec in specs.items(): + timegptalert_job = build_timegptalert_job(spec) + timegptalert_jobs.append(timegptalert_job) + if spec["timegptalert_default_schedule_status"] == "RUNNING": + timegptalert_default_schedule_status = DefaultScheduleStatus.RUNNING + else: + timegptalert_default_schedule_status = DefaultScheduleStatus.STOPPED + timegptalert_schedule = ScheduleDefinition( + job=timegptalert_job, + cron_schedule=spec["timegptalert_cron_schedule"], + default_status=timegptalert_default_schedule_status, + ) + timegptalert_schedules.append(timegptalert_schedule) diff --git a/anomstack/main.py b/anomstack/main.py index c7d5b228..32612df0 100644 --- a/anomstack/main.py +++ b/anomstack/main.py @@ -8,20 +8,30 @@ from anomstack.jobs.score import score_jobs, score_schedules from anomstack.jobs.alert import alert_jobs, alert_schedules from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules +from anomstack.jobs.timegptalert import timegptalert_jobs, timegptalert_schedules from anomstack.jobs.plot import plot_jobs, plot_schedules from anomstack.sensors.failure import email_on_run_failure -jobs = ingest_jobs + train_jobs + score_jobs + alert_jobs + llmalert_jobs + plot_jobs -sensors = [email_on_run_failure] +jobs = ( + ingest_jobs + + train_jobs + + score_jobs + + alert_jobs + + llmalert_jobs + + timegptalert_jobs + + plot_jobs +) schedules = ( ingest_schedules + train_schedules + score_schedules + alert_schedules + llmalert_schedules + + timegptalert_schedules + plot_schedules ) +sensors = [email_on_run_failure] defs = Definitions( jobs=jobs, diff --git a/docker-compose.yaml b/docker-compose.yaml index 60e12a97..f513a19e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -54,6 +54,9 @@ services: ANOMSTACK_ALERT_EMAIL_PASSWORD: ${ANOMSTACK_ALERT_EMAIL_PASSWORD} ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS: "/tmp/conf/gcp_credentials.json" ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON: ${ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON} + ANOMSTACK_OPENAI_KEY: ${ANOMSTACK_OPENAI_KEY} + ANOMSTACK_OPENAI_MODEL: ${ANOMSTACK_OPENAI_MODEL} + ANOMSTACK_TIMEGPT_TOKEN: ${ANOMSTACK_TIMEGPT_TOKEN} networks: - anomstack_network @@ -101,6 +104,9 @@ services: ANOMSTACK_ALERT_EMAIL_PASSWORD: ${ANOMSTACK_ALERT_EMAIL_PASSWORD} ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS: "/tmp/conf/gcp_credentials.json" ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON: ${ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON} + ANOMSTACK_OPENAI_KEY: ${ANOMSTACK_OPENAI_KEY} + ANOMSTACK_OPENAI_MODEL: ${ANOMSTACK_OPENAI_MODEL} + ANOMSTACK_TIMEGPT_TOKEN: ${ANOMSTACK_TIMEGPT_TOKEN} volumes: # Make docker client accessible so we can terminate containers from dagit - ${ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS}:/tmp/conf/gcp_credentials.json - /var/run/docker.sock:/var/run/docker.sock @@ -148,6 +154,9 @@ services: ANOMSTACK_ALERT_EMAIL_PASSWORD: ${ANOMSTACK_ALERT_EMAIL_PASSWORD} ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS: "/tmp/conf/gcp_credentials.json" ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON: ${ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON} + ANOMSTACK_OPENAI_KEY: ${ANOMSTACK_OPENAI_KEY} + ANOMSTACK_OPENAI_MODEL: ${ANOMSTACK_OPENAI_MODEL} + ANOMSTACK_TIMEGPT_TOKEN: ${ANOMSTACK_TIMEGPT_TOKEN} volumes: # Make docker client accessible so we can launch containers using host docker - ${ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS}:/tmp/conf/gcp_credentials.json - /var/run/docker.sock:/var/run/docker.sock diff --git a/metrics/defaults/defaults.yaml b/metrics/defaults/defaults.yaml index 1156cb4c..02f90a14 100644 --- a/metrics/defaults/defaults.yaml +++ b/metrics/defaults/defaults.yaml @@ -25,6 +25,12 @@ alert_methods: "email" # comma separated list of alert methods to use eg "email, llmalert_recent_n: 5 # only llmalert on recent n so as to avoid continually alerting. llmalert_smooth_n: 3 # smooth metric value prior to sending to llm. llmalert_metric_rounding: 4 # round metric values to this number of decimal places. +timegptalert_recent_n: 5 # only timegptalert on recent n so as to avoid continually alerting. +timegptalert_smooth_n: 3 # smooth metric value prior to sending to timegpt. +timegptalert_metric_rounding: 4 # round metric values to this number of decimal places. +timegptalert_max_n: 180 # max n to include for timegpt in an alert. +timegptalert_freq: '1D' # how often to resample for timegpt. +timegptalert_freq_agg: 'mean' # default aggregation function to use for resampling for timegpt. preprocess_fn: > {% include "./defaults/python/preprocess.py" %} preprocess_params: @@ -43,6 +49,8 @@ alert_cron_schedule: "*/5 * * * *" # cron schedule for alerting jobs alert_default_schedule_status: 'STOPPED' # default schedule status for alert jobs (RUNNING or STOPPED) llmalert_cron_schedule: "*/5 * * * *" # cron schedule for llmalerting jobs llmalert_default_schedule_status: 'STOPPED' # default schedule status for llmalert jobs (RUNNING or STOPPED) +timegptalert_cron_schedule: "*/5 * * * *" # cron schedule for timegptalerting jobs +timegptalert_default_schedule_status: 'STOPPED' # default schedule status for timegptalert jobs (RUNNING or STOPPED) plot_cron_schedule: "*/5 * * * *" # cron schedule for plot jobs plot_default_schedule_status: 'STOPPED' # default schedule status for plot jobs (RUNNING or STOPPED) # default templated train sql @@ -57,11 +65,15 @@ alert_sql: > # default templated plot sql plot_sql: > {% include "./defaults/sql/plot.sql" %} +# default templated timegpt sql +timegpt_sql: > + {% include "./defaults/sql/timegpt.sql" %} disable_ingest: False # if you want to disable ingest job for some reason. disable_train: False # if you want to disable train job for some reason. disable_score: False # if you want to disable score job for some reason. disable_alert: False # if you want to disable alert job for some reason. disable_llmalert: True # if you want to disable llmalert job for some reason. +disable_timegptalert: True # if you want to disable timegptalert job for some reason. disable_plot: False # if you want to disable plot job for some reason. prompt_fn: > {% include "./defaults/python/prompt.py" %} diff --git a/metrics/defaults/sql/timegpt.sql b/metrics/defaults/sql/timegpt.sql new file mode 100644 index 00000000..51e455dc --- /dev/null +++ b/metrics/defaults/sql/timegpt.sql @@ -0,0 +1,71 @@ +/* +Template for generating the input data for the timegptalert job. +*/ + +with + +metric_value_data as +( +select distinct + metric_timestamp, + metric_batch, + metric_name, + avg(metric_value) as metric_value +from + {{ table_key }} +where + metric_batch = '{{ metric_batch }}' + and + metric_type = 'metric' + and + -- limit to the last {{ alert_metric_timestamp_max_days_ago }} days + cast(metric_timestamp as datetime) >= CURRENT_DATE - INTERVAL '{{ alert_metric_timestamp_max_days_ago }}' DAY +group by 1,2,3 +), + +metric_value_recency_ranked as +( +select distinct + metric_timestamp, + metric_batch, + metric_name, + metric_value, + rank() over (partition by metric_name order by metric_timestamp desc) as metric_value_recency_rank +from + metric_value_data +), + +data_ranked as +( +select + m.metric_timestamp, + m.metric_batch, + m.metric_name, + m.metric_value, + m.metric_value_recency_rank +from + metric_value_recency_ranked m +), + +data_timegpt as +( +select + metric_timestamp, + metric_batch, + metric_name, + metric_value +from + data_ranked +where + -- only plot most recent {{ timegptalert_max_n }} values + metric_value_recency_rank <= {{ timegptalert_max_n }} +) + +select + metric_timestamp, + metric_batch, + metric_name, + metric_value +from + data_timegpt +; \ No newline at end of file diff --git a/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml b/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml index 35270d4b..dcfb5e8d 100644 --- a/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml +++ b/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml @@ -8,9 +8,13 @@ ingest_cron_schedule: "*/10 * * * *" # ingest every 10 minutes train_cron_schedule: "15 */1 * * *" # train every 1 hour score_cron_schedule: "*/15 * * * *" # score every 15 minutes alert_cron_schedule: "*/20 * * * *" # alert every 20 minutes -llmalert_cron_schedule: "*/20 * * * *" # alert every 20 minutes -disable_llmalert: True +llmalert_cron_schedule: "*/20 * * * *" # llmalert every 20 minutes +timegptalert_cron_schedule: "*/20 * * * *" # timegptalert every 20 minutes plot_cron_schedule: "*/25 * * * *" # plot every 25 minutes +timegptalert_freq: '1H' +timegptalert_freq_agg: 'mean' +disable_llmalert: True +disable_timegptalert: False alert_always: False alert_methods: "email" ingest_sql: > diff --git a/requirements.txt b/requirements.txt index dc299bcd..1263de6e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ google-auth google-cloud-bigquery Jinja2 matplotlib +nixtlats #oscrypto # for now, install from gh to fix this: https://community.snowflake.com/s/article/Python-Connector-fails-to-connect-with-LibraryNotFoundError-Error-detecting-the-version-of-libcrypto git+https://github.com/wbond/oscrypto.git@master#egg=oscrypto openai