Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion mesher/api/detail.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ fn detail_row_to_event(row) -> Event do
user_context : Map.get(row, "user_context"),
sdk_name : Map.get(row, "sdk_name"),
sdk_version : Map.get(row, "sdk_version"),
environment : Map.get(row, "environment"),
session_id : Map.get(row, "session_id"),
received_at : Map.get(row, "received_at")
}
end
Expand All @@ -53,8 +55,10 @@ fn event_detail_to_json(row) -> String do
let user_context = Map.get(row, "user_context")
let sdk_name = Json.get(normalized, "sdk_name")
let sdk_version = Json.get(normalized, "sdk_version")
let environment = Json.get(normalized, "environment")
let session_id = Json.get(normalized, "session_id")
let received_at = Json.get(normalized, "received_at")
"""{"id":"#{id}","project_id":"#{project_id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","fingerprint":"#{fingerprint}","exception":#{exception},"stacktrace":#{stacktrace},"breadcrumbs":#{breadcrumbs},"tags":#{tags},"extra":#{extra},"user_context":#{user_context},"sdk_name":"#{sdk_name}","sdk_version":"#{sdk_version}","received_at":"#{received_at}"}"""
"""{"id":"#{id}","project_id":"#{project_id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","fingerprint":"#{fingerprint}","exception":#{exception},"stacktrace":#{stacktrace},"breadcrumbs":#{breadcrumbs},"tags":#{tags},"extra":#{extra},"user_context":#{user_context},"sdk_name":"#{sdk_name}","sdk_version":"#{sdk_version}","environment":"#{environment}","session_id":"#{session_id}","received_at":"#{received_at}"}"""
end

# Format a nullable neighbor ID for JSON output.
Expand Down
50 changes: 48 additions & 2 deletions mesher/api/search.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# All handlers follow the PipelineRegistry pattern for pool lookup.

from Ingestion.Pipeline import PipelineRegistry
from Storage.Queries import list_issues_filtered, search_events_fulltext, filter_events_by_tag, list_events_for_issue
from Storage.Queries import list_issues_filtered, search_events_fulltext, filter_events_by_tag, list_events_for_issue, get_events_by_session_id
from Types.Event import Event
from Types.Issue import Issue
from Api.Helpers import query_or_default, to_json_array, require_param, get_registry, resolve_project_id
Expand Down Expand Up @@ -116,7 +116,8 @@ fn issue_row_to_cursor_issue(row) -> Issue do
event_count : 0,
first_seen : Map.get(row, "first_seen"),
last_seen : Map.get(row, "last_seen"),
assigned_to : Map.get(row, "assigned_to")
assigned_to : Map.get(row, "assigned_to"),
last_resolved_at : ""
}
end

Expand Down Expand Up @@ -158,6 +159,8 @@ fn issue_event_row_to_event(row) -> Event do
user_context : "null",
sdk_name : "",
sdk_version : "",
environment : "",
session_id : "",
received_at : Map.get(row, "received_at")
}
end
Expand Down Expand Up @@ -225,6 +228,7 @@ pub fn handle_search_issues(request) do
let status = query_or_default(request, "status", "")
let level = query_or_default(request, "level", "")
let assigned_to = query_or_default(request, "assigned_to", "")
let environment = query_or_default(request, "environment", "")
let cursor = query_or_default(request, "cursor", "")
|> decode_query_component()
let cursor_id = query_or_default(request, "cursor_id", "")
Expand All @@ -235,6 +239,7 @@ pub fn handle_search_issues(request) do
status,
level,
assigned_to,
environment,
cursor,
cursor_id,
limit_str)
Expand Down Expand Up @@ -378,3 +383,44 @@ pub fn handle_list_issue_events(request) do
Err( e) -> HTTP.response(500, json { error : e })
end
end

# Convert a session event row to JSON.
# Includes environment so the client can verify which deployment the event came from.

# NOTE(review): values are interpolated into the JSON template without any
# string escaping; a quote or backslash inside `message` would produce invalid
# JSON. The same interpolation pattern is used by event_detail_to_json, so
# escaping is presumably handled (or accepted) upstream — confirm.
fn row_to_session_event_json(row) -> String do
# Pull only the session-view fields; heavy payloads (stacktrace, tags,
# breadcrumbs) are intentionally omitted from this compact representation.
let id = Map.get(row, "id")
let issue_id = Map.get(row, "issue_id")
let level = Map.get(row, "level")
let message = Map.get(row, "message")
let environment = Map.get(row, "environment")
let received_at = Map.get(row, "received_at")
"""{"id":"#{id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","environment":"#{environment}","received_at":"#{received_at}"}"""
end

