Skip to content

Commit 2a37cb0

Browse files
committed
Close the remaining gaps in grouping, regression detection, alerts, ownership, environments, and session context on top of the current Mesher backend
1 parent 302fb0f commit 2a37cb0

File tree

9 files changed

+178
-14
lines changed

9 files changed

+178
-14
lines changed

mesher/api/detail.mpl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ fn detail_row_to_event(row) -> Event do
2929
user_context : Map.get(row, "user_context"),
3030
sdk_name : Map.get(row, "sdk_name"),
3131
sdk_version : Map.get(row, "sdk_version"),
32+
environment : Map.get(row, "environment"),
33+
session_id : Map.get(row, "session_id"),
3234
received_at : Map.get(row, "received_at")
3335
}
3436
end
@@ -53,8 +55,10 @@ fn event_detail_to_json(row) -> String do
5355
let user_context = Map.get(row, "user_context")
5456
let sdk_name = Json.get(normalized, "sdk_name")
5557
let sdk_version = Json.get(normalized, "sdk_version")
58+
let environment = Json.get(normalized, "environment")
59+
let session_id = Json.get(normalized, "session_id")
5660
let received_at = Json.get(normalized, "received_at")
57-
"""{"id":"#{id}","project_id":"#{project_id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","fingerprint":"#{fingerprint}","exception":#{exception},"stacktrace":#{stacktrace},"breadcrumbs":#{breadcrumbs},"tags":#{tags},"extra":#{extra},"user_context":#{user_context},"sdk_name":"#{sdk_name}","sdk_version":"#{sdk_version}","received_at":"#{received_at}"}"""
61+
"""{"id":"#{id}","project_id":"#{project_id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","fingerprint":"#{fingerprint}","exception":#{exception},"stacktrace":#{stacktrace},"breadcrumbs":#{breadcrumbs},"tags":#{tags},"extra":#{extra},"user_context":#{user_context},"sdk_name":"#{sdk_name}","sdk_version":"#{sdk_version}","environment":"#{environment}","session_id":"#{session_id}","received_at":"#{received_at}"}"""
5862
end
5963

6064
# Format a nullable neighbor ID for JSON output.

mesher/api/search.mpl

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# All handlers follow the PipelineRegistry pattern for pool lookup.
55

66
from Ingestion.Pipeline import PipelineRegistry
7-
from Storage.Queries import list_issues_filtered, search_events_fulltext, filter_events_by_tag, list_events_for_issue
7+
from Storage.Queries import list_issues_filtered, search_events_fulltext, filter_events_by_tag, list_events_for_issue, get_events_by_session_id
88
from Types.Event import Event
99
from Types.Issue import Issue
1010
from Api.Helpers import query_or_default, to_json_array, require_param, get_registry, resolve_project_id
@@ -116,7 +116,8 @@ fn issue_row_to_cursor_issue(row) -> Issue do
116116
event_count : 0,
117117
first_seen : Map.get(row, "first_seen"),
118118
last_seen : Map.get(row, "last_seen"),
119-
assigned_to : Map.get(row, "assigned_to")
119+
assigned_to : Map.get(row, "assigned_to"),
120+
last_resolved_at : ""
120121
}
121122
end
122123

