From 2a37cb034e138cc616f4884e84fec2dd58256e1e Mon Sep 17 00:00:00 2001 From: "Md.Sadiq" Date: Mon, 13 Apr 2026 21:44:25 +0530 Subject: [PATCH] Close the remaining gaps in grouping, regression detection, alerts, ownership, environments, and session context on top of the current Mesher backend --- mesher/api/detail.mpl | 6 +- mesher/api/search.mpl | 50 ++++++++++++++++- mesher/ingestion/routes.mpl | 35 +++++++++++- mesher/main.mpl | 3 +- ...000_add_environment_session_regression.mpl | 28 ++++++++++ mesher/storage/queries.mpl | 55 +++++++++++++++++-- mesher/storage/writer.mpl | 4 +- mesher/types/event.mpl | 4 ++ mesher/types/issue.mpl | 7 ++- 9 files changed, 178 insertions(+), 14 deletions(-) create mode 100644 mesher/migrations/20260413000000_add_environment_session_regression.mpl diff --git a/mesher/api/detail.mpl b/mesher/api/detail.mpl index 5897a233..c88b9254 100644 --- a/mesher/api/detail.mpl +++ b/mesher/api/detail.mpl @@ -29,6 +29,8 @@ fn detail_row_to_event(row) -> Event do user_context : Map.get(row, "user_context"), sdk_name : Map.get(row, "sdk_name"), sdk_version : Map.get(row, "sdk_version"), + environment : Map.get(row, "environment"), + session_id : Map.get(row, "session_id"), received_at : Map.get(row, "received_at") } end @@ -53,8 +55,10 @@ fn event_detail_to_json(row) -> String do let user_context = Map.get(row, "user_context") let sdk_name = Json.get(normalized, "sdk_name") let sdk_version = Json.get(normalized, "sdk_version") + let environment = Json.get(normalized, "environment") + let session_id = Json.get(normalized, "session_id") let received_at = Json.get(normalized, "received_at") - 
"""{"id":"#{id}","project_id":"#{project_id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","fingerprint":"#{fingerprint}","exception":#{exception},"stacktrace":#{stacktrace},"breadcrumbs":#{breadcrumbs},"tags":#{tags},"extra":#{extra},"user_context":#{user_context},"sdk_name":"#{sdk_name}","sdk_version":"#{sdk_version}","received_at":"#{received_at}"}""" + """{"id":"#{id}","project_id":"#{project_id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","fingerprint":"#{fingerprint}","exception":#{exception},"stacktrace":#{stacktrace},"breadcrumbs":#{breadcrumbs},"tags":#{tags},"extra":#{extra},"user_context":#{user_context},"sdk_name":"#{sdk_name}","sdk_version":"#{sdk_version}","environment":"#{environment}","session_id":"#{session_id}","received_at":"#{received_at}"}""" end # Format a nullable neighbor ID for JSON output. diff --git a/mesher/api/search.mpl b/mesher/api/search.mpl index 189acb62..ba65471f 100644 --- a/mesher/api/search.mpl +++ b/mesher/api/search.mpl @@ -4,7 +4,7 @@ # All handlers follow the PipelineRegistry pattern for pool lookup. 
from Ingestion.Pipeline import PipelineRegistry -from Storage.Queries import list_issues_filtered, search_events_fulltext, filter_events_by_tag, list_events_for_issue +from Storage.Queries import list_issues_filtered, search_events_fulltext, filter_events_by_tag, list_events_for_issue, get_events_by_session_id from Types.Event import Event from Types.Issue import Issue from Api.Helpers import query_or_default, to_json_array, require_param, get_registry, resolve_project_id @@ -116,7 +116,8 @@ fn issue_row_to_cursor_issue(row) -> Issue do event_count : 0, first_seen : Map.get(row, "first_seen"), last_seen : Map.get(row, "last_seen"), - assigned_to : Map.get(row, "assigned_to") + assigned_to : Map.get(row, "assigned_to"), + last_resolved_at : "" } end @@ -158,6 +159,8 @@ fn issue_event_row_to_event(row) -> Event do user_context : "null", sdk_name : "", sdk_version : "", + environment : "", + session_id : "", received_at : Map.get(row, "received_at") } end @@ -225,6 +228,7 @@ pub fn handle_search_issues(request) do let status = query_or_default(request, "status", "") let level = query_or_default(request, "level", "") let assigned_to = query_or_default(request, "assigned_to", "") + let environment = query_or_default(request, "environment", "") let cursor = query_or_default(request, "cursor", "") |> decode_query_component() let cursor_id = query_or_default(request, "cursor_id", "") @@ -235,6 +239,7 @@ pub fn handle_search_issues(request) do status, level, assigned_to, + environment, cursor, cursor_id, limit_str) @@ -378,3 +383,44 @@ pub fn handle_list_issue_events(request) do Err( e) -> HTTP.response(500, json { error : e }) end end + +# Convert a session event row to JSON. +# Includes environment so the client can verify which deployment the event came from. 
+ +fn row_to_session_event_json(row) -> String do + let id = Map.get(row, "id") + let issue_id = Map.get(row, "issue_id") + let level = Map.get(row, "level") + let message = Map.get(row, "message") + let environment = Map.get(row, "environment") + let received_at = Map.get(row, "received_at") + """{"id":"#{id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","environment":"#{environment}","received_at":"#{received_at}"}""" +end + +# Helper: serialize session event rows and respond. + +fn respond_session_events(rows) do + let body = rows + |> List.map(fn (row) do row_to_session_event_json(row) end) + |> to_json_array() + HTTP.response(200, body) +end + +# Handle GET /api/v1/projects/:project_id/sessions/:session_id/events +# Returns events belonging to a specific SDK session, ordered by received_at ASC. +# Scoped to the last 24 hours (partition-pruned on the partitioned events table). +# session_id must match exactly the value the SDK sent in the event payload. + +pub fn handle_session_events(request) do + let reg_pid = get_registry() + let pool = PipelineRegistry.get_pool(reg_pid) + let raw_id = require_param(request, "project_id") + let project_id = resolve_project_id(pool, raw_id) + let session_id = require_param(request, "session_id") + let limit_str = get_limit(request) + let result = get_events_by_session_id(pool, project_id, session_id, limit_str) + case result do + Ok( rows) -> respond_session_events(rows) + Err( e) -> HTTP.response(500, json { error : e }) + end +end diff --git a/mesher/ingestion/routes.mpl b/mesher/ingestion/routes.mpl index a9d0fc7d..153c6554 100644 --- a/mesher/ingestion/routes.mpl +++ b/mesher/ingestion/routes.mpl @@ -19,6 +19,7 @@ from Storage.Queries import ( delete_issue, list_issues_by_status, check_new_issue, + check_regression, get_event_alert_rules, fire_alert, check_sample_rate, @@ -185,7 +186,24 @@ is_new :: Bool) do end end -# Check for new-issue alerts after event processing (ALERT-03). 
+# Fire regression alerts when a previously resolved issue receives a new event. +# Reuses fire_matching_event_alerts with condition_type "regression" so alert +# rules with that condition type are triggered without a separate alert evaluation path. + +fn handle_regression_alert(pool :: PoolHandle, +project_id :: String, +issue_id :: String, +is_regression :: Bool) do + if is_regression do + fire_matching_event_alerts(pool, project_id, "regression", issue_id) + 0 + else + 0 + end +end + +# Check for new-issue and regression alerts after event processing (ALERT-03). +# new_issue fires on first occurrence; regression fires when a resolved issue regresses. fn check_event_alerts(pool :: PoolHandle, project_id :: String, issue_id :: String) do let new_result = check_new_issue(pool, issue_id) @@ -193,6 +211,11 @@ fn check_event_alerts(pool :: PoolHandle, project_id :: String, issue_id :: Stri Ok( is_new) -> handle_new_issue_alert(pool, project_id, issue_id, is_new) Err( _) -> 0 end + let reg_result = check_regression(pool, issue_id) + case reg_result do + Ok( is_regression) -> handle_regression_alert(pool, project_id, issue_id, is_regression) + Err( _) -> 0 + end end # Helper: broadcast event notification, issue count, check alerts, and return response @@ -488,12 +511,20 @@ pub fn handle_unresolve_issue(request) do end end +# Helper: broadcast assignment and return success response. +# Follows the resolve_success / archive_success pattern: broadcast then respond. + +fn assign_success(pool, issue_id :: String) do + broadcast_issue_update(pool, issue_id, "assigned") + HTTP.response(200, json { status : "ok" }) +end + # Helper: perform assignment after extracting user_id from parsed JSON rows. 
fn assign_with_user_id(pool :: PoolHandle, issue_id :: String, user_id :: String) do let result = assign_issue(pool, issue_id, user_id) case result do - Ok( _) -> HTTP.response(200, json { status : "ok" }) + Ok( _) -> assign_success(pool, issue_id) Err( e) -> HTTP.response(500, json { error : e }) end end diff --git a/mesher/main.mpl b/mesher/main.mpl index 64fe95c2..2ec97cd4 100644 --- a/mesher/main.mpl +++ b/mesher/main.mpl @@ -33,7 +33,7 @@ from Ingestion.Routes import ( handle_discard_issue, handle_delete_issue ) -from Api.Search import handle_search_issues, handle_search_events, handle_filter_by_tag, handle_list_issue_events +from Api.Search import handle_search_issues, handle_search_events, handle_filter_by_tag, handle_list_issue_events, handle_session_events from Api.Dashboard import ( handle_event_volume, handle_error_breakdown, @@ -120,6 +120,7 @@ fn start_runtime(http_port :: Int, ws_port :: Int, window_seconds :: Int, max_ev |> HTTP.on_get("/api/v1/projects/:project_id/events/search", handle_search_events) |> HTTP.on_get("/api/v1/projects/:project_id/events/tags", handle_filter_by_tag) |> HTTP.on_get("/api/v1/issues/:issue_id/events", handle_list_issue_events) + |> HTTP.on_get("/api/v1/projects/:project_id/sessions/:session_id/events", handle_session_events) |> HTTP.on_get("/api/v1/projects/:project_id/dashboard/volume", handle_event_volume) |> HTTP.on_get("/api/v1/projects/:project_id/dashboard/levels", handle_error_breakdown) |> HTTP.on_get("/api/v1/projects/:project_id/dashboard/top-issues", handle_top_issues) diff --git a/mesher/migrations/20260413000000_add_environment_session_regression.mpl b/mesher/migrations/20260413000000_add_environment_session_regression.mpl new file mode 100644 index 00000000..00c2ca06 --- /dev/null +++ b/mesher/migrations/20260413000000_add_environment_session_regression.mpl @@ -0,0 +1,28 @@ +# Migration: add environment and session_id to events, last_resolved_at to issues. 
+# environment -- first-class column so operators can filter by deployment target +# without relying on tags JSONB. Partial index covers only rows where the SDK +# sends the field; rows from older SDKs are excluded from the index automatically. +# session_id -- correlates events from the same SDK session for session-context +# queries without decomposing user_context JSONB on every read path. +# last_resolved_at -- records the timestamp of the last manual resolve so +# regression detection (resolved -> unresolved flip) can fire regression alerts. + +pub fn up(pool :: PoolHandle) -> Int ! String do + Pool.execute(pool, "ALTER TABLE events ADD COLUMN IF NOT EXISTS environment TEXT", []) ? + Pool.execute(pool, "ALTER TABLE events ADD COLUMN IF NOT EXISTS session_id TEXT", []) ? + Pool.execute(pool, "ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_resolved_at TIMESTAMPTZ", []) ? + Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_events_environment ON events (project_id, environment, received_at DESC) WHERE environment IS NOT NULL", []) ? + Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_events_session ON events (session_id, received_at DESC) WHERE session_id IS NOT NULL", []) ? + Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_issues_last_resolved ON issues (last_resolved_at) WHERE last_resolved_at IS NOT NULL", []) ? + Ok(0) +end + +pub fn down(pool :: PoolHandle) -> Int ! String do + Pool.execute(pool, "DROP INDEX IF EXISTS idx_issues_last_resolved", []) ? + Pool.execute(pool, "DROP INDEX IF EXISTS idx_events_session", []) ? + Pool.execute(pool, "DROP INDEX IF EXISTS idx_events_environment", []) ? + Pool.execute(pool, "ALTER TABLE issues DROP COLUMN IF EXISTS last_resolved_at", []) ? + Pool.execute(pool, "ALTER TABLE events DROP COLUMN IF EXISTS session_id", []) ? + Pool.execute(pool, "ALTER TABLE events DROP COLUMN IF EXISTS environment", []) ? 
+ Ok(0) +end diff --git a/mesher/storage/queries.mpl b/mesher/storage/queries.mpl index 9afb2370..02d484f8 100644 --- a/mesher/storage/queries.mpl +++ b/mesher/storage/queries.mpl @@ -351,7 +351,10 @@ pub fn resolve_issue(pool :: PoolHandle, issue_id :: String) -> Int ! String do let q = Query.from(Issue.__table__()) |> Query.where_raw("id = ?::uuid", [issue_id]) |> Query.where_raw("status != 'resolved'", []) - Repo.update_where(pool, Issue.__table__(), %{"status" => "resolved"}, q) ? + Repo.update_where_expr(pool, + Issue.__table__(), + %{"status" => Expr.value("resolved"), "last_resolved_at" => Expr.fn_call("now", [])}, + q) ? Ok(1) end @@ -544,7 +547,8 @@ pub fn list_issues_by_status(pool :: PoolHandle, project_id :: String, status :: event_count : parse_event_count(Map.get(row, "event_count")), first_seen : Map.get(row, "first_seen"), last_seen : Map.get(row, "last_seen"), - assigned_to : Map.get(row, "assigned_to") + assigned_to : Map.get(row, "assigned_to"), + last_resolved_at : "" } end)) end @@ -599,12 +603,13 @@ project_id :: String, status :: String, level :: String, assigned_to :: String, +environment :: String, cursor :: String, cursor_id :: String, limit_str :: String) -> List < Map < String, String > > ! 
String do let lim = String.from(parse_limit(limit_str)) - let sql = "SELECT id::text AS id, title::text AS title, level::text AS level, status::text AS status, event_count::text AS event_count, first_seen::text AS first_seen, last_seen::text AS last_seen, COALESCE(assigned_to::text, '') AS assigned_to FROM issues WHERE project_id = $1::uuid AND ($2 = '' OR status = $2) AND ($3 = '' OR level = $3) AND ($4 = '' OR assigned_to = NULLIF($4, '')::uuid) AND ($5 = '' OR (last_seen, id) < ($5::timestamptz, NULLIF($6, '')::uuid)) ORDER BY last_seen DESC, id DESC LIMIT $7::int" - Repo.query_raw(pool, sql, [project_id, status, level, assigned_to, cursor, cursor_id, lim]) + let sql = "SELECT id::text AS id, title::text AS title, level::text AS level, status::text AS status, event_count::text AS event_count, first_seen::text AS first_seen, last_seen::text AS last_seen, COALESCE(assigned_to::text, '') AS assigned_to FROM issues WHERE project_id = $1::uuid AND ($2 = '' OR status = $2) AND ($3 = '' OR level = $3) AND ($4 = '' OR assigned_to = NULLIF($4, '')::uuid) AND ($5 = '' OR (last_seen, id) < ($5::timestamptz, NULLIF($6, '')::uuid)) AND ($8 = '' OR EXISTS (SELECT 1 FROM events WHERE issue_id = issues.id AND environment = $8 AND received_at > now() - interval '7 days')) ORDER BY last_seen DESC, id DESC LIMIT $7::int" + Repo.query_raw(pool, sql, [project_id, status, level, assigned_to, cursor, cursor_id, lim, environment]) end # SEARCH-02: Full-text search on event messages using inline tsvector. 
@@ -825,7 +830,9 @@ pub fn get_event_detail(pool :: PoolHandle, event_id :: String) -> List < Map < "tags"), Expr.label(Expr.coalesce([Pg.text(Expr.column("extra")), Expr.value("{}")]), "extra"), Expr.label(Expr.coalesce([Pg.text(Expr.column("user_context")), Expr.value("null")]), "user_context"), Expr.label(Expr.coalesce([Expr.column("sdk_name"), Expr.value("")]), "sdk_name"), Expr.label(Expr.coalesce([Expr.column("sdk_version"), Expr.value("")]), - "sdk_version"), Expr.label(Expr.column("received_at"), "received_at")]) + "sdk_version"), Expr.label(Expr.coalesce([Expr.column("environment"), Expr.value("")]), + "environment"), Expr.label(Expr.coalesce([Expr.column("session_id"), Expr.value("")]), + "session_id"), Expr.label(Expr.column("received_at"), "received_at")]) Repo.all(pool, q) end @@ -1004,6 +1011,44 @@ pub fn check_new_issue(pool :: PoolHandle, issue_id :: String) -> Bool ! String Ok(List.length(rows) > 0) end +# Regression detection: an issue has regressed when it is currently unresolved, +# has a recorded last_resolved_at timestamp, and its last_seen is newer than +# last_resolved_at -- meaning a new event arrived after the last manual resolve. +# Note: the condition remains true for every event after a regression until the +# issue is resolved again, so alert-rule cooldowns must suppress duplicate fires. +# Wired into check_event_alerts in ingestion/routes.mpl to fire "regression" alerts. + +pub fn check_regression(pool :: PoolHandle, issue_id :: String) -> Bool ! String do + let q = Query.from(Issue.__table__()) + |> Query.where_raw("id = ?::uuid AND status = 'unresolved' AND last_resolved_at IS NOT NULL AND last_seen > last_resolved_at", + [issue_id]) + |> Query.select(["id"]) + let rows = Repo.all(pool, q) ? + Ok(List.length(rows) > 0) +end + +# Session-scoped event retrieval: returns events for a given session_id within a project. +# Scoped to the last 24 hours for partition pruning on the range-partitioned events table.
+# Returns enough fields for session-context display without the full JSONB payload. + +pub fn get_events_by_session_id(pool :: PoolHandle, +project_id :: String, +session_id :: String, +limit_str :: String) -> List < Map < String, String > > ! String do + let lim = parse_limit(limit_str) + let q = Query.from(Event.__table__()) + |> Query.where_expr(Expr.eq(Expr.column("project_id"), Pg.uuid(Expr.value(project_id)))) + |> Query.where_raw("session_id = ?", [session_id]) + |> Query.where_raw("received_at > now() - interval '24 hours'", []) + |> Query.select_exprs([Expr.label(Pg.text(Expr.column("id")), "id"), Expr.label(Pg.text(Expr.column("issue_id")), + "issue_id"), Expr.label(Expr.column("level"), "level"), Expr.label(Expr.column("message"), + "message"), Expr.label(Expr.coalesce([Expr.column("environment"), Expr.value("")]), + "environment"), Expr.label(Pg.text(Expr.column("received_at")), "received_at")]) + |> Query.order_by(:received_at, :asc) + |> Query.limit(lim) + Repo.all(pool, q) +end + # ALERT-03: Get enabled alert rules for event-based conditions for a project. # Honest raw S03 keep-site: the live alert loop needs stable text rows and a # truthful cooldown gate. Keep the selector explicit and pre-filter it on the diff --git a/mesher/storage/writer.mpl b/mesher/storage/writer.mpl index b2b17985..2ebe019d 100644 --- a/mesher/storage/writer.mpl +++ b/mesher/storage/writer.mpl @@ -31,7 +31,9 @@ json_str :: String) -> String ! String do [event_json, Expr.value("extra")]), Pg.jsonb(Expr.value("{}"))]), "user_context" => Expr.fn_call("jsonb_extract_path", [event_json, Expr.value("user_context")]), "sdk_name" => Expr.fn_call("jsonb_extract_path_text", [event_json, Expr.value("sdk_name")]), "sdk_version" => Expr.fn_call("jsonb_extract_path_text", - [event_json, Expr.value("sdk_version")])}) ? 
+ [event_json, Expr.value("sdk_version")]), "environment" => Expr.fn_call("jsonb_extract_path_text", + [event_json, Expr.value("environment")]), "session_id" => Expr.fn_call("jsonb_extract_path_text", + [event_json, Expr.value("session_id")])}) ? Ok("stored") end diff --git a/mesher/types/event.mpl b/mesher/types/event.mpl index bc318554..99d94c44 100644 --- a/mesher/types/event.mpl +++ b/mesher/types/event.mpl @@ -70,6 +70,8 @@ pub struct Event do user_context :: String sdk_name :: String sdk_version :: String + environment :: String + session_id :: String received_at :: String belongs_to :project, Project belongs_to :issue, Issue @@ -90,4 +92,6 @@ pub struct EventPayload do user_context :: String sdk_name :: Option < String > sdk_version :: Option < String > + environment :: Option < String > + session_id :: Option < String > end deriving(Json) diff --git a/mesher/types/issue.mpl b/mesher/types/issue.mpl index 82105a48..aa6e8350 100644 --- a/mesher/types/issue.mpl +++ b/mesher/types/issue.mpl @@ -4,10 +4,12 @@ pub type IssueStatus do Unresolved - + Resolved - + Archived + + Discarded end deriving(Json) # Database Row struct for issues. Status stored as text in DB, @@ -25,6 +27,7 @@ pub struct Issue do first_seen :: String last_seen :: String assigned_to :: String + last_resolved_at :: String belongs_to :project, Project has_many :events, Event end deriving(Schema, Json, Row)