# Helper: serialize session event rows and respond.

fn respond_session_events(rows) do
# Serialize each row via row_to_session_event_json, wrap the results into a
# JSON array, and answer with HTTP 200. Row order is preserved as queried
# (received_at ASC per get_events_by_session_id).
let body = rows
|> List.map(fn (row) do row_to_session_event_json(row) end)
|> to_json_array()
HTTP.response(200, body)
end

# Handle GET /api/v1/projects/:project_id/sessions/:session_id/events
# Returns events belonging to a specific SDK session, ordered by received_at ASC.
# Scoped to the last 24 hours (partition-pruned on the partitioned events table).
# session_id must match exactly the value the SDK sent in the event payload.

pub fn handle_session_events(request) do
# Pool lookup follows the PipelineRegistry pattern used by all handlers in
# this module (see the file header comment).
let reg_pid = get_registry()
let pool = PipelineRegistry.get_pool(reg_pid)
# Path params: the raw project value is normalized through resolve_project_id
# (presumably mapping a slug or raw value to the canonical id — confirm
# against Api.Helpers); session_id is used verbatim for an exact match.
let raw_id = require_param(request, "project_id")
let project_id = resolve_project_id(pool, raw_id)
let session_id = require_param(request, "session_id")
# Raw limit string from the request; bounds/validation happen in the query
# layer via parse_limit (see get_events_by_session_id).
let limit_str = get_limit(request)
let result = get_events_by_session_id(pool, project_id, session_id, limit_str)
# Success -> 200 with a JSON array; storage failure -> 500 with the error.
case result do
Ok( rows) -> respond_session_events(rows)
Err( e) -> HTTP.response(500, json { error : e })
end
end
35 changes: 33 additions & 2 deletions mesher/ingestion/routes.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ from Storage.Queries import (
delete_issue,
list_issues_by_status,
check_new_issue,
check_regression,
get_event_alert_rules,
fire_alert,
check_sample_rate,
Expand Down Expand Up @@ -185,14 +186,36 @@ is_new :: Bool) do
end
end

# Check for new-issue alerts after event processing (ALERT-03).
# Fire regression alerts when a previously resolved issue receives a new event.
# Reuses fire_matching_event_alerts with condition_type "regression" so alert
# rules with that condition type are triggered without a separate alert evaluation path.

fn handle_regression_alert(pool :: PoolHandle,
project_id :: String,
issue_id :: String,
is_regression :: Bool) do
# Only act when the caller's regression check was positive; the alert firing
# itself is delegated to the shared fire_matching_event_alerts path.
if is_regression do
fire_matching_event_alerts(pool, project_id, "regression", issue_id)
0
else
# Return 0 either way so both branches yield the same (Int) shape.
0
end
end

# Check for new-issue and regression alerts after event processing (ALERT-03).
# new_issue fires on first occurrence; regression fires when a resolved issue regresses.

fn check_event_alerts(pool :: PoolHandle, project_id :: String, issue_id :: String) do
# First-occurrence check: fires "new_issue" alerts via handle_new_issue_alert.
# A storage error is deliberately swallowed (-> 0) so a failed alert lookup
# never fails event ingestion.
let new_result = check_new_issue(pool, issue_id)
case new_result do
Ok( is_new) -> handle_new_issue_alert(pool, project_id, issue_id, is_new)
Err( _) -> 0
end
# Regression check: fires "regression" alerts when a previously resolved issue
# received a new event (see check_regression in storage/queries). Errors are
# swallowed for the same reason as above.
let reg_result = check_regression(pool, issue_id)
case reg_result do
Ok( is_regression) -> handle_regression_alert(pool, project_id, issue_id, is_regression)
Err( _) -> 0
end
end

# Helper: broadcast event notification, issue count, check alerts, and return response
Expand Down Expand Up @@ -488,12 +511,20 @@ pub fn handle_unresolve_issue(request) do
end
end

# Helper: broadcast assignment and return success response.
# Follows the resolve_success / archive_success pattern: broadcast then respond.

fn assign_success(pool, issue_id :: String) do
# Broadcast the "assigned" update to subscribers first, then acknowledge the
# request with a 200 JSON body (same order as the resolve/archive helpers).
broadcast_issue_update(pool, issue_id, "assigned")
HTTP.response(200, json { status : "ok" })
end

# Helper: perform assignment after extracting user_id from parsed JSON rows.

