From 22638a49431696221bc9bda9fa219ac1b0664f2d Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Tue, 28 Apr 2026 19:50:51 +0000 Subject: [PATCH 1/4] spec(api-server): add SessionEvent Kind for durable AG-UI event store Introduces SessionEvent as the canonical append-only record of all AG-UI protocol events in a session, replacing agui-events.jsonl and the overloaded non-user rows that grpc_push_middleware wrote to session_messages. Key design decisions: - SessionMessage narrowed to runner inbox (user messages only); matches the existing WatchSessionMessages watcher which discards non-user events - SessionEvent stores raw AG-UI JSON verbatim; SSE replay pipes payload directly without transformation (fixes the legacy wrapper format bug) - run_id and thread_id are first-class columns enabling per-run compaction and targeted queries - Compaction: TEXT_MESSAGE_CONTENT rows marked superseded=true after RUN_FINISHED; MESSAGES_SNAPSHOT appended as canonical transcript - /agui-events endpoint with since= (reconnect) and run_id= filters - grpc_push_middleware must migrate to PushSessionEvent; GRPCMessageWriter is retired Co-Authored-By: Claude Sonnet 4.6 --- docs/internal/design/ambient-model.spec.md | 112 +++++++++++++++++---- 1 file changed, 90 insertions(+), 22 deletions(-) diff --git a/docs/internal/design/ambient-model.spec.md b/docs/internal/design/ambient-model.spec.md index f1054ea68..b7d220445 100755 --- a/docs/internal/design/ambient-model.spec.md +++ b/docs/internal/design/ambient-model.spec.md @@ -2,7 +2,7 @@ **Date:** 2026-03-20 **Status:** Proposed — Pending Consensus -**Last Updated:** 2026-04-28 — added `ScheduledSession` Kind; added session operational sub-resources (workspace, files, git, repos, tasks, runner protocol); added generic proxy surface for backend passthrough; updated coverage matrix: all ScheduledSession commands implemented; session sub-resources (workspace/files/git/repos/operational/runner protocol) implemented in API server; generic proxy plugin implemented +**Last Updated:** 2026-04-28 — added `ScheduledSession` Kind; added session operational sub-resources (workspace, files, git, repos, tasks, runner protocol); added generic proxy surface for backend passthrough; updated coverage matrix: all ScheduledSession commands implemented; session sub-resources (workspace/files/git/repos/operational/runner protocol) implemented in API server; generic proxy plugin implemented; added `SessionEvent` Kind (durable AG-UI event store replacing `agui-events.jsonl`); clarified `SessionMessage` as runner inbox only; added `/agui-events` SSE endpoint with compaction and `since`/`run_id` filters **Guide:** `ambient-model.guide.md` — implementation waves, gap table, build commands, run log **Design:** `credentials-session.md` — full Credential Kind design spec and rationale @@ -133,14 +133,28 @@ erDiagram time deleted_at } - %% ── SessionMessage (AG-UI event stream — real LLM turns) ───────────────── + %% ── SessionMessage (runner inbox — user message delivery queue) ────────── SessionMessage { string ID PK string session_id FK int seq "monotonic within session" - string event_type "user | assistant | tool_use | tool_result | system | error" - string payload "message body or JSON-encoded event" + string event_type "user (only; runner ignores all other event_types)" + string payload "message body" + time created_at + } + + %% ── SessionEvent (durable AG-UI event store — replaces agui-events.jsonl) ─ + + SessionEvent { + string ID PK "KSUID" + string session_id FK + string run_id "nullable — groups events within a runner invocation" + string thread_id "nullable — ties to runner in-memory stream" + bigint seq "BIGSERIAL monotonic within session" + string event_type "AG-UI protocol type: RUN_STARTED | TEXT_MESSAGE_START | TEXT_MESSAGE_CONTENT | TEXT_MESSAGE_END | TOOL_CALL_START | TOOL_CALL_ARGS | TOOL_CALL_END | TOOL_CALL_RESULT | MESSAGES_SNAPSHOT | STATE_SNAPSHOT | REASONING_START | REASONING_CONTENT | REASONING_END | RUN_FINISHED | RUN_ERROR | META | CUSTOM | RAW" + text payload "raw JSON-serialized AG-UI event — piped verbatim to SSE consumers" + bool superseded "true = streaming delta replaced by compaction snapshot; excluded from replay" time created_at } @@ -224,7 +238,8 @@ erDiagram Inbox }o--o| Agent : "sent_from" - Session ||--o{ SessionMessage : "streams" + Session ||--o{ SessionMessage : "inbox" + Session ||--o{ SessionEvent : "records" Role ||--o{ RoleBinding : "granted_by" ``` @@ -286,22 +301,62 @@ All four are assembled into the start context in that order. Pokes roll downhill --- -## SessionMessage — AG-UI Event Stream +## SessionMessage — Runner Inbox (User Message Delivery Queue) -SessionMessages are the real LLM conversation. They are appended by the runner via gRPC `PushSessionMessage` and streamed to clients via SSE. +SessionMessages are the delivery queue for user-to-runner messages. They carry only `event_type="user"`. The runner's `WatchSessionMessages` gRPC stream watches this table and filters to `event_type="user"` — any other event_type is discarded by the watcher. -`seq` is monotonically increasing within a session. `event_type` follows the AG-UI protocol: `user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`. +`seq` is monotonically increasing within a session. SessionMessages are append-only and never deleted. -SessionMessages are never deleted or edited. They are the canonical record of what happened in a session. +**SessionMessages do not store assistant outputs, tool calls, or any other AG-UI events.** Those are stored in `SessionEvent`. The overloading of `session_messages` as both a delivery queue and an event log is superseded by this separation — `grpc_push_middleware` and `GRPCMessageWriter` must write to `SessionEvent`, not `SessionMessage`. -### Two Event Streams +### Event Streams | Endpoint | Source | Persistence | Purpose | |---|---|---|---| -| `GET /sessions/{id}/messages` | API server gRPC fan-out | Persisted in DB (replay from `seq=0`) | Durable stream; supports replay and history | -| `GET /sessions/{id}/events` | Runner pod SSE (`GET /events/{thread_id}`) | Ephemeral; runner-local in-memory queue | Live AG-UI turn events during an active run | +| `POST /sessions/{id}/messages` | Human or system | DB append | Deliver a user message to the runner inbox | +| `GET /sessions/{id}/messages` | API server DB | Persisted | List user messages sent to this session | +| `GET /sessions/{id}/agui-events` | API server DB + runner | Persisted (see SessionEvent) | Durable AG-UI event replay + live stream | +| `GET /sessions/{id}/events` | Runner pod SSE | Ephemeral; runner-local | Live AG-UI turn events during an active run | + +The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. The `/sessions/{id}/agui-events` endpoint provides the same stream with durable replay — see `SessionEvent` below. + +--- + +## SessionEvent — Durable AG-UI Event Store + +SessionEvents are the canonical append-only record of every AG-UI protocol event in a session. They replace `agui-events.jsonl` (the legacy backend PVC file) and the overloaded non-user event rows that `grpc_push_middleware` previously wrote to `session_messages`. + +Every AG-UI event emitted by the runner is persisted here: `RUN_STARTED`, `TEXT_MESSAGE_START`, `TEXT_MESSAGE_CONTENT`, `TEXT_MESSAGE_END`, `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END`, `TOOL_CALL_RESULT`, `MESSAGES_SNAPSHOT`, `STATE_SNAPSHOT`, `REASONING_START`, `REASONING_CONTENT`, `REASONING_END`, `RUN_FINISHED`, `RUN_ERROR`, `CUSTOM`, `RAW`, `META`. + +`run_id` and `thread_id` are first-class columns — they group events within a runner invocation and tie back to the runner's in-memory stream. The `payload` column stores the raw AG-UI event JSON verbatim; the SSE replay endpoint pipes it directly as `data: {payload}\n\n` without transformation, producing a standards-compliant AG-UI SSE stream. + +### Compaction + +After `RUN_FINISHED` or `RUN_ERROR`, streaming deltas are superseded: + +1. All `TEXT_MESSAGE_CONTENT` rows for that `run_id` are marked `superseded=true`. +2. A new `MESSAGES_SNAPSHOT` row is appended with `superseded=false` — the canonical assembled transcript for the run. + +SSE replay excludes `superseded=true` rows. This mirrors the `compactFinishedRun()` behavior from the legacy backend (`agui_store.go`) but operates within the database rather than on the filesystem, eliminating the atomic-rename requirement and making history queries cheap. + +### Write Paths + +| Source | Mechanism | Scope | +|---|---|---| +| Backend AG-UI proxy | Writes one row per event as it proxies the runner SSE stream | Operator-based sessions | +| Runner (control-plane sessions) | gRPC `PushSessionEvent` — all events via `grpc_push_middleware` | Control-plane sessions | + +`grpc_push_middleware` must be updated to call `PushSessionEvent` instead of `PushSessionMessage`. The `GRPCMessageWriter` (which wrote the final assistant text to `session_messages`) is retired — the full event stream in `session_events` supersedes it. + +### Read Paths + +| Endpoint | Behavior | +|---|---| +| `GET /sessions/{id}/agui-events` | SSE: replay all non-superseded rows from `seq=0`, then stream live events | +| `GET /sessions/{id}/agui-events?since=` | SSE: replay from `seq` (reconnect without full history) | +| `GET /sessions/{id}/agui-events?run_id=` | SSE: all events for a single run (debugging, per-run transcript) | -The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. +The `since` parameter enables seamless client reconnect: the client tracks the last `seq` it received, reconnects with `?since=`, and receives only new events — no full replay. --- @@ -374,9 +429,12 @@ The `acpctl` CLI mirrors the API 1-for-1. Every REST operation has a correspondi | `DELETE /sessions/{id}` | `acpctl delete session ` | ✅ implemented | | `GET /sessions/{id}/messages` | `acpctl session messages ` | ✅ implemented | | `POST /sessions/{id}/messages` | `acpctl session send ` | ✅ implemented | -| `POST /sessions/{id}/messages` + `GET /sessions/{id}/events` | `acpctl session send -f` | ✅ implemented | -| `POST /sessions/{id}/messages` + `GET /sessions/{id}/events` | `acpctl session send -f --json` | ✅ implemented | -| `GET /sessions/{id}/events` | `acpctl session events ` | ✅ implemented | +| `POST /sessions/{id}/messages` + `GET /sessions/{id}/agui-events` | `acpctl session send -f` | 🔲 planned (migrate from /events) | +| `POST /sessions/{id}/messages` + `GET /sessions/{id}/agui-events` | `acpctl session send -f --json` | 🔲 planned (migrate from /events) | +| `GET /sessions/{id}/agui-events` | `acpctl session agui-events ` | 🔲 planned | +| `GET /sessions/{id}/agui-events?since=` | `acpctl session agui-events --since ` | 🔲 planned | +| `GET /sessions/{id}/agui-events?run_id=` | `acpctl session agui-events --run-id ` | 🔲 planned | +| `GET /sessions/{id}/events` | `acpctl session events ` | ✅ implemented (ephemeral; no replay) | #### ScheduledSessions (Project-Scoped) @@ -661,9 +719,12 @@ GET /api/ambient/v1/sessions lis GET /api/ambient/v1/sessions/{id} read session DELETE /api/ambient/v1/sessions/{id} cancel or delete session -GET /api/ambient/v1/sessions/{id}/messages list messages (history) -POST /api/ambient/v1/sessions/{id}/messages push a message (human turn) -GET /api/ambient/v1/sessions/{id}/events SSE live event stream from runner pod +GET /api/ambient/v1/sessions/{id}/messages list user messages sent to this session (runner inbox) +POST /api/ambient/v1/sessions/{id}/messages push a user message (enqueues to runner inbox) +GET /api/ambient/v1/sessions/{id}/agui-events SSE durable AG-UI event stream (DB replay + live; excludes superseded rows) +GET /api/ambient/v1/sessions/{id}/agui-events?since= SSE from seq (client reconnect — no full replay) +GET /api/ambient/v1/sessions/{id}/agui-events?run_id= SSE filtered to a single run +GET /api/ambient/v1/sessions/{id}/events SSE live event stream proxied from runner pod (ephemeral — no replay) GET /api/ambient/v1/sessions/{id}/role_bindings RBAC bindings ``` @@ -1090,7 +1151,13 @@ This structure means you can define and compose bespoke agent suites — entire | `current_session_id` denormalized on Agent | Project Home reads Agent + session phase without joining through sessions | | Sessions created only via start | Sessions are run artifacts; direct `POST /sessions` does not exist | | Every layer carries a `prompt` | Project.prompt = workspace context; Agent.prompt = who the agent is; Session.prompt = what this run does; Inbox = prior requests. Pokes roll downhill. | -| `SessionMessage` is append-only | Canonical record of the LLM conversation; never edited or deleted | +| `SessionMessage` is the runner inbox only | Narrowed from "event log" to "user message delivery queue". The `WatchSessionMessages` watcher already discarded non-user events (line 215 of `grpc_transport.py`) — the table now matches that contract. Prevents dual-store confusion. | +| `SessionEvent` is separate from `SessionMessage` | The two stores flow in opposite directions and have different consumers. `session_messages`: user → runner (delivery queue, consumed by gRPC watch). `session_events`: runner → DB (event log, consumed by SSE replay and CLI/SDK history). Merging them required overloading one direction to serve both, which `grpc_push_middleware` did badly. | +| `run_id` and `thread_id` are first-class columns in `SessionEvent` | The legacy `agui-events.jsonl` had no per-run filtering. `run_id` enables per-run compaction (mark streaming deltas `superseded=true`, append `MESSAGES_SNAPSHOT`) and per-run queries (`?run_id=`). `thread_id` ties DB rows back to runner in-memory streams for debugging. | +| Compaction uses `superseded` flag, not row deletion | Deleting compacted rows would require transaction coordination with concurrent readers. Marking `superseded=true` is append-safe and lets queries add `WHERE NOT superseded`. The `MESSAGES_SNAPSHOT` row inserted at compaction is the authoritative assembled transcript for the run. | +| `payload` is raw AG-UI JSON piped verbatim | The legacy backend's `GET /sessions/{id}/messages` SSE wrapped DB records in a custom envelope — wrong format for AG-UI protocol consumers. `session_events.payload` stores the original JSON; SSE emits `data: {payload}\n\n` directly. Zero transformation means any AG-UI client can consume the stream without adapter code. | +| `SessionEvent` replaces `agui-events.jsonl` entirely | The JSONL was durable (PVC-backed) but lost on PVC deletion, required O(file-size) reads for history, and could not serve multiple readers without an in-memory broadcast pipe. A DB table gives durability, indexed queries, multi-reader fan-out via gRPC watch, and compaction without atomic-rename hacks. | +| `SessionMessage` is append-only | Canonical record of user messages sent to the session; never edited or deleted | | `agent:editor` role | Allows prompt updates without full operator access | | `agent:runner` role | Pods get minimum viable credential: read agent definition, push session messages, send inbox | | Union-only permissions | No deny rules — simpler mental model for fleet operators | @@ -1165,8 +1232,9 @@ _Last updated: 2026-04-28. Use this as the authoritative index — click into co |---|---|---|---|---| | **Sessions — CRUD** | ✅ | ✅ `SessionAPI.{Get,List,Create,Update,Delete}` | ✅ `get/create/delete session` | | | **Sessions — start/stop** | ✅ `/start` `/stop` | ✅ `SessionAPI.{Start,Stop}` | ✅ `start`/`stop` commands | | -| **Sessions — messages (list/push/watch)** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | gRPC watch via `session_watch.go` | -| **Sessions — live events (SSE proxy)** | ✅ `/events` → runner pod | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Runner must be Running; 502 if unreachable | +| **Sessions — messages (list/push/watch)** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | gRPC watch via `session_watch.go`; user messages only after SessionEvent split | +| **Sessions — AG-UI event store** | 🔲 `/agui-events` → `session_events` table | 🔲 `SessionAPI.StreamAGUIEvents` | 🔲 `session agui-events` | New; replaces `agui-events.jsonl`; compaction on RUN_FINISHED; `since`/`run_id` filters | +| **Sessions — live events (SSE proxy)** | ✅ `/events` → runner pod | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Ephemeral; no replay; runner must be Running; 502 if unreachable | | **Sessions — labels/annotations** | ✅ PATCH accepts `labels`/`annotations` | ✅ fields on `Session` type; `SessionAPI.Update(patch map[string]any)` | ⚠️ no dedicated subcommand; use `acpctl get session -o json` + manual PATCH | | | **Sessions — workspace files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session workspace list/get/put/delete` | Requires running session for file ops | | **Sessions — pre-upload files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session files list/upload/delete` | S3-staged; available before session starts | From 7a1d37a9e6b916d2679266bd2290681c494c54e9 Mon Sep 17 00:00:00 2001 From: jsell-rh Date: Tue, 28 Apr 2026 20:23:05 +0000 Subject: [PATCH 2/4] spec(api-server): address peer review findings on SessionEvent spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix all 8 blockers and 7 majors identified by the 10-agent review: B1/B7: Add PushSessionEvent proto contract with run_id + thread_id fields B2: Specify session_events DB migration requirement (table + indexes) B3: Document AGUIEvents handler migration from ephemeral proxy to DB-backed B4: Document DeriveAgentStatus() migration path from JSONL to session_events B5: Fix event type enum — remove nonexistent TOOL_CALL_RESULT, add STEP_*, STATE_DELTA, ACTIVITY_SNAPSHOT/DELTA, REASONING_MESSAGE_* (5 new types) B6: Expand compaction scope — supersede all streaming deltas (TEXT_MESSAGE_CONTENT, TOOL_CALL_ARGS, REASONING_MESSAGE_CONTENT, STATE_DELTA, ACTIVITY_DELTA), not just TEXT_MESSAGE_CONTENT as previously stated B8: Correct seq to global BIGSERIAL (not per-session unique); add design decision M1: Add required indexes: (session_id, seq), (session_id, run_id), replay partial M2: Define live-tail mechanism (subscribe-before-replay fan-out pattern) M3: Add retention note and ?limit parameter M4: Document write throughput guidance (individual inserts ok; batching future) M5: Add RBAC requirements for GET /agui-events and PushSessionEvent gRPC M6: Document StreamTextMessages() migration from session_messages to session_events T1: Fix Overview "Message" bullet to reference SessionEvent T2: Fix String Tree "SessionMessage is one turn" — add SessionEvent line T3: Fix Inbox comparison table "Created by" to user-only Also: document payload encoding bug (model_dump needs by_alias=True), GRPCMessageWriter retirement, and session_messages narrowing cleanup path. Co-Authored-By: Claude Sonnet 4.6 --- docs/internal/design/ambient-model.spec.md | 113 ++++++++++++++++++--- 1 file changed, 99 insertions(+), 14 deletions(-) diff --git a/docs/internal/design/ambient-model.spec.md b/docs/internal/design/ambient-model.spec.md index b7d220445..26f47010e 100755 --- a/docs/internal/design/ambient-model.spec.md +++ b/docs/internal/design/ambient-model.spec.md @@ -15,7 +15,7 @@ The Ambient API server provides a coordination layer for orchestrating fleets of - **Project** — a workspace. Groups agents and provides shared context (`prompt`) injected into every agent start. - **Agent** — a project-scoped, mutable definition. Agents belong to exactly one Project. `prompt` defines who the agent is and is directly editable (subject to RBAC). - **Session** — an ephemeral Kubernetes execution run, created exclusively via agent start. Only one active Session per Agent at a time. -- **Message** — a single AG-UI event in the LLM conversation. Append-only; the canonical record of what happened in a session. +- **SessionEvent** — a single AG-UI protocol event in the LLM conversation. Append-only; the canonical record of what happened in a session. Stored in the `session_events` table; replaces `agui-events.jsonl`. - **Inbox** — a persistent message queue on an Agent. Messages survive across sessions and are drained into the start context at the next run. - **Credential** — a project-scoped secret. Stores a Personal Access Token or equivalent for an external provider (GitHub, GitLab, Jira, Google). Consumed by runners at session start. All agents in the project share the project's credentials automatically. - **RoleBinding** — binds a Resource to a Role at a given scope (`global`, `project`, `agent`, `session`). Ownership and access for all Kinds is expressed through RoleBindings. @@ -151,8 +151,8 @@ erDiagram string session_id FK string run_id "nullable — groups events within a runner invocation" string thread_id "nullable — ties to runner in-memory stream" - bigint seq "BIGSERIAL monotonic within session" - string event_type "AG-UI protocol type: RUN_STARTED | TEXT_MESSAGE_START | TEXT_MESSAGE_CONTENT | TEXT_MESSAGE_END | TOOL_CALL_START | TOOL_CALL_ARGS | TOOL_CALL_END | TOOL_CALL_RESULT | MESSAGES_SNAPSHOT | STATE_SNAPSHOT | REASONING_START | REASONING_CONTENT | REASONING_END | RUN_FINISHED | RUN_ERROR | META | CUSTOM | RAW" + bigint seq "BIGSERIAL global — monotonically increasing; not per-session unique; (session_id, seq) index enables ordered per-session replay" + string event_type "AG-UI type: RUN_STARTED | RUN_FINISHED | RUN_ERROR | STEP_STARTED | STEP_FINISHED | TEXT_MESSAGE_START | TEXT_MESSAGE_CONTENT | TEXT_MESSAGE_END | TOOL_CALL_START | TOOL_CALL_ARGS | TOOL_CALL_END | REASONING_START | REASONING_END | REASONING_MESSAGE_START | REASONING_MESSAGE_CONTENT | REASONING_MESSAGE_END | MESSAGES_SNAPSHOT | STATE_SNAPSHOT | STATE_DELTA | ACTIVITY_SNAPSHOT | ACTIVITY_DELTA | META | CUSTOM | RAW" text payload "raw JSON-serialized AG-UI event — piped verbatim to SSE consumers" bool superseded "true = streaming delta replaced by compaction snapshot; excluded from replay" time created_at @@ -276,9 +276,9 @@ Inbox messages are addressed to an Agent (`agent_id`). They are distinct from Se | | Inbox | SessionMessage | |--|-------|----------------| | Scope | Agent (persists across sessions) | Session (ephemeral) | -| Created by | Human or another Agent | LLM turn / runner gRPC push | -| Drained | At session start | Never — append-only stream | -| Purpose | Queued intent waiting for next run | Real LLM event stream | +| Created by | Human or another Agent | Human user (API or CLI) | +| Drained | At session start | Never — append-only queue | +| Purpose | Queued intent waiting for next run | User message delivery to runner | At session start, all unread Inbox messages are drained: marked `read=true` and injected as context into the Session prompt before the first SessionMessage turn. @@ -326,18 +326,41 @@ The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bri SessionEvents are the canonical append-only record of every AG-UI protocol event in a session. They replace `agui-events.jsonl` (the legacy backend PVC file) and the overloaded non-user event rows that `grpc_push_middleware` previously wrote to `session_messages`. -Every AG-UI event emitted by the runner is persisted here: `RUN_STARTED`, `TEXT_MESSAGE_START`, `TEXT_MESSAGE_CONTENT`, `TEXT_MESSAGE_END`, `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END`, `TOOL_CALL_RESULT`, `MESSAGES_SNAPSHOT`, `STATE_SNAPSHOT`, `REASONING_START`, `REASONING_CONTENT`, `REASONING_END`, `RUN_FINISHED`, `RUN_ERROR`, `CUSTOM`, `RAW`, `META`. +Every AG-UI event emitted by the runner is persisted here. Complete event type set (authoritative source: `components/backend/types/agui.go`): -`run_id` and `thread_id` are first-class columns — they group events within a runner invocation and tie back to the runner's in-memory stream. The `payload` column stores the raw AG-UI event JSON verbatim; the SSE replay endpoint pipes it directly as `data: {payload}\n\n` without transformation, producing a standards-compliant AG-UI SSE stream. +| Category | Event Types | +|---|---| +| Lifecycle | `RUN_STARTED`, `RUN_FINISHED`, `RUN_ERROR` | +| Steps | `STEP_STARTED`, `STEP_FINISHED` | +| Text (streaming) | `TEXT_MESSAGE_START`, `TEXT_MESSAGE_CONTENT`, `TEXT_MESSAGE_END` | +| Tool calls (streaming) | `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END` | +| Reasoning (streaming) | `REASONING_START`, `REASONING_END`, `REASONING_MESSAGE_START`, `REASONING_MESSAGE_CONTENT`, `REASONING_MESSAGE_END` | +| Snapshots | `MESSAGES_SNAPSHOT`, `STATE_SNAPSHOT`, `ACTIVITY_SNAPSHOT` | +| Deltas (streaming) | `STATE_DELTA`, `ACTIVITY_DELTA` | +| Platform | `META`, `CUSTOM`, `RAW` | + +Note: `TOOL_CALL_RESULT` does **not** exist in the AG-UI protocol — tool results are carried in `TOOL_CALL_END.result`. Do not store or emit this type. + +`run_id` and `thread_id` are first-class columns — they group events within a runner invocation and tie back to the runner's in-memory stream. `seq` is a global BIGSERIAL (not per-session unique) — the `(session_id, seq)` index enables ordered per-session replay without a per-session sequence generator. The `payload` column stores the raw AG-UI event JSON verbatim; the SSE replay endpoint pipes it directly as `data: {payload}\n\n` without transformation, producing a standards-compliant AG-UI SSE stream. ### Compaction -After `RUN_FINISHED` or `RUN_ERROR`, streaming deltas are superseded: +After `RUN_FINISHED` or `RUN_ERROR`, streaming delta rows for that `run_id` are marked `superseded=true` and replaced by terminal snapshot rows: + +| Superseded (streaming deltas) | Snapshot that replaces it | +|---|---| +| `TEXT_MESSAGE_CONTENT`, `TOOL_CALL_ARGS`, `REASONING_MESSAGE_CONTENT` | `MESSAGES_SNAPSHOT` | +| `STATE_DELTA` | `STATE_SNAPSHOT` | +| `ACTIVITY_DELTA` | `ACTIVITY_SNAPSHOT` | + +Compaction steps: +1. `UPDATE session_events SET superseded = true WHERE session_id = ? AND run_id = ? AND event_type IN ('TEXT_MESSAGE_CONTENT', 'TOOL_CALL_ARGS', 'REASONING_MESSAGE_CONTENT', 'STATE_DELTA', 'ACTIVITY_DELTA')` +2. `INSERT` a `MESSAGES_SNAPSHOT` row with `superseded = false` — the canonical assembled transcript for the run. +3. If state was tracked, `INSERT` a `STATE_SNAPSHOT` row; if activity was tracked, `INSERT` an `ACTIVITY_SNAPSHOT` row. -1. All `TEXT_MESSAGE_CONTENT` rows for that `run_id` are marked `superseded=true`. -2. A new `MESSAGES_SNAPSHOT` row is appended with `superseded=false` — the canonical assembled transcript for the run. +SSE replay excludes `superseded=true` rows (`WHERE NOT superseded`). Non-delta events (`STEP_STARTED`, `STEP_FINISHED`, `TOOL_CALL_START`, `TOOL_CALL_END`, lifecycle events) are never marked superseded — they are always included in replay. -SSE replay excludes `superseded=true` rows. This mirrors the `compactFinishedRun()` behavior from the legacy backend (`agui_store.go`) but operates within the database rather than on the filesystem, eliminating the atomic-rename requirement and making history queries cheap. +This mirrors the `compactFinishedRun()` behavior from the legacy backend (`agui_store.go`) but operates within the database rather than on the filesystem, eliminating the atomic-rename requirement and the per-session filesystem mutex. The legacy code preserves `AskUserQuestion` TOOL_CALL_START events specifically for `DeriveAgentStatus()` — the new compaction never touches START events, so this behavior is preserved by default. ### Write Paths @@ -348,6 +371,39 @@ SSE replay excludes `superseded=true` rows. This mirrors the `compactFinishedRun `grpc_push_middleware` must be updated to call `PushSessionEvent` instead of `PushSessionMessage`. The `GRPCMessageWriter` (which wrote the final assistant text to `session_messages`) is retired — the full event stream in `session_events` supersedes it. +**Proto contract:** The `PushSessionEvent` RPC must be added to `sessions.proto`. The request message must include `run_id` and `thread_id` as required string fields — these are available in the runner at the point `grpc_push_middleware` fires (`grpc_transport.py` lines 281-282) but are absent from the existing `PushSessionMessageRequest`. + +```proto +message PushSessionEventRequest { + string session_id = 1; + string run_id = 2; // required — groups events within a runner invocation + string thread_id = 3; // required — ties to runner in-memory stream + string event_type = 4; + string payload = 5; // raw AG-UI event JSON; must use camelCase (by_alias=True in model_dump) +} +message PushSessionEventResponse {} +rpc PushSessionEvent(PushSessionEventRequest) returns (PushSessionEventResponse); +``` + +**Payload encoding:** `_event_to_payload()` in `grpc_push.py` currently calls `event.model_dump()` without `by_alias=True` — this produces snake_case field names instead of camelCase. Must be fixed to `event.model_dump(by_alias=True)` before `PushSessionEvent` is wired up, otherwise SSE consumers receive malformed AG-UI events. + +**Write throughput:** Individual row inserts are acceptable for initial implementation. At high session volume (>10 concurrent active sessions at 20 events/sec each), batch inserts via a buffered channel in the gRPC handler can be adopted without changing the proto contract. + +### Indexes + +```sql +-- Required: ordered per-session replay (all queries use this) +CREATE INDEX idx_session_events_session_seq ON session_events(session_id, seq); + +-- Required: per-run queries (?run_id= filter and compaction UPDATE) +CREATE INDEX idx_session_events_session_run ON session_events(session_id, run_id); + +-- Optional: replay optimization (partial index skips superseded rows without full scan) +CREATE INDEX idx_session_events_replay ON session_events(session_id, seq) WHERE NOT superseded; +``` + +`seq` is a global BIGSERIAL — no per-session UNIQUE constraint. The `(session_id, seq)` index provides ordered per-session reads. `seq` values are not contiguous within a session (global monotone), but they form a stable cursor for `?since=` reconnect. + ### Read Paths | Endpoint | Behavior | @@ -355,9 +411,36 @@ SSE replay excludes `superseded=true` rows. This mirrors the `compactFinishedRun | `GET /sessions/{id}/agui-events` | SSE: replay all non-superseded rows from `seq=0`, then stream live events | | `GET /sessions/{id}/agui-events?since=` | SSE: replay from `seq` (reconnect without full history) | | `GET /sessions/{id}/agui-events?run_id=` | SSE: all events for a single run (debugging, per-run transcript) | +| `GET /sessions/{id}/agui-events?limit=` | SSE: cap replay to last N non-superseded rows (pagination; default unlimited) | The `since` parameter enables seamless client reconnect: the client tracks the last `seq` it received, reconnects with `?since=`, and receives only new events — no full replay. +**Live-tail mechanism:** After replaying historical rows, the SSE handler subscribes to a per-session in-memory broadcast channel. The `PushSessionEvent` gRPC handler publishes each incoming event to this channel immediately after the DB insert. The SSE handler subscribes _before_ beginning the DB replay to avoid a race condition (event arrives between replay completion and subscription setup). Keepalive pings fire every 30 s to hold the SSE connection open. This is the same fan-out pattern as the legacy backend's `publishLine()` / `broadcast` channel in `agui_proxy.go`. + +**RBAC:** `GET /sessions/{id}/agui-events` requires at minimum `agent:observer` on the parent agent or `project:viewer` on the parent project — same as `GET /sessions/{id}`. `PushSessionEvent` gRPC is restricted to `agent:runner` — service accounts granted by the operator at session start. Human users and CLI callers cannot call `PushSessionEvent` directly. + +**Retention:** No automatic row deletion is defined in this spec. Table growth is bounded by session count × events per session. For high-volume deployments, a background job can delete `superseded=true` rows older than N days — the `superseded` flag enables this without API changes. This is a future operational concern. + +### Migration Paths + +**`DeriveAgentStatus()`** (`components/backend/websocket/agui_store.go`, wired at `main.go:204`): Currently tail-scans `agui-events.jsonl`. Must be ported to query `session_events`: +```sql +SELECT event_type, payload FROM session_events +WHERE session_id = ? AND NOT superseded +ORDER BY seq DESC LIMIT 50 +``` +Apply the same state machine (RUN_STARTED/FINISHED/ERROR → working/idle; TOOL_CALL_START with `AskUserQuestion` tool name → waiting_input). Until the legacy backend is removed, both paths run in parallel — the backend reads JSONL, the API server reads `session_events`. + +**`AGUIEvents` handler** (`handler.go:644-692`): Currently proxies the runner's live SSE stream directly with no DB backing. Must be replaced to: (1) replay non-superseded `session_events` rows ordered by `seq`, (2) subscribe to the in-memory fan-out channel for live events. Add `?since` and `?run_id` query parameter parsing. The existing ephemeral `GET /sessions/{id}/events` endpoint remains unchanged. + +**`StreamTextMessages()`** (`message_handler.go:144`): Currently filters `TEXT_MESSAGE_*` from `session_messages`. After `session_messages` narrowing, this returns empty. Must be updated to query `session_events WHERE event_type IN ('TEXT_MESSAGE_START', 'TEXT_MESSAGE_CONTENT', 'TEXT_MESSAGE_END') AND session_id = ?`. + +**`grpc_push_middleware`** (`ambient_runner/middleware/grpc_push.py`): Replace the `session_messages.push()` call with a new `session_events.push()` gRPC call. Pass `run_id` and `thread_id` from `RunAgentInput` (available at `grpc_transport.py:281-282`). Fix `_event_to_payload()` to use `event.model_dump(by_alias=True)` to produce camelCase JSON. + +**`GRPCMessageWriter`** (`grpc_transport.py`): Retired. The full event stream in `session_events` supersedes the final-text-only write to `session_messages`. Remove the `on_run_finished`/`on_run_error` hooks that write `event_type="assistant"` to `session_messages`. + +**`session_messages` narrowing:** Existing non-user rows in `session_messages` (written by `grpc_push_middleware` and `GRPCMessageWriter` before this split) should be purged or migrated during the cutover migration. A `CHECK (event_type = 'user')` constraint can then be added to enforce the new contract at the schema level. + --- ## ScheduledSession — Recurring Agent Trigger @@ -1065,7 +1148,8 @@ A `Project` is an ID and a `prompt` string — the workspace context. An `Agent` is an ID and a `prompt` string — who the agent is. A `Session` is an ID and a `prompt` string — what this run is focused on. An `InboxMessage` is an ID and a `body` string — a request addressed to an agent. -A `SessionMessage` is an ID and a `payload` string — one turn in the conversation. +A `SessionEvent` is an ID and a `payload` string — one AG-UI event in the conversation stream. +A `SessionMessage` is an ID and a `payload` string — one user message in the runner inbox. Strings can be simple (`"hello world"`) or arbitrarily complex (a bookmarked system prompt, a structured markdown context block, a multi-section briefing). The model does not care. Every node is still just an ID and a string. @@ -1154,7 +1238,8 @@ This structure means you can define and compose bespoke agent suites — entire | `SessionMessage` is the runner inbox only | Narrowed from "event log" to "user message delivery queue". The `WatchSessionMessages` watcher already discarded non-user events (line 215 of `grpc_transport.py`) — the table now matches that contract. Prevents dual-store confusion. | | `SessionEvent` is separate from `SessionMessage` | The two stores flow in opposite directions and have different consumers. `session_messages`: user → runner (delivery queue, consumed by gRPC watch). `session_events`: runner → DB (event log, consumed by SSE replay and CLI/SDK history). Merging them required overloading one direction to serve both, which `grpc_push_middleware` did badly. | | `run_id` and `thread_id` are first-class columns in `SessionEvent` | The legacy `agui-events.jsonl` had no per-run filtering. `run_id` enables per-run compaction (mark streaming deltas `superseded=true`, append `MESSAGES_SNAPSHOT`) and per-run queries (`?run_id=`). `thread_id` ties DB rows back to runner in-memory streams for debugging. | -| Compaction uses `superseded` flag, not row deletion | Deleting compacted rows would require transaction coordination with concurrent readers. Marking `superseded=true` is append-safe and lets queries add `WHERE NOT superseded`. The `MESSAGES_SNAPSHOT` row inserted at compaction is the authoritative assembled transcript for the run. | +| Compaction uses `superseded` flag, not row deletion | Deleting compacted rows would require transaction coordination with concurrent readers. Marking `superseded=true` is append-safe and lets queries add `WHERE NOT superseded`. Superseded types: `TEXT_MESSAGE_CONTENT`, `TOOL_CALL_ARGS`, `REASONING_MESSAGE_CONTENT`, `STATE_DELTA`, `ACTIVITY_DELTA`. Non-delta events (`STEP_*`, `TOOL_CALL_START`, `TOOL_CALL_END`, lifecycle) are never superseded. `MESSAGES_SNAPSHOT` is appended as the canonical transcript; `STATE_SNAPSHOT` and `ACTIVITY_SNAPSHOT` are appended if state/activity were tracked. | +| `session_events.seq` is a global BIGSERIAL, not per-session | Per-session sequences require either a transaction-level sequence generator per session or application-level locking. A global BIGSERIAL with a `(session_id, seq)` index provides ordered per-session reads without any locking overhead. `seq` values are globally unique and monotonically increasing — they are not contiguous within a session, but they form a stable cursor for `?since=` reconnect. | | `payload` is raw AG-UI JSON piped verbatim | The legacy backend's `GET /sessions/{id}/messages` SSE wrapped DB records in a custom envelope — wrong format for AG-UI protocol consumers. `session_events.payload` stores the original JSON; SSE emits `data: {payload}\n\n` directly. Zero transformation means any AG-UI client can consume the stream without adapter code. | | `SessionEvent` replaces `agui-events.jsonl` entirely | The JSONL was durable (PVC-backed) but lost on PVC deletion, required O(file-size) reads for history, and could not serve multiple readers without an in-memory broadcast pipe. A DB table gives durability, indexed queries, multi-reader fan-out via gRPC watch, and compaction without atomic-rename hacks. | | `SessionMessage` is append-only | Canonical record of user messages sent to the session; never edited or deleted | From efb2af15142cf67c2493faf70cb1bb023f84155f Mon Sep 17 00:00:00 2001 From: jsell-rh Date: Tue, 28 Apr 2026 21:36:42 +0000 Subject: [PATCH 3/4] spec(session-events): apply second-round critic fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves all design-level inputs and second-round factual issues: - Fix endpoint path: agui-events → agui/events (slash, sub-resource) - Add TOOL_CALL_RESULT to event_type enum; remove incorrect denial - Add THINKING_* deprecation note (match agui.go, not ag_ui library) - Fix compaction SQL: IS NOT DISTINCT FROM for nullable run_id - Fix live-tail: PostgreSQL LISTEN/NOTIFY replaces in-memory channel (cross-replica fan-out); add 4-step subscribe/replay/drain/live procedure - Fix gRPC auth: service-caller boolean (IsServiceCaller), not RBAC role - Fix run_id/thread_id plumbing: extend grpc_push_middleware signature, remove wrong grpc_transport.py:281-282 attribution - PushSessionEventResponse now returns seq for reconnect cursor - Add explicit CREATE TABLE DDL for session_events with all column types, NOT NULL constraints, DEFAULT FALSE on superseded - Add migration registration guidance (function name, ID format, rollback) - StreamTextMessages: implement+register route from session_events; acpctl session messages migrates to this endpoint - DeriveAgentStatus: event-type-filtered query instead of LIMIT 50; document case-insensitive + non-alpha-strip normalization for AskUserQuestion - GRPCMessageWriter: add _synthesize_run_error dependency; atomic deployment requirement; 7-step implementation checklist including buf generate and Go handler authorship Co-Authored-By: Claude Sonnet 4.6 --- docs/internal/design/ambient-model.spec.md | 125 +++++++++++++++------ 1 file changed, 91 insertions(+), 34 deletions(-) diff --git a/docs/internal/design/ambient-model.spec.md b/docs/internal/design/ambient-model.spec.md index 26f47010e..1ce2bffea 100755 --- a/docs/internal/design/ambient-model.spec.md +++ b/docs/internal/design/ambient-model.spec.md @@ -2,7 +2,7 @@ **Date:** 2026-03-20 **Status:** Proposed — Pending Consensus -**Last Updated:** 2026-04-28 — added `ScheduledSession` Kind; added session operational sub-resources (workspace, files, git, repos, tasks, runner protocol); added generic proxy surface for backend passthrough; updated coverage matrix: all ScheduledSession commands implemented; session sub-resources (workspace/files/git/repos/operational/runner protocol) implemented in API server; generic proxy plugin implemented; added `SessionEvent` Kind (durable AG-UI event store replacing `agui-events.jsonl`); clarified `SessionMessage` as runner inbox only; added `/agui-events` SSE endpoint with compaction and `since`/`run_id` filters +**Last Updated:** 2026-04-28 — added `ScheduledSession` Kind; added session operational sub-resources (workspace, files, git, repos, tasks, runner protocol); added generic proxy surface for backend passthrough; updated coverage matrix: all ScheduledSession commands implemented; session sub-resources (workspace/files/git/repos/operational/runner protocol) implemented in API server; generic proxy plugin implemented; added `SessionEvent` Kind (durable AG-UI event store replacing `agui-events.jsonl`); clarified `SessionMessage` as runner inbox only; added `/agui/events` SSE endpoint with compaction and `since`/`run_id` filters **Guide:** `ambient-model.guide.md` — implementation waves, gap table, build commands, run log **Design:** `credentials-session.md` — full Credential Kind design spec and rationale @@ -152,7 +152,7 @@ erDiagram string run_id "nullable — groups events within a runner invocation" string thread_id "nullable — ties to runner in-memory stream" bigint seq "BIGSERIAL global — monotonically increasing; not per-session unique; (session_id, seq) index enables ordered per-session replay" - string event_type "AG-UI type: RUN_STARTED | RUN_FINISHED | RUN_ERROR | STEP_STARTED | STEP_FINISHED | TEXT_MESSAGE_START | TEXT_MESSAGE_CONTENT | TEXT_MESSAGE_END | TOOL_CALL_START | TOOL_CALL_ARGS | TOOL_CALL_END | REASONING_START | REASONING_END | REASONING_MESSAGE_START | REASONING_MESSAGE_CONTENT | REASONING_MESSAGE_END | MESSAGES_SNAPSHOT | STATE_SNAPSHOT | STATE_DELTA | ACTIVITY_SNAPSHOT | ACTIVITY_DELTA | META | CUSTOM | RAW" + string event_type "AG-UI type: RUN_STARTED | RUN_FINISHED | RUN_ERROR | STEP_STARTED | STEP_FINISHED | TEXT_MESSAGE_START | TEXT_MESSAGE_CONTENT | TEXT_MESSAGE_END | TOOL_CALL_START | TOOL_CALL_ARGS | TOOL_CALL_END | TOOL_CALL_RESULT | REASONING_START | REASONING_END | REASONING_MESSAGE_START | REASONING_MESSAGE_CONTENT | REASONING_MESSAGE_END | MESSAGES_SNAPSHOT | STATE_SNAPSHOT | STATE_DELTA | ACTIVITY_SNAPSHOT | ACTIVITY_DELTA | META | CUSTOM | RAW" text payload "raw JSON-serialized AG-UI event — piped verbatim to SSE consumers" bool superseded "true = streaming delta replaced by compaction snapshot; excluded from replay" time created_at @@ -315,10 +315,10 @@ SessionMessages are the delivery queue for user-to-runner messages. They carry o |---|---|---|---| | `POST /sessions/{id}/messages` | Human or system | DB append | Deliver a user message to the runner inbox | | `GET /sessions/{id}/messages` | API server DB | Persisted | List user messages sent to this session | -| `GET /sessions/{id}/agui-events` | API server DB + runner | Persisted (see SessionEvent) | Durable AG-UI event replay + live stream | +| `GET /sessions/{id}/agui/events` | API server DB + runner | Persisted (see SessionEvent) | Durable AG-UI event replay + live stream | | `GET /sessions/{id}/events` | Runner pod SSE | Ephemeral; runner-local | Live AG-UI turn events during an active run | -The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. The `/sessions/{id}/agui-events` endpoint provides the same stream with durable replay — see `SessionEvent` below. +The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. The `/sessions/{id}/agui/events` endpoint provides the same stream with durable replay — see `SessionEvent` below. --- @@ -333,13 +333,13 @@ Every AG-UI event emitted by the runner is persisted here. Complete event type s | Lifecycle | `RUN_STARTED`, `RUN_FINISHED`, `RUN_ERROR` | | Steps | `STEP_STARTED`, `STEP_FINISHED` | | Text (streaming) | `TEXT_MESSAGE_START`, `TEXT_MESSAGE_CONTENT`, `TEXT_MESSAGE_END` | -| Tool calls (streaming) | `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END` | +| Tool calls (streaming) | `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END`, `TOOL_CALL_RESULT` | | Reasoning (streaming) | `REASONING_START`, `REASONING_END`, `REASONING_MESSAGE_START`, `REASONING_MESSAGE_CONTENT`, `REASONING_MESSAGE_END` | | Snapshots | `MESSAGES_SNAPSHOT`, `STATE_SNAPSHOT`, `ACTIVITY_SNAPSHOT` | | Deltas (streaming) | `STATE_DELTA`, `ACTIVITY_DELTA` | | Platform | `META`, `CUSTOM`, `RAW` | -Note: `TOOL_CALL_RESULT` does **not** exist in the AG-UI protocol — tool results are carried in `TOOL_CALL_END.result`. Do not store or emit this type. +This type list matches `components/backend/types/agui.go` (the Go authoritative source). `THINKING_*` event types present in some versions of the `ag_ui` Python library are deprecated and excluded from `session_events`. `TOOL_CALL_RESULT` is accepted — the installed `ag_ui` 0.1.18 defines `ToolCallResultEvent` with `message_id`, `tool_call_id`, and `content` fields. `run_id` and `thread_id` are first-class columns — they group events within a runner invocation and tie back to the runner's in-memory stream. `seq` is a global BIGSERIAL (not per-session unique) — the `(session_id, seq)` index enables ordered per-session replay without a per-session sequence generator. The `payload` column stores the raw AG-UI event JSON verbatim; the SSE replay endpoint pipes it directly as `data: {payload}\n\n` without transformation, producing a standards-compliant AG-UI SSE stream. @@ -354,7 +354,8 @@ After `RUN_FINISHED` or `RUN_ERROR`, streaming delta rows for that `run_id` are | `ACTIVITY_DELTA` | `ACTIVITY_SNAPSHOT` | Compaction steps: -1. `UPDATE session_events SET superseded = true WHERE session_id = ? AND run_id = ? AND event_type IN ('TEXT_MESSAGE_CONTENT', 'TOOL_CALL_ARGS', 'REASONING_MESSAGE_CONTENT', 'STATE_DELTA', 'ACTIVITY_DELTA')` +1. `UPDATE session_events SET superseded = true WHERE session_id = ? AND run_id IS NOT DISTINCT FROM ? AND event_type IN ('TEXT_MESSAGE_CONTENT', 'TOOL_CALL_ARGS', 'REASONING_MESSAGE_CONTENT', 'STATE_DELTA', 'ACTIVITY_DELTA')` + (`IS NOT DISTINCT FROM` handles nullable `run_id` correctly — standard `= ?` evaluates to NULL when `run_id` is NULL and matches nothing.) 2. `INSERT` a `MESSAGES_SNAPSHOT` row with `superseded = false` — the canonical assembled transcript for the run. 3. If state was tracked, `INSERT` a `STATE_SNAPSHOT` row; if activity was tracked, `INSERT` an `ACTIVITY_SNAPSHOT` row. @@ -371,17 +372,19 @@ This mirrors the `compactFinishedRun()` behavior from the legacy backend (`agui_ `grpc_push_middleware` must be updated to call `PushSessionEvent` instead of `PushSessionMessage`. The `GRPCMessageWriter` (which wrote the final assistant text to `session_messages`) is retired — the full event stream in `session_events` supersedes it. -**Proto contract:** The `PushSessionEvent` RPC must be added to `sessions.proto`. The request message must include `run_id` and `thread_id` as required string fields — these are available in the runner at the point `grpc_push_middleware` fires (`grpc_transport.py` lines 281-282) but are absent from the existing `PushSessionMessageRequest`. +**Proto contract:** The `PushSessionEvent` RPC must be added to `sessions.proto`. The request message must include `run_id` and `thread_id` as string fields — passed from `RunAgentInput` at the `run.py` call site (see Migration Paths for signature details). After editing the proto, run `buf generate` from the `proto/` directory to regenerate `sessions.pb.go` and `sessions_grpc.pb.go`. The `PushSessionEvent` method must then be implemented on the `sessionGRPCHandler` struct in `grpc_handler.go`. ```proto message PushSessionEventRequest { string session_id = 1; - string run_id = 2; // required — groups events within a runner invocation - string thread_id = 3; // required — ties to runner in-memory stream + string run_id = 2; // groups events within a runner invocation + string thread_id = 3; // ties to runner in-memory stream string event_type = 4; string payload = 5; // raw AG-UI event JSON; must use camelCase (by_alias=True in model_dump) } -message PushSessionEventResponse {} +message PushSessionEventResponse { + int64 seq = 1; // server-assigned sequence number; enables ?since= reconnect cursor +} rpc PushSessionEvent(PushSessionEventRequest) returns (PushSessionEventResponse); ``` @@ -404,40 +407,94 @@ CREATE INDEX idx_session_events_replay ON session_events(session_id, seq) WHERE `seq` is a global BIGSERIAL — no per-session UNIQUE constraint. The `(session_id, seq)` index provides ordered per-session reads. `seq` values are not contiguous within a session (global monotone), but they form a stable cursor for `?since=` reconnect. +### DDL and Migration + +The full `CREATE TABLE` DDL for `session_events` (mirrors the `session_messages` migration pattern): + +```sql +CREATE TABLE IF NOT EXISTS session_events ( + id VARCHAR(36) PRIMARY KEY, -- KSUID assigned by server + session_id VARCHAR(36) NOT NULL, + run_id TEXT, -- nullable; groups events per runner invocation + thread_id TEXT, -- nullable; ties to runner in-memory stream + seq BIGSERIAL NOT NULL, -- global monotone; not per-session unique + event_type TEXT NOT NULL, + payload TEXT NOT NULL, -- raw camelCase AG-UI event JSON + superseded BOOLEAN NOT NULL DEFAULT FALSE, -- true = streaming delta replaced by snapshot + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +) +``` + +No `updated_at` or `deleted_at` columns — `session_events` is append-only and never updated in place (compaction sets `superseded=true`, it does not update other columns). + +**Migration registration:** Add a `sessionEventsMigration()` function to `plugins/sessions/migration.go` following the existing pattern (with `ID: "YYYYMMDDHHMI"` timestamp and a `Rollback` func that drops indexes then the table). Register it in `plugin.go`'s `init()` via `db.RegisterMigration(sessionEventsMigration())` alongside the existing calls. The rollback must drop indexes before the table: + +```go +Rollback: func(tx *gorm.DB) error { + stmts := []string{ + `DROP INDEX IF EXISTS idx_session_events_replay`, + `DROP INDEX IF EXISTS idx_session_events_session_run`, + `DROP INDEX IF EXISTS idx_session_events_session_seq`, + `DROP TABLE IF EXISTS session_events`, + } + for _, s := range stmts { tx.Exec(s) } + return nil +}, +``` + ### Read Paths | Endpoint | Behavior | |---|---| -| `GET /sessions/{id}/agui-events` | SSE: replay all non-superseded rows from `seq=0`, then stream live events | -| `GET /sessions/{id}/agui-events?since=` | SSE: replay from `seq` (reconnect without full history) | -| `GET /sessions/{id}/agui-events?run_id=` | SSE: all events for a single run (debugging, per-run transcript) | -| `GET /sessions/{id}/agui-events?limit=` | SSE: cap replay to last N non-superseded rows (pagination; default unlimited) | +| `GET /sessions/{id}/agui/events` | SSE: replay all non-superseded rows from `seq=0`, then stream live events | +| `GET /sessions/{id}/agui/events?since=` | SSE: replay from `seq` (reconnect without full history) | +| `GET /sessions/{id}/agui/events?run_id=` | SSE: all events for a single run (debugging, per-run transcript) | +| `GET /sessions/{id}/agui/events?limit=` | SSE: cap replay to last N non-superseded rows (pagination; default unlimited) | The `since` parameter enables seamless client reconnect: the client tracks the last `seq` it received, reconnects with `?since=`, and receives only new events — no full replay. -**Live-tail mechanism:** After replaying historical rows, the SSE handler subscribes to a per-session in-memory broadcast channel. The `PushSessionEvent` gRPC handler publishes each incoming event to this channel immediately after the DB insert. The SSE handler subscribes _before_ beginning the DB replay to avoid a race condition (event arrives between replay completion and subscription setup). Keepalive pings fire every 30 s to hold the SSE connection open. This is the same fan-out pattern as the legacy backend's `publishLine()` / `broadcast` channel in `agui_proxy.go`. +**Live-tail mechanism:** The SSE handler uses PostgreSQL `LISTEN`/`NOTIFY` for cross-replica fan-out: -**RBAC:** `GET /sessions/{id}/agui-events` requires at minimum `agent:observer` on the parent agent or `project:viewer` on the parent project — same as `GET /sessions/{id}`. `PushSessionEvent` gRPC is restricted to `agent:runner` — service accounts granted by the operator at session start. Human users and CLI callers cannot call `PushSessionEvent` directly. +1. **Subscribe**: Issue `LISTEN session_` on a dedicated DB connection before beginning replay. +2. **Replay**: Query and emit all non-superseded rows `WHERE session_id = ? AND seq > ?` ordered by `seq`. Track the highest `seq` emitted. +3. **Drain**: Flush any `NOTIFY` payloads buffered during replay. Discard those with `seq ≤ last_replayed_seq` to prevent duplicates. +4. **Live**: Forward subsequent `NOTIFY` payloads directly to the SSE client as they arrive. + +The `PushSessionEvent` gRPC handler sends `NOTIFY session_ ''` immediately after each `INSERT`. Keepalive pings fire every 30 s to hold the SSE connection open. This fan-out works across API server replicas because `NOTIFY` is broadcast to all `LISTEN`ers on that PostgreSQL channel, regardless of which replica they connect from. + +**RBAC:** `GET /sessions/{id}/agui/events` requires at minimum `agent:observer` on the parent agent or `project:viewer` on the parent project — same as `GET /sessions/{id}`. `PushSessionEvent` gRPC is restricted to service callers — the bearer token interceptor's `middleware.IsServiceCaller(ctx)` check must pass (token matches `expectedToken` or the JWT username matches the configured service account). Human users and CLI callers cannot call `PushSessionEvent` directly. **Retention:** No automatic row deletion is defined in this spec. Table growth is bounded by session count × events per session. For high-volume deployments, a background job can delete `superseded=true` rows older than N days — the `superseded` flag enables this without API changes. This is a future operational concern. ### Migration Paths -**`DeriveAgentStatus()`** (`components/backend/websocket/agui_store.go`, wired at `main.go:204`): Currently tail-scans `agui-events.jsonl`. Must be ported to query `session_events`: +**`DeriveAgentStatus()`** (`components/backend/websocket/agui_store.go`, wired at `main.go:204`): Currently tail-scans `agui-events.jsonl`. Must be ported to query `session_events` with an event-type filter (not a row LIMIT, which is insufficient for high-event sessions): ```sql SELECT event_type, payload FROM session_events -WHERE session_id = ? AND NOT superseded -ORDER BY seq DESC LIMIT 50 +WHERE session_id = ? + AND NOT superseded + AND event_type IN ('RUN_STARTED', 'RUN_FINISHED', 'RUN_ERROR', 'TOOL_CALL_START') +ORDER BY seq DESC +LIMIT 20 ``` -Apply the same state machine (RUN_STARTED/FINISHED/ERROR → working/idle; TOOL_CALL_START with `AskUserQuestion` tool name → waiting_input). Until the legacy backend is removed, both paths run in parallel — the backend reads JSONL, the API server reads `session_events`. +Apply the same state machine: `RUN_STARTED` → `working`; `RUN_FINISHED`/`RUN_ERROR` → `idle`; `TOOL_CALL_START` where the tool name matches `AskUserQuestion` → `waiting_input`. The tool name comparison is **case-insensitive and strips all non-alphabetic characters** before comparing (matching the existing `isAskUserQuestionToolCall()` behavior in `agui_store.go`). Until the legacy backend is removed, both paths run in parallel — the backend reads JSONL, the API server reads `session_events`. **`AGUIEvents` handler** (`handler.go:644-692`): Currently proxies the runner's live SSE stream directly with no DB backing. Must be replaced to: (1) replay non-superseded `session_events` rows ordered by `seq`, (2) subscribe to the in-memory fan-out channel for live events. Add `?since` and `?run_id` query parameter parsing. The existing ephemeral `GET /sessions/{id}/events` endpoint remains unchanged. -**`StreamTextMessages()`** (`message_handler.go:144`): Currently filters `TEXT_MESSAGE_*` from `session_messages`. After `session_messages` narrowing, this returns empty. Must be updated to query `session_events WHERE event_type IN ('TEXT_MESSAGE_START', 'TEXT_MESSAGE_CONTENT', 'TEXT_MESSAGE_END') AND session_id = ?`. +**`StreamTextMessages()`** (`message_handler.go:144`): Currently filters `TEXT_MESSAGE_*` from `session_messages`. Must be updated to query `session_events WHERE event_type IN ('TEXT_MESSAGE_START', 'TEXT_MESSAGE_CONTENT', 'TEXT_MESSAGE_END') AND session_id = ? ORDER BY seq` and registered as a route in `plugin.go`. This endpoint becomes the `acpctl session messages` data source — the CLI reconstructs the human-readable conversation from `TEXT_MESSAGE_CONTENT` deltas in `seq` order. The ephemeral `GET /sessions/{id}/events` runner-proxy endpoint is unchanged and still available for live-only consumption. + +**`grpc_push_middleware`** (`ambient_runner/middleware/grpc_push.py`): Replace the `session_messages.push()` call with a new `session_events.push()` gRPC call. Extend the function signature to accept `run_id` and `thread_id` as optional keyword arguments: `grpc_push_middleware(stream, *, session_id=None, run_id=None, thread_id=None)`. The call site in `run.py` passes them from `RunAgentInput.run_id` and `RunAgentInput.thread_id`. The runner-side gRPC client also requires a new `SessionEventsAPI` class (analogous to `_session_messages_api.py`) and a `session_events` property on `AmbientGRPCClient`. Fix `_event_to_payload()` to use `event.model_dump(by_alias=True)` to produce camelCase JSON (this bug also affects the current `session_messages` write path). -**`grpc_push_middleware`** (`ambient_runner/middleware/grpc_push.py`): Replace the `session_messages.push()` call with a new `session_events.push()` gRPC call. Pass `run_id` and `thread_id` from `RunAgentInput` (available at `grpc_transport.py:281-282`). Fix `_event_to_payload()` to use `event.model_dump(by_alias=True)` to produce camelCase JSON. +**`GRPCMessageWriter`** (`grpc_transport.py`): Retired. The full event stream in `session_events` supersedes the final-text-only write to `session_messages`. The writer's `_write_message()` method (called on `RUN_FINISHED` and `RUN_ERROR`) and the `_synthesize_run_error()` function (which creates `GRPCMessageWriter` objects directly) must both be updated or removed together — retiring `GRPCMessageWriter` without updating `_synthesize_run_error` will cause a runtime error in the error path. `GRPCMessageWriter` retirement must be deployed atomically in the same release as `PushSessionEvent` going live — removing it before `session_events` is available leaves control-plane sessions with no durable write path. -**`GRPCMessageWriter`** (`grpc_transport.py`): Retired. The full event stream in `session_events` supersedes the final-text-only write to `session_messages`. Remove the `on_run_finished`/`on_run_error` hooks that write `event_type="assistant"` to `session_messages`. +**Implementation checklist for `PushSessionEvent`:** +1. Add proto definition to `sessions.proto` +2. Run `buf generate` from `proto/` to regenerate `sessions.pb.go` and `sessions_grpc.pb.go` +3. Implement `PushSessionEvent` method on `sessionGRPCHandler` in `grpc_handler.go` (guarded by `middleware.IsServiceCaller`) +4. Add `sessionEventsMigration()` to `migration.go` and register it in `plugin.go` +5. Write `SessionEventsAPI` class in runner (`_session_events_api.py`) and add `session_events` property to `AmbientGRPCClient` +6. Extend `grpc_push_middleware` signature and update call site in `run.py` +7. Retire `GRPCMessageWriter` and update `_synthesize_run_error` in same PR **`session_messages` narrowing:** Existing non-user rows in `session_messages` (written by `grpc_push_middleware` and `GRPCMessageWriter` before this split) should be purged or migrated during the cutover migration. A `CHECK (event_type = 'user')` constraint can then be added to enforce the new contract at the schema level. @@ -510,13 +567,13 @@ The `acpctl` CLI mirrors the API 1-for-1. Every REST operation has a correspondi | `GET /sessions/{id}` | `acpctl get session ` | ✅ implemented | | `GET /sessions/{id}` | `acpctl describe session ` | ✅ implemented | | `DELETE /sessions/{id}` | `acpctl delete session ` | ✅ implemented | -| `GET /sessions/{id}/messages` | `acpctl session messages ` | ✅ implemented | +| `GET /sessions/{id}/stream-text-messages` | `acpctl session messages ` | 🔲 planned (migrate from session_messages to StreamTextMessages SSE over session_events) | | `POST /sessions/{id}/messages` | `acpctl session send ` | ✅ implemented | -| `POST /sessions/{id}/messages` + `GET /sessions/{id}/agui-events` | `acpctl session send -f` | 🔲 planned (migrate from /events) | -| `POST /sessions/{id}/messages` + `GET /sessions/{id}/agui-events` | `acpctl session send -f --json` | 🔲 planned (migrate from /events) | -| `GET /sessions/{id}/agui-events` | `acpctl session agui-events ` | 🔲 planned | -| `GET /sessions/{id}/agui-events?since=` | `acpctl session agui-events --since ` | 🔲 planned | -| `GET /sessions/{id}/agui-events?run_id=` | `acpctl session agui-events --run-id ` | 🔲 planned | +| `POST /sessions/{id}/messages` + `GET /sessions/{id}/agui/events` | `acpctl session send -f` | 🔲 planned (migrate from /events) | +| `POST /sessions/{id}/messages` + `GET /sessions/{id}/agui/events` | `acpctl session send -f --json` | 🔲 planned (migrate from /events) | +| `GET /sessions/{id}/agui/events` | `acpctl session agui-events ` | 🔲 planned | +| `GET /sessions/{id}/agui/events?since=` | `acpctl session agui-events --since ` | 🔲 planned | +| `GET /sessions/{id}/agui/events?run_id=` | `acpctl session agui-events --run-id ` | 🔲 planned | | `GET /sessions/{id}/events` | `acpctl session events ` | ✅ implemented (ephemeral; no replay) | #### ScheduledSessions (Project-Scoped) @@ -804,9 +861,9 @@ DELETE /api/ambient/v1/sessions/{id} can GET /api/ambient/v1/sessions/{id}/messages list user messages sent to this session (runner inbox) POST /api/ambient/v1/sessions/{id}/messages push a user message (enqueues to runner inbox) -GET /api/ambient/v1/sessions/{id}/agui-events SSE durable AG-UI event stream (DB replay + live; excludes superseded rows) -GET /api/ambient/v1/sessions/{id}/agui-events?since= SSE from seq (client reconnect — no full replay) -GET /api/ambient/v1/sessions/{id}/agui-events?run_id= SSE filtered to a single run +GET /api/ambient/v1/sessions/{id}/agui/events SSE durable AG-UI event stream (DB replay + live; excludes superseded rows) +GET /api/ambient/v1/sessions/{id}/agui/events?since= SSE from seq (client reconnect — no full replay) +GET /api/ambient/v1/sessions/{id}/agui/events?run_id= SSE filtered to a single run GET /api/ambient/v1/sessions/{id}/events SSE live event stream proxied from runner pod (ephemeral — no replay) GET /api/ambient/v1/sessions/{id}/role_bindings RBAC bindings ``` @@ -1318,7 +1375,7 @@ _Last updated: 2026-04-28. Use this as the authoritative index — click into co | **Sessions — CRUD** | ✅ | ✅ `SessionAPI.{Get,List,Create,Update,Delete}` | ✅ `get/create/delete session` | | | **Sessions — start/stop** | ✅ `/start` `/stop` | ✅ `SessionAPI.{Start,Stop}` | ✅ `start`/`stop` commands | | | **Sessions — messages (list/push/watch)** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | gRPC watch via `session_watch.go`; user messages only after SessionEvent split | -| **Sessions — AG-UI event store** | 🔲 `/agui-events` → `session_events` table | 🔲 `SessionAPI.StreamAGUIEvents` | 🔲 `session agui-events` | New; replaces `agui-events.jsonl`; compaction on RUN_FINISHED; `since`/`run_id` filters | +| **Sessions — AG-UI event store** | 🔲 `/agui/events` → `session_events` table | 🔲 `SessionAPI.StreamAGUIEvents` | 🔲 `session agui-events` | New; replaces `agui-events.jsonl`; compaction on RUN_FINISHED; `since`/`run_id` filters | | **Sessions — live events (SSE proxy)** | ✅ `/events` → runner pod | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Ephemeral; no replay; runner must be Running; 502 if unreachable | | **Sessions — labels/annotations** | ✅ PATCH accepts `labels`/`annotations` | ✅ fields on `Session` type; `SessionAPI.Update(patch map[string]any)` | ⚠️ no dedicated subcommand; use `acpctl get session -o json` + manual PATCH | | | **Sessions — workspace files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session workspace list/get/put/delete` | Requires running session for file ops | From d2453f0a6e91e9e5f0cd167e51a6e3ab8c7d2d09 Mon Sep 17 00:00:00 2001 From: jsell-rh Date: Tue, 28 Apr 2026 21:38:23 +0000 Subject: [PATCH 4/4] =?UTF-8?q?spec(session-events):=20rename=20runner=20i?= =?UTF-8?q?nbox=20=E2=86=92=20delivery=20queue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit "inbox" is reserved for the Inbox Kind (Agent persistent message queue). Replace all session_messages-related uses of "inbox" with "delivery queue" or "runner delivery queue": - ERD comment: runner inbox → runner delivery queue - ERD relationship label: "inbox" → "messages" - Event streams table description - API reference descriptions for GET/POST /sessions/{id}/messages - String Tree description of SessionMessage - Design Decisions table entry - Section heading: Runner Inbox → Runner Delivery Queue No Inbox Kind references changed. Co-Authored-By: Claude Sonnet 4.6 --- docs/internal/design/ambient-model.spec.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/internal/design/ambient-model.spec.md b/docs/internal/design/ambient-model.spec.md index 1ce2bffea..31e88abf6 100755 --- a/docs/internal/design/ambient-model.spec.md +++ b/docs/internal/design/ambient-model.spec.md @@ -133,7 +133,7 @@ erDiagram time deleted_at } - %% ── SessionMessage (runner inbox — user message delivery queue) ────────── + %% ── SessionMessage (runner delivery queue — user message queue) ────────── SessionMessage { string ID PK @@ -238,7 +238,7 @@ erDiagram Inbox }o--o| Agent : "sent_from" - Session ||--o{ SessionMessage : "inbox" + Session ||--o{ SessionMessage : "messages" Session ||--o{ SessionEvent : "records" Role ||--o{ RoleBinding : "granted_by" @@ -301,7 +301,7 @@ All four are assembled into the start context in that order. Pokes roll downhill --- -## SessionMessage — Runner Inbox (User Message Delivery Queue) +## SessionMessage — Runner Delivery Queue (User Messages Only) SessionMessages are the delivery queue for user-to-runner messages. They carry only `event_type="user"`. The runner's `WatchSessionMessages` gRPC stream watches this table and filters to `event_type="user"` — any other event_type is discarded by the watcher. @@ -313,7 +313,7 @@ SessionMessages are the delivery queue for user-to-runner messages. They carry o | Endpoint | Source | Persistence | Purpose | |---|---|---|---| -| `POST /sessions/{id}/messages` | Human or system | DB append | Deliver a user message to the runner inbox | +| `POST /sessions/{id}/messages` | Human or system | DB append | Deliver a user message to the runner | | `GET /sessions/{id}/messages` | API server DB | Persisted | List user messages sent to this session | | `GET /sessions/{id}/agui/events` | API server DB + runner | Persisted (see SessionEvent) | Durable AG-UI event replay + live stream | | `GET /sessions/{id}/events` | Runner pod SSE | Ephemeral; runner-local | Live AG-UI turn events during an active run | @@ -859,8 +859,8 @@ GET /api/ambient/v1/sessions lis GET /api/ambient/v1/sessions/{id} read session DELETE /api/ambient/v1/sessions/{id} cancel or delete session -GET /api/ambient/v1/sessions/{id}/messages list user messages sent to this session (runner inbox) -POST /api/ambient/v1/sessions/{id}/messages push a user message (enqueues to runner inbox) +GET /api/ambient/v1/sessions/{id}/messages list user messages sent to this session +POST /api/ambient/v1/sessions/{id}/messages push a user message (enqueues to runner delivery queue) GET /api/ambient/v1/sessions/{id}/agui/events SSE durable AG-UI event stream (DB replay + live; excludes superseded rows) GET /api/ambient/v1/sessions/{id}/agui/events?since= SSE from seq (client reconnect — no full replay) GET /api/ambient/v1/sessions/{id}/agui/events?run_id= SSE filtered to a single run @@ -1206,7 +1206,7 @@ An `Agent` is an ID and a `prompt` string — who the agent is. A `Session` is an ID and a `prompt` string — what this run is focused on. An `InboxMessage` is an ID and a `body` string — a request addressed to an agent. A `SessionEvent` is an ID and a `payload` string — one AG-UI event in the conversation stream. -A `SessionMessage` is an ID and a `payload` string — one user message in the runner inbox. +A `SessionMessage` is an ID and a `payload` string — one user message in the runner delivery queue. Strings can be simple (`"hello world"`) or arbitrarily complex (a bookmarked system prompt, a structured markdown context block, a multi-section briefing). The model does not care. Every node is still just an ID and a string. @@ -1292,7 +1292,7 @@ This structure means you can define and compose bespoke agent suites — entire | `current_session_id` denormalized on Agent | Project Home reads Agent + session phase without joining through sessions | | Sessions created only via start | Sessions are run artifacts; direct `POST /sessions` does not exist | | Every layer carries a `prompt` | Project.prompt = workspace context; Agent.prompt = who the agent is; Session.prompt = what this run does; Inbox = prior requests. Pokes roll downhill. | -| `SessionMessage` is the runner inbox only | Narrowed from "event log" to "user message delivery queue". The `WatchSessionMessages` watcher already discarded non-user events (line 215 of `grpc_transport.py`) — the table now matches that contract. Prevents dual-store confusion. | +| `SessionMessage` is the runner delivery queue only | Narrowed from "event log" to "user message delivery queue". The `WatchSessionMessages` watcher already discarded non-user events (line 215 of `grpc_transport.py`) — the table now matches that contract. Prevents dual-store confusion. | | `SessionEvent` is separate from `SessionMessage` | The two stores flow in opposite directions and have different consumers. `session_messages`: user → runner (delivery queue, consumed by gRPC watch). `session_events`: runner → DB (event log, consumed by SSE replay and CLI/SDK history). Merging them required overloading one direction to serve both, which `grpc_push_middleware` did badly. | | `run_id` and `thread_id` are first-class columns in `SessionEvent` | The legacy `agui-events.jsonl` had no per-run filtering. `run_id` enables per-run compaction (mark streaming deltas `superseded=true`, append `MESSAGES_SNAPSHOT`) and per-run queries (`?run_id=`). `thread_id` ties DB rows back to runner in-memory streams for debugging. | | Compaction uses `superseded` flag, not row deletion | Deleting compacted rows would require transaction coordination with concurrent readers. Marking `superseded=true` is append-safe and lets queries add `WHERE NOT superseded`. Superseded types: `TEXT_MESSAGE_CONTENT`, `TOOL_CALL_ARGS`, `REASONING_MESSAGE_CONTENT`, `STATE_DELTA`, `ACTIVITY_DELTA`. Non-delta events (`STEP_*`, `TOOL_CALL_START`, `TOOL_CALL_END`, lifecycle) are never superseded. `MESSAGES_SNAPSHOT` is appended as the canonical transcript; `STATE_SNAPSHOT` and `ACTIVITY_SNAPSHOT` are appended if state/activity were tracked. |