Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .example.env
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ ANOMSTACK_OPENAI_KEY=
ANOMSTACK_OPENAI_MODEL=gpt-3.5-turbo
# ANOMSTACK_OPENAI_MODEL=gpt-4-1106-preview

# TimeGPT Token
# ANOMSTACK_TIMEGPT_TOKEN=XXX

# some dagster env vars
DAGSTER_LOG_LEVEL=DEBUG

Expand Down
2 changes: 1 addition & 1 deletion anomstack/jobs/llmalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def build_llmalert_job(spec) -> JobDefinition:
tags={MAX_RUNTIME_SECONDS_TAG: ANOMSTACK_MAX_RUNTIME_SECONDS_TAG},
)
def _dummy_job():
@op(name=f'{spec["metric_batch"]}_noop')
@op(name=f'{spec["metric_batch"]}_llmalert_noop')
def noop():
pass

Expand Down
142 changes: 142 additions & 0 deletions anomstack/jobs/timegptalert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""
Generate timegptalert jobs and schedules.
"""

import os
import pandas as pd
from dagster import (
job,
op,
ScheduleDefinition,
JobDefinition,
DefaultScheduleStatus,
get_dagster_logger,
)
from nixtlats import TimeGPT
from anomstack.config import specs
from anomstack.df.resample import resample
from anomstack.jinja.render import render
from anomstack.sql.read import read_sql


def build_timegptalert_job(spec) -> JobDefinition:
"""Builds a job definition for the TimeGPT Alert job.

Args:
spec (dict): A dictionary containing the specifications for the job.

Returns:
JobDefinition: A job definition for the TimeGPT Alert job.
"""

timegpt = TimeGPT(token=os.getenv("ANOMSTACK_TIMEGPT_TOKEN"))

logger = get_dagster_logger()

if spec.get("disable_timegptalert"):

@job(name=f'{spec["metric_batch"]}_timegptalert_disabled')
def _dummy_job():
@op(name=f'{spec["metric_batch"]}_timegptalert_noop')
def noop():
pass

noop()

return _dummy_job

metric_batch = spec["metric_batch"]
db = spec["db"]
threshold = spec["alert_threshold"]
alert_methods = spec["alert_methods"]
timegptalert_recent_n = spec["timegptalert_recent_n"]
timegptalert_smooth_n = spec["timegptalert_smooth_n"]
timegptalert_metric_rounding = spec.get("timegptalert_metric_rounding", 4)
timegptalert_freq = spec.get("timegptalert_freq", "1D")
timegptalert_freq_agg = spec.get("timegptalert_freq_agg", "mean")

@job(name=f"{metric_batch}_timegptalert_job")
def _job():
"""A job that runs the TimeGPT Alert.

Returns:
None
"""

@op(name=f"{metric_batch}_get_timegptalert_data")
def get_timegptalert_data() -> pd.DataFrame:
"""An operation that retrieves the data for the TimeGPT Alert.

Returns:
pd.DataFrame: A pandas DataFrame containing the data for the TimeGPT Alert.
"""

df = read_sql(render("timegpt_sql", spec), db)

return df

@op(name=f"{metric_batch}_timegptalert")
def timegptalert(context, df: pd.DataFrame) -> None:
"""An operation that runs the TimeGPT Alert.

Args:
context: The context of the operation.
df (pd.DataFrame): A pandas DataFrame containing the data for the TimeGPT Alert.