fn assign_with_user_id(pool :: PoolHandle, issue_id :: String, user_id :: String) do
# Persist the assignment; on success broadcast + respond via assign_success,
# on failure surface the storage error as a 500 JSON body.
let result = assign_issue(pool, issue_id, user_id)
case result do
Ok( _) -> assign_success(pool, issue_id)
Err( e) -> HTTP.response(500, json { error : e })
end
end
Expand Down
3 changes: 2 additions & 1 deletion mesher/main.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ from Ingestion.Routes import (
handle_discard_issue,
handle_delete_issue
)
from Api.Search import handle_search_issues, handle_search_events, handle_filter_by_tag, handle_list_issue_events
from Api.Search import handle_search_issues, handle_search_events, handle_filter_by_tag, handle_list_issue_events, handle_session_events
from Api.Dashboard import (
handle_event_volume,
handle_error_breakdown,
Expand Down Expand Up @@ -120,6 +120,7 @@ fn start_runtime(http_port :: Int, ws_port :: Int, window_seconds :: Int, max_ev
|> HTTP.on_get("/api/v1/projects/:project_id/events/search", handle_search_events)
|> HTTP.on_get("/api/v1/projects/:project_id/events/tags", handle_filter_by_tag)
|> HTTP.on_get("/api/v1/issues/:issue_id/events", handle_list_issue_events)
|> HTTP.on_get("/api/v1/projects/:project_id/sessions/:session_id/events", handle_session_events)
|> HTTP.on_get("/api/v1/projects/:project_id/dashboard/volume", handle_event_volume)
|> HTTP.on_get("/api/v1/projects/:project_id/dashboard/levels", handle_error_breakdown)
|> HTTP.on_get("/api/v1/projects/:project_id/dashboard/top-issues", handle_top_issues)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Migration: add environment and session_id to events, last_resolved_at to issues.
# environment -- first-class column so operators can filter by deployment target
# without relying on tags JSONB. Partial index covers only rows where the SDK
# sends the field; rows from older SDKs are excluded from the index automatically.
# session_id -- correlates events from the same SDK session for session-context
# queries without decomposing user_context JSONB on every read path.
# last_resolved_at -- records the timestamp of the last manual resolve so
# regression detection (resolved -> unresolved flip) can fire regression alerts.

pub fn up(pool :: PoolHandle) -> Int ! String do
# Every statement is idempotent (IF NOT EXISTS), so re-running the migration
# is safe. The trailing `?` appears to propagate a failed Pool.execute as the
# `! String` error of this function — confirm mpl's `?` semantics.
Pool.execute(pool, "ALTER TABLE events ADD COLUMN IF NOT EXISTS environment TEXT", []) ?
Pool.execute(pool, "ALTER TABLE events ADD COLUMN IF NOT EXISTS session_id TEXT", []) ?
Pool.execute(pool, "ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_resolved_at TIMESTAMPTZ", []) ?
# Partial indexes: rows where the SDK never sent the field stay out of the
# index entirely (see the header comment on this migration).
Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_events_environment ON events (project_id, environment, received_at DESC) WHERE environment IS NOT NULL", []) ?
Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_events_session ON events (session_id, received_at DESC) WHERE session_id IS NOT NULL", []) ?
Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_issues_last_resolved ON issues (last_resolved_at) WHERE last_resolved_at IS NOT NULL", []) ?
Ok(0)
end

pub fn down(pool :: PoolHandle) -> Int ! String do
# Exact reverse of up/1: drop the indexes before their backing columns, in
# reverse creation order. IF EXISTS keeps the rollback idempotent as well.
Pool.execute(pool, "DROP INDEX IF EXISTS idx_issues_last_resolved", []) ?
Pool.execute(pool, "DROP INDEX IF EXISTS idx_events_session", []) ?
Pool.execute(pool, "DROP INDEX IF EXISTS idx_events_environment", []) ?
Pool.execute(pool, "ALTER TABLE issues DROP COLUMN IF EXISTS last_resolved_at", []) ?
Pool.execute(pool, "ALTER TABLE events DROP COLUMN IF EXISTS session_id", []) ?
Pool.execute(pool, "ALTER TABLE events DROP COLUMN IF EXISTS environment", []) ?
Ok(0)
end
55 changes: 50 additions & 5 deletions mesher/storage/queries.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,10 @@ pub fn resolve_issue(pool :: PoolHandle, issue_id :: String) -> Int ! String do
let q = Query.from(Issue.__table__())
|> Query.where_raw("id = ?::uuid", [issue_id])
|> Query.where_raw("status != 'resolved'", [])
Repo.update_where(pool, Issue.__table__(), %{"status" => "resolved"}, q) ?
Repo.update_where_expr(pool,
Issue.__table__(),
%{"status" => Expr.value("resolved"), "last_resolved_at" => Expr.fn_call("now", [])},
q) ?
Ok(1)
end

Expand Down Expand Up @@ -544,7 +547,8 @@ pub fn list_issues_by_status(pool :: PoolHandle, project_id :: String, status ::
event_count : parse_event_count(Map.get(row, "event_count")),
first_seen : Map.get(row, "first_seen"),
last_seen : Map.get(row, "last_seen"),
assigned_to : Map.get(row, "assigned_to")
assigned_to : Map.get(row, "assigned_to"),
last_resolved_at : ""
}
end))
end
Expand Down Expand Up @@ -599,12 +603,13 @@ project_id :: String,
status :: String,
level :: String,
assigned_to :: String,
environment :: String,
cursor :: String,
cursor_id :: String,
limit_str :: String) -> List < Map < String, String > > ! String do
let lim = String.from(parse_limit(limit_str))
let sql = "SELECT id::text AS id, title::text AS title, level::text AS level, status::text AS status, event_count::text AS event_count, first_seen::text AS first_seen, last_seen::text AS last_seen, COALESCE(assigned_to::text, '') AS assigned_to FROM issues WHERE project_id = $1::uuid AND ($2 = '' OR status = $2) AND ($3 = '' OR level = $3) AND ($4 = '' OR assigned_to = NULLIF($4, '')::uuid) AND ($5 = '' OR (last_seen, id) < ($5::timestamptz, NULLIF($6, '')::uuid)) ORDER BY last_seen DESC, id DESC LIMIT $7::int"
Repo.query_raw(pool, sql, [project_id, status, level, assigned_to, cursor, cursor_id, lim])
let sql = "SELECT id::text AS id, title::text AS title, level::text AS level, status::text AS status, event_count::text AS event_count, first_seen::text AS first_seen, last_seen::text AS last_seen, COALESCE(assigned_to::text, '') AS assigned_to FROM issues WHERE project_id = $1::uuid AND ($2 = '' OR status = $2) AND ($3 = '' OR level = $3) AND ($4 = '' OR assigned_to = NULLIF($4, '')::uuid) AND ($5 = '' OR (last_seen, id) < ($5::timestamptz, NULLIF($6, '')::uuid)) AND ($8 = '' OR EXISTS (SELECT 1 FROM events WHERE issue_id = issues.id AND environment = $8 AND received_at > now() - interval '7 days')) ORDER BY last_seen DESC, id DESC LIMIT $7::int"
Repo.query_raw(pool, sql, [project_id, status, level, assigned_to, cursor, cursor_id, lim, environment])
end