@@ -158,6 +159,8 @@ fn issue_event_row_to_event(row) -> Event do
158159
user_context : "null",
159160
sdk_name : "",
160161
sdk_version : "",
162+
environment : "",
163+
session_id : "",
161164
received_at : Map.get(row, "received_at")
162165
}
163166
end
@@ -225,6 +228,7 @@ pub fn handle_search_issues(request) do
225228
let status = query_or_default(request, "status", "")
226229
let level = query_or_default(request, "level", "")
227230
let assigned_to = query_or_default(request, "assigned_to", "")
231+
let environment = query_or_default(request, "environment", "")
228232
let cursor = query_or_default(request, "cursor", "")
229233
|> decode_query_component()
230234
let cursor_id = query_or_default(request, "cursor_id", "")
@@ -235,6 +239,7 @@ pub fn handle_search_issues(request) do
235239
status,
236240
level,
237241
assigned_to,
242+
environment,
238243
cursor,
239244
cursor_id,
240245
limit_str)
@@ -378,3 +383,44 @@ pub fn handle_list_issue_events(request) do
378383
Err( e) -> HTTP.response(500, json { error : e })
379384
end
380385
end
386+
387+
# Convert a session event row to JSON.
388+
# Includes environment so the client can verify which deployment the event came from.
389+
390+
fn row_to_session_event_json(row) -> String do
391+
let id = Map.get(row, "id")
392+
let issue_id = Map.get(row, "issue_id")
393+
let level = Map.get(row, "level")
394+
let message = Map.get(row, "message")
395+
let environment = Map.get(row, "environment")
396+
let received_at = Map.get(row, "received_at")
397+
"""{"id":"#{id}","issue_id":"#{issue_id}","level":"#{level}","message":"#{message}","environment":"#{environment}","received_at":"#{received_at}"}"""
398+
end
399+
400+
# Helper: serialize session event rows and respond.
401+
402+
fn respond_session_events(rows) do
403+
let body = rows
404+
|> List.map(fn (row) do row_to_session_event_json(row) end)
405+
|> to_json_array()
406+
HTTP.response(200, body)
407+
end
408+
409+
# Handle GET /api/v1/projects/:project_id/sessions/:session_id/events
410+
# Returns events belonging to a specific SDK session, ordered by received_at ASC.
411+
# Scoped to the last 24 hours (partition-pruned on the partitioned events table).
412+
# session_id must match exactly the value the SDK sent in the event payload.
413+
414+
pub fn handle_session_events(request) do
415+
let reg_pid = get_registry()
416+
let pool = PipelineRegistry.get_pool(reg_pid)
417+
let raw_id = require_param(request, "project_id")
418+
let project_id = resolve_project_id(pool, raw_id)
419+
let session_id = require_param(request, "session_id")
420+
let limit_str = get_limit(request)
421+
let result = get_events_by_session_id(pool, project_id, session_id, limit_str)
422+
case result do
423+
Ok( rows) -> respond_session_events(rows)
424+
Err( e) -> HTTP.response(500, json { error : e })
425+
end
426+
end

mesher/ingestion/routes.mpl

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ from Storage.Queries import (
1919
delete_issue,
2020
list_issues_by_status,
2121
check_new_issue,
22+
check_regression,
2223
get_event_alert_rules,
2324
fire_alert,
2425
check_sample_rate,
@@ -185,14 +186,36 @@ is_new :: Bool) do
185186
end
186187
end
187188

188-
# Check for new-issue alerts after event processing (ALERT-03).
189+
# Fire regression alerts when a previously resolved issue receives a new event.
190+
# Reuses fire_matching_event_alerts with condition_type "regression" so alert
191+
# rules with that condition type are triggered without a separate alert evaluation path.
192+
193+
fn handle_regression_alert(pool :: PoolHandle,
194+
project_id :: String,
195+
issue_id :: String,
196+
is_regression :: Bool) do
197+
if is_regression do
198+
fire_matching_event_alerts(pool, project_id, "regression", issue_id)
199+
0
200+
else
201+
0
202+
end
203+
end
204+
205+
# Check for new-issue and regression alerts after event processing (ALERT-03).
206+
# new_issue fires on first occurrence; regression fires when a resolved issue regresses.
189207

190208
fn check_event_alerts(pool :: PoolHandle, project_id :: String, issue_id :: String) do
191209
let new_result = check_new_issue(pool, issue_id)
192210
case new_result do
193211
Ok( is_new) -> handle_new_issue_alert(pool, project_id, issue_id, is_new)
194212
Err( _) -> 0
195213
end
214+
let reg_result = check_regression(pool, issue_id)
215+
case reg_result do
216+
Ok( is_regression) -> handle_regression_alert(pool, project_id, issue_id, is_regression)
217+
Err( _) -> 0
218+
end
196219
end
197220

198221
# Helper: broadcast event notification, issue count, check alerts, and return response
@@ -488,12 +511,20 @@ pub fn handle_unresolve_issue(request) do
488511
end
489512
end
490513

514+
# Helper: broadcast assignment and return success response.
515+
# Follows the resolve_success / archive_success pattern: broadcast then respond.
516+
517+
fn assign_success(pool, issue_id :: String) do
518+
broadcast_issue_update(pool, issue_id, "assigned")
519+
HTTP.response(200, json { status : "ok" })
520+
end
521+
491522
# Helper: perform assignment after extracting user_id from parsed JSON rows.
492523

493524
fn assign_with_user_id(pool :: PoolHandle, issue_id :: String, user_id :: String) do
494525
let result = assign_issue(pool, issue_id, user_id)
495526
case result do
496-
Ok( _) -> HTTP.response(200, json { status : "ok" })
527+
Ok( _) -> assign_success(pool, issue_id)
497528
Err( e) -> HTTP.response(500, json { error : e })
498529
end
499530
end