Returns:
None
"""

for metric_name in df["metric_name"].unique():
df_metric = (
df[df.metric_name == metric_name]
.sort_values(by="metric_timestamp", ascending=True)
.reset_index(drop=True)
).dropna()
df_metric["metric_timestamp"] = pd.to_datetime(
df_metric["metric_timestamp"]
)

if timegptalert_smooth_n > 0:
df_metric["metric_value"] = (
df_metric["metric_value"]
.rolling(timegptalert_smooth_n)
.mean()
)

df_metric = resample(
df,
timegptalert_freq,
timegptalert_freq_agg
)

timegpt_anomalies_df = timegpt.detect_anomalies(
df=df_metric[['metric_timestamp', 'metric_value']].reset_index(drop=True),
time_col="metric_timestamp",
target_col="metric_value",
freq=timegptalert_freq
)

logger.info(f"timegpt_anomalies_df: \n{timegpt_anomalies_df}")

timegptalert(get_timegptalert_data())

return _job


# Build timegptalert jobs and schedules.
timegptalert_jobs = []
timegptalert_schedules = []
for spec_name, spec in specs.items():
timegptalert_job = build_timegptalert_job(spec)
timegptalert_jobs.append(timegptalert_job)
if spec["timegptalert_default_schedule_status"] == "RUNNING":
timegptalert_default_schedule_status = DefaultScheduleStatus.RUNNING
else:
timegptalert_default_schedule_status = DefaultScheduleStatus.STOPPED
timegptalert_schedule = ScheduleDefinition(
job=timegptalert_job,
cron_schedule=spec["timegptalert_cron_schedule"],
default_status=timegptalert_default_schedule_status,
)
timegptalert_schedules.append(timegptalert_schedule)
7 changes: 6 additions & 1 deletion anomstack/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,25 @@
from anomstack.jobs.change import change_jobs, change_schedules
from anomstack.jobs.ingest import ingest_jobs, ingest_schedules
from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules
from anomstack.jobs.timegptalert import timegptalert_jobs, timegptalert_schedules
from anomstack.jobs.plot import plot_jobs, plot_schedules
from anomstack.jobs.score import score_jobs, score_schedules
from anomstack.jobs.train import train_jobs, train_schedules

# from anomstack.sensors.failure import email_on_run_failure

jobs = (
(
ingest_jobs
+ train_jobs
+ score_jobs
+ alert_jobs
+ llmalert_jobs
+ timegptalert_jobs
+ plot_jobs
+ change_jobs
)
# sensors = [email_on_run_failure]
# )
schedules = (
ingest_schedules
+ train_schedules
Expand All @@ -32,7 +35,9 @@
+ llmalert_schedules
+ plot_schedules
+ change_schedules
+ timegptalert_schedules
)
sensors = [email_on_run_failure]

defs = Definitions(
jobs=jobs,
Expand Down
12 changes: 12 additions & 0 deletions metrics/defaults/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ alert_methods: "email" # comma separated list of alert methods to use eg "email,
llmalert_recent_n: 5 # only llmalert on recent n so as to avoid continually alerting.
llmalert_smooth_n: 3 # smooth metric value prior to sending to llm.
llmalert_metric_rounding: 4 # round metric values to this number of decimal places.
timegptalert_recent_n: 5 # only timegptalert on recent n so as to avoid continually alerting.
timegptalert_smooth_n: 3 # smooth metric value prior to sending to timegpt.
timegptalert_metric_rounding: 4 # round metric values to this number of decimal places.
timegptalert_max_n: 180 # max n to include for timegpt in an alert.
timegptalert_freq: '1D' # how often to resample for timegpt.
timegptalert_freq_agg: 'mean' # default aggregation function to use for resampling for timegpt.
preprocess_fn: >
{% include "./defaults/python/preprocess.py" %}
preprocess_params:
Expand All @@ -56,6 +62,8 @@ change_cron_schedule: "*/5 * * * *" # cron schedule for change detection jobs
change_default_schedule_status: 'STOPPED' # default schedule status for alert jobs (RUNNING or STOPPED)
llmalert_cron_schedule: "*/5 * * * *" # cron schedule for llmalerting jobs
llmalert_default_schedule_status: 'STOPPED' # default schedule status for llmalert jobs (RUNNING or STOPPED)
timegptalert_cron_schedule: "*/5 * * * *" # cron schedule for timegptalerting jobs
timegptalert_default_schedule_status: 'STOPPED' # default schedule status for timegptalert jobs (RUNNING or STOPPED)
plot_cron_schedule: "*/5 * * * *" # cron schedule for plot jobs
plot_default_schedule_status: 'STOPPED' # default schedule status for plot jobs (RUNNING or STOPPED)
# default templated train sql
Expand All @@ -72,6 +80,9 @@ change_sql: >
# default templated plot sql
plot_sql: >
{% include "./defaults/sql/plot.sql" %}
# default templated timegpt sql
timegpt_sql: >
{% include "./defaults/sql/timegpt.sql" %}
# default templated dashboard sql
dashboard_sql: >
{% include "./defaults/sql/dashboard.sql" %}
Expand All @@ -81,6 +92,7 @@ disable_score: False # if you want to disable score job for some reason.
disable_alert: False # if you want to disable alert job for some reason.
disable_change: False # if you want to disable change detection job for some reason.
disable_llmalert: True # if you want to disable llmalert job for some reason.
disable_timegptalert: True # if you want to disable timegptalert job for some reason.
disable_plot: False # if you want to disable plot job for some reason.
prompt_fn: >
{% include "./defaults/python/prompt.py" %}
71 changes: 71 additions & 0 deletions metrics/defaults/sql/timegpt.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Template for generating the input data for the timegptalert job.
*/

with

metric_value_data as
(
select distinct
metric_timestamp,
metric_batch,
metric_name,
avg(metric_value) as metric_value
from
{{ table_key }}
where
metric_batch = '{{ metric_batch }}'
and
metric_type = 'metric'
and
-- limit to the last {{ alert_metric_timestamp_max_days_ago }} days
cast(metric_timestamp as datetime) >= CURRENT_DATE - INTERVAL '{{ alert_metric_timestamp_max_days_ago }}' DAY
group by 1,2,3
),

metric_value_recency_ranked as
(
select distinct
metric_timestamp,
metric_batch,
metric_name,
metric_value,
rank() over (partition by metric_name order by metric_timestamp desc) as metric_value_recency_rank
from
metric_value_data
),

data_ranked as
(
select
m.metric_timestamp,
m.metric_batch,
m.metric_name,
m.metric_value,
m.metric_value_recency_rank
from
metric_value_recency_ranked m
),

data_timegpt as
(
select
metric_timestamp,
metric_batch,
metric_name,
metric_value
from
data_ranked
where
-- only plot most recent {{ timegptalert_max_n }} values
metric_value_recency_rank <= {{ timegptalert_max_n }}
)

select
metric_timestamp,
metric_batch,
metric_name,
metric_value
from
data_timegpt
;
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ ingest_cron_schedule: "*/10 * * * *" # ingest every 10 minutes
train_cron_schedule: "15 */1 * * *" # train every 1 hour
score_cron_schedule: "*/15 * * * *" # score every 15 minutes
alert_cron_schedule: "*/20 * * * *" # alert every 20 minutes
llmalert_cron_schedule: "*/20 * * * *" # llmalert every 20 minutes
timegptalert_cron_schedule: "*/20 * * * *" # timegptalert every 20 minutes
change_cron_schedule: "*/20 * * * *" # change detection every 20 minutes
llmalert_cron_schedule: "*/20 * * * *" # alert every 20 minutes
disable_llmalert: True
plot_cron_schedule: "*/25 * * * *" # plot every 25 minutes
timegptalert_freq: '1H'
timegptalert_freq_agg: 'mean'
disable_llmalert: True
disable_timegptalert: False
alert_always: False
alert_methods: "email"
# metric_tags is a map of metric key value tags to metric names
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ google-auth
google-cloud-bigquery
Jinja2
matplotlib
nixtlats
numpy
oscrypto
openai>=1.2.4
Expand Down