# SEARCH-02: Full-text search on event messages using inline tsvector.
Expand Down Expand Up @@ -825,7 +830,9 @@ pub fn get_event_detail(pool :: PoolHandle, event_id :: String) -> List < Map <
"tags"), Expr.label(Expr.coalesce([Pg.text(Expr.column("extra")), Expr.value("{}")]), "extra"), Expr.label(Expr.coalesce([Pg.text(Expr.column("user_context")), Expr.value("null")]),
"user_context"), Expr.label(Expr.coalesce([Expr.column("sdk_name"), Expr.value("")]),
"sdk_name"), Expr.label(Expr.coalesce([Expr.column("sdk_version"), Expr.value("")]),
"sdk_version"), Expr.label(Expr.column("received_at"), "received_at")])
"sdk_version"), Expr.label(Expr.coalesce([Expr.column("environment"), Expr.value("")]),
"environment"), Expr.label(Expr.coalesce([Expr.column("session_id"), Expr.value("")]),
"session_id"), Expr.label(Expr.column("received_at"), "received_at")])
Repo.all(pool, q)
end

Expand Down Expand Up @@ -1004,6 +1011,44 @@ pub fn check_new_issue(pool :: PoolHandle, issue_id :: String) -> Bool ! String
Ok(List.length(rows) > 0)
end

# Regression detection: an issue has regressed when it is currently unresolved,
# has a recorded last_resolved_at timestamp, and its last_seen is newer than
# last_resolved_at -- meaning a new event arrived after the last manual resolve.
# Returns true only once per regression window because last_seen advances on each
# new event while last_resolved_at stays fixed until the next resolve action.
# Wired into check_event_alerts in ingestion/routes.mpl to fire "regression" alerts.

pub fn check_regression(pool :: PoolHandle, issue_id :: String) -> Bool ! String do
# Existence probe: select only "id" for the single matching issue and test
# whether any row came back. All three regression conditions (unresolved,
# last_resolved_at set, last_seen newer) are pushed into the WHERE clause.
let q = Query.from(Issue.__table__())
|> Query.where_raw("id = ?::uuid AND status = 'unresolved' AND last_resolved_at IS NOT NULL AND last_seen > last_resolved_at",
[issue_id])
|> Query.select(["id"])
let rows = Repo.all(pool, q) ?
Ok(List.length(rows) > 0)
end

# Session-scoped event retrieval: returns events for a given session_id within a project.
# Scoped to the last 24 hours for partition pruning on the range-partitioned events table.
# Returns enough fields for session-context display without the full JSONB payload.

pub fn get_events_by_session_id(pool :: PoolHandle,
project_id :: String,
session_id :: String,
limit_str :: String) -> List < Map < String, String > > ! String do
# limit_str arrives raw from the HTTP layer; parse_limit bounds/validates it.
let lim = parse_limit(limit_str)
# Filters: project scope (uuid-cast), exact session match, and a rolling 24h
# window so the planner can prune old partitions (see header comment).
# environment is COALESCEd to "" so older-SDK rows serialize cleanly.
let q = Query.from(Event.__table__())
|> Query.where_expr(Expr.eq(Expr.column("project_id"), Pg.uuid(Expr.value(project_id))))
|> Query.where_raw("session_id = ?", [session_id])
|> Query.where_raw("received_at > now() - interval '24 hours'", [])
|> Query.select_exprs([Expr.label(Pg.text(Expr.column("id")), "id"), Expr.label(Pg.text(Expr.column("issue_id")),
"issue_id"), Expr.label(Expr.column("level"), "level"), Expr.label(Expr.column("message"),
"message"), Expr.label(Expr.coalesce([Expr.column("environment"), Expr.value("")]),
"environment"), Expr.label(Pg.text(Expr.column("received_at")), "received_at")])
|> Query.order_by(:received_at, :asc)
|> Query.limit(lim)
Repo.all(pool, q)
end

# ALERT-03: Get enabled alert rules for event-based conditions for a project.
# Honest raw S03 keep-site: the live alert loop needs stable text rows and a
# truthful cooldown gate. Keep the selector explicit and pre-filter it on the
Expand Down
4 changes: 3 additions & 1 deletion mesher/storage/writer.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ json_str :: String) -> String ! String do
[event_json, Expr.value("extra")]), Pg.jsonb(Expr.value("{}"))]), "user_context" => Expr.fn_call("jsonb_extract_path",
[event_json, Expr.value("user_context")]), "sdk_name" => Expr.fn_call("jsonb_extract_path_text",
[event_json, Expr.value("sdk_name")]), "sdk_version" => Expr.fn_call("jsonb_extract_path_text",
[event_json, Expr.value("sdk_version")])}) ?
[event_json, Expr.value("sdk_version")]), "environment" => Expr.fn_call("jsonb_extract_path_text",
[event_json, Expr.value("environment")]), "session_id" => Expr.fn_call("jsonb_extract_path_text",
[event_json, Expr.value("session_id")])}) ?
Ok("stored")
end

Expand Down
4 changes: 4 additions & 0 deletions mesher/types/event.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub struct Event do
user_context :: String
sdk_name :: String
sdk_version :: String
environment :: String
session_id :: String
received_at :: String
belongs_to :project, Project
belongs_to :issue, Issue
Expand All @@ -90,4 +92,6 @@ pub struct EventPayload do
user_context :: String
sdk_name :: Option < String >
sdk_version :: Option < String >
environment :: Option < String >
session_id :: Option < String >
end deriving(Json)
7 changes: 5 additions & 2 deletions mesher/types/issue.mpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

# Issue workflow status. Serialized with deriving(Json); stored as text in the
# DB per the Issue row struct's comment in this file.
pub type IssueStatus do
Unresolved

Resolved

Archived

Discarded
end deriving(Json)

# Database Row struct for issues. Status stored as text in DB,
Expand All @@ -25,6 +27,7 @@ pub struct Issue do
first_seen :: String
last_seen :: String
assigned_to :: String
last_resolved_at :: String
belongs_to :project, Project
has_many :events, Event
end deriving(Schema, Json, Row)
Loading