mesher/main.mpl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ from Ingestion.Routes import (
3333
handle_discard_issue,
3434
handle_delete_issue
3535
)
36-
from Api.Search import handle_search_issues, handle_search_events, handle_filter_by_tag, handle_list_issue_events
36+
from Api.Search import handle_search_issues, handle_search_events, handle_filter_by_tag, handle_list_issue_events, handle_session_events
3737
from Api.Dashboard import (
3838
handle_event_volume,
3939
handle_error_breakdown,
@@ -120,6 +120,7 @@ fn start_runtime(http_port :: Int, ws_port :: Int, window_seconds :: Int, max_ev
120120
|> HTTP.on_get("/api/v1/projects/:project_id/events/search", handle_search_events)
121121
|> HTTP.on_get("/api/v1/projects/:project_id/events/tags", handle_filter_by_tag)
122122
|> HTTP.on_get("/api/v1/issues/:issue_id/events", handle_list_issue_events)
123+
|> HTTP.on_get("/api/v1/projects/:project_id/sessions/:session_id/events", handle_session_events)
123124
|> HTTP.on_get("/api/v1/projects/:project_id/dashboard/volume", handle_event_volume)
124125
|> HTTP.on_get("/api/v1/projects/:project_id/dashboard/levels", handle_error_breakdown)
125126
|> HTTP.on_get("/api/v1/projects/:project_id/dashboard/top-issues", handle_top_issues)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Migration: add environment and session_id to events, last_resolved_at to issues.
2+
# environment -- first-class column so operators can filter by deployment target
3+
# without relying on tags JSONB. Partial index covers only rows where the SDK
4+
# sends the field; rows from older SDKs are excluded from the index automatically.
5+
# session_id -- correlates events from the same SDK session for session-context
6+
# queries without decomposing user_context JSONB on every read path.
7+
# last_resolved_at -- records the timestamp of the last manual resolve so
8+
# regression detection (resolved -> unresolved flip) can fire regression alerts.
9+
10+
pub fn up(pool :: PoolHandle) -> Int ! String do
11+
Pool.execute(pool, "ALTER TABLE events ADD COLUMN IF NOT EXISTS environment TEXT", []) ?
12+
Pool.execute(pool, "ALTER TABLE events ADD COLUMN IF NOT EXISTS session_id TEXT", []) ?
13+
Pool.execute(pool, "ALTER TABLE issues ADD COLUMN IF NOT EXISTS last_resolved_at TIMESTAMPTZ", []) ?
14+
Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_events_environment ON events (project_id, environment, received_at DESC) WHERE environment IS NOT NULL", []) ?
15+
Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_events_session ON events (session_id, received_at DESC) WHERE session_id IS NOT NULL", []) ?
16+
Pool.execute(pool, "CREATE INDEX IF NOT EXISTS idx_issues_last_resolved ON issues (last_resolved_at) WHERE last_resolved_at IS NOT NULL", []) ?
17+
Ok(0)
18+
end
19+
20+
pub fn down(pool :: PoolHandle) -> Int ! String do
21+
Pool.execute(pool, "DROP INDEX IF EXISTS idx_issues_last_resolved", []) ?
22+
Pool.execute(pool, "DROP INDEX IF EXISTS idx_events_session", []) ?
23+
Pool.execute(pool, "DROP INDEX IF EXISTS idx_events_environment", []) ?
24+
Pool.execute(pool, "ALTER TABLE issues DROP COLUMN IF EXISTS last_resolved_at", []) ?
25+
Pool.execute(pool, "ALTER TABLE events DROP COLUMN IF EXISTS session_id", []) ?
26+
Pool.execute(pool, "ALTER TABLE events DROP COLUMN IF EXISTS environment", []) ?
27+
Ok(0)
28+
end

mesher/storage/queries.mpl

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,10 @@ pub fn resolve_issue(pool :: PoolHandle, issue_id :: String) -> Int ! String do
351351
let q = Query.from(Issue.__table__())
352352
|> Query.where_raw("id = ?::uuid", [issue_id])
353353
|> Query.where_raw("status != 'resolved'", [])
354-
Repo.update_where(pool, Issue.__table__(), %{"status" => "resolved"}, q) ?
354+
Repo.update_where_expr(pool,
355+
Issue.__table__(),
356+
%{"status" => Expr.value("resolved"), "last_resolved_at" => Expr.fn_call("now", [])},
357+
q) ?
355358
Ok(1)
356359
end
357360

@@ -544,7 +547,8 @@ pub fn list_issues_by_status(pool :: PoolHandle, project_id :: String, status ::
544547
event_count : parse_event_count(Map.get(row, "event_count")),
545548
first_seen : Map.get(row, "first_seen"),
546549
last_seen : Map.get(row, "last_seen"),
547-
assigned_to : Map.get(row, "assigned_to")
550+
assigned_to : Map.get(row, "assigned_to"),
551+
last_resolved_at : ""
548552
}
549553
end))
550554
end
@@ -599,12 +603,13 @@ project_id :: String,
599603
status :: String,
600604
level :: String,
601605
assigned_to :: String,
606+
environment :: String,
602607
cursor :: String,
603608
cursor_id :: String,
604609
limit_str :: String) -> List < Map < String, String > > ! String do
605610
let lim = String.from(parse_limit(limit_str))
606-
let sql = "SELECT id::text AS id, title::text AS title, level::text AS level, status::text AS status, event_count::text AS event_count, first_seen::text AS first_seen, last_seen::text AS last_seen, COALESCE(assigned_to::text, '') AS assigned_to FROM issues WHERE project_id = $1::uuid AND ($2 = '' OR status = $2) AND ($3 = '' OR level = $3) AND ($4 = '' OR assigned_to = NULLIF($4, '')::uuid) AND ($5 = '' OR (last_seen, id) < ($5::timestamptz, NULLIF($6, '')::uuid)) ORDER BY last_seen DESC, id DESC LIMIT $7::int"
607-
Repo.query_raw(pool, sql, [project_id, status, level, assigned_to, cursor, cursor_id, lim])
611+
let sql = "SELECT id::text AS id, title::text AS title, level::text AS level, status::text AS status, event_count::text AS event_count, first_seen::text AS first_seen, last_seen::text AS last_seen, COALESCE(assigned_to::text, '') AS assigned_to FROM issues WHERE project_id = $1::uuid AND ($2 = '' OR status = $2) AND ($3 = '' OR level = $3) AND ($4 = '' OR assigned_to = NULLIF($4, '')::uuid) AND ($5 = '' OR (last_seen, id) < ($5::timestamptz, NULLIF($6, '')::uuid)) AND ($8 = '' OR EXISTS (SELECT 1 FROM events WHERE issue_id = issues.id AND environment = $8 AND received_at > now() - interval '7 days')) ORDER BY last_seen DESC, id DESC LIMIT $7::int"
612+
Repo.query_raw(pool, sql, [project_id, status, level, assigned_to, cursor, cursor_id, lim, environment])
608613
end
609614

610615
# SEARCH-02: Full-text search on event messages using inline tsvector.
@@ -825,7 +830,9 @@ pub fn get_event_detail(pool :: PoolHandle, event_id :: String) -> List < Map <
825830
"tags"), Expr.label(Expr.coalesce([Pg.text(Expr.column("extra")), Expr.value("{}")]), "extra"), Expr.label(Expr.coalesce([Pg.text(Expr.column("user_context")), Expr.value("null")]),
826831
"user_context"), Expr.label(Expr.coalesce([Expr.column("sdk_name"), Expr.value("")]),
827832
"sdk_name"), Expr.label(Expr.coalesce([Expr.column("sdk_version"), Expr.value("")]),
828-
"sdk_version"), Expr.label(Expr.column("received_at"), "received_at")])
833+
"sdk_version"), Expr.label(Expr.coalesce([Expr.column("environment"), Expr.value("")]),
834+
"environment"), Expr.label(Expr.coalesce([Expr.column("session_id"), Expr.value("")]),
835+
"session_id"), Expr.label(Expr.column("received_at"), "received_at")])
829836
Repo.all(pool, q)
830837
end
831838

@@ -1004,6 +1011,44 @@ pub fn check_new_issue(pool :: PoolHandle, issue_id :: String) -> Bool ! String
10041011
Ok(List.length(rows) > 0)
10051012
end
10061013

1014+
# Regression detection: an issue has regressed when it is currently unresolved,
1015+
# has a recorded last_resolved_at timestamp, and its last_seen is newer than
1016+
# last_resolved_at -- meaning a new event arrived after the last manual resolve.
1017+
# Returns true only once per regression window because last_seen advances on each
1018+
# new event while last_resolved_at stays fixed until the next resolve action.
1019+
# Wired into check_event_alerts in ingestion/routes.mpl to fire "regression" alerts.
1020+
1021+
pub fn check_regression(pool :: PoolHandle, issue_id :: String) -> Bool ! String do
1022+
let q = Query.from(Issue.__table__())
1023+
|> Query.where_raw("id = ?::uuid AND status = 'unresolved' AND last_resolved_at IS NOT NULL AND last_seen > last_resolved_at",
1024+
[issue_id])
1025+
|> Query.select(["id"])
1026+
let rows = Repo.all(pool, q) ?
1027+
Ok(List.length(rows) > 0)
1028+
end
1029+
1030+
# Session-scoped event retrieval: returns events for a given session_id within a project.
1031+
# Scoped to the last 24 hours for partition pruning on the range-partitioned events table.
1032+
# Returns enough fields for session-context display without the full JSONB payload.
1033+
1034+
pub fn get_events_by_session_id(pool :: PoolHandle,
1035+
project_id :: String,
1036+
session_id :: String,
1037+
limit_str :: String) -> List < Map < String, String > > ! String do
1038+
let lim = parse_limit(limit_str)
1039+
let q = Query.from(Event.__table__())
1040+
|> Query.where_expr(Expr.eq(Expr.column("project_id"), Pg.uuid(Expr.value(project_id))))
1041+
|> Query.where_raw("session_id = ?", [session_id])
1042+
|> Query.where_raw("received_at > now() - interval '24 hours'", [])
1043+
|> Query.select_exprs([Expr.label(Pg.text(Expr.column("id")), "id"), Expr.label(Pg.text(Expr.column("issue_id")),
1044+
"issue_id"), Expr.label(Expr.column("level"), "level"), Expr.label(Expr.column("message"),
1045+
"message"), Expr.label(Expr.coalesce([Expr.column("environment"), Expr.value("")]),
1046+
"environment"), Expr.label(Pg.text(Expr.column("received_at")), "received_at")])
1047+
|> Query.order_by(:received_at, :asc)
1048+
|> Query.limit(lim)
1049+
Repo.all(pool, q)
1050+
end
1051+
10071052
# ALERT-03: Get enabled alert rules for event-based conditions for a project.
10081053
# Honest raw S03 keep-site: the live alert loop needs stable text rows and a
10091054
# truthful cooldown gate. Keep the selector explicit and pre-filter it on the

mesher/storage/writer.mpl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ json_str :: String) -> String ! String do
3131
[event_json, Expr.value("extra")]), Pg.jsonb(Expr.value("{}"))]), "user_context" => Expr.fn_call("jsonb_extract_path",
3232
[event_json, Expr.value("user_context")]), "sdk_name" => Expr.fn_call("jsonb_extract_path_text",
3333
[event_json, Expr.value("sdk_name")]), "sdk_version" => Expr.fn_call("jsonb_extract_path_text",
34-
[event_json, Expr.value("sdk_version")])}) ?
34+
[event_json, Expr.value("sdk_version")]), "environment" => Expr.fn_call("jsonb_extract_path_text",
35+
[event_json, Expr.value("environment")]), "session_id" => Expr.fn_call("jsonb_extract_path_text",
36+
[event_json, Expr.value("session_id")])}) ?
3537
Ok("stored")
3638
end
3739

mesher/types/event.mpl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ pub struct Event do
7070
user_context :: String
7171
sdk_name :: String
7272
sdk_version :: String
73+
environment :: String
74+
session_id :: String
7375
received_at :: String
7476
belongs_to :project, Project
7577
belongs_to :issue, Issue
@@ -90,4 +92,6 @@ pub struct EventPayload do
9092
user_context :: String
9193
sdk_name :: Option < String >
9294
sdk_version :: Option < String >
95+
environment :: Option < String >
96+
session_id :: Option < String >
9397
end deriving(Json)

mesher/types/issue.mpl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
pub type IssueStatus do
66
Unresolved
7-
7+
88
Resolved
9-
9+
1010
Archived
11+
12+
Discarded
1113
end deriving(Json)
1214

1315
# Database Row struct for issues. Status stored as text in DB,
@@ -25,6 +27,7 @@ pub struct Issue do
2527
first_seen :: String
2628
last_seen :: String
2729
assigned_to :: String
30+
last_resolved_at :: String
2831
belongs_to :project, Project
2932
has_many :events, Event
3033
end deriving(Schema, Json, Row)

0 commit comments

Comments
 (0)