Skip to content
Merged

Dev #12

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select a commit. Hold shift + click to select a range
df382df
fix: preserve composer attachments and require explicit dialog close
Apr 21, 2026
394b711
Increase file preview limit to 5MB
Apr 21, 2026
670634a
Cache file browser root snapshot
Apr 21, 2026
8db6413
Merge remote-tracking branch 'origin/master' into dev
Apr 21, 2026
3e4d1f5
fix: centralize upload size limit
Apr 21, 2026
487b7df
fix: keep file browser list requests lightweight
Apr 21, 2026
ff33a7e
fix: show tool timestamps without copying them
Apr 21, 2026
142bbc3
fix: show history refresh spinner during http sync
Apr 22, 2026
74c21d6
fix: always backfill timeline history on session open
Apr 22, 2026
fc66daf
fix: throttle timeline backfill until app resume
Apr 22, 2026
7efd2f1
test: cover native resume timeline refresh chain
Apr 22, 2026
1423e7c
test: add e2e gate for active timeline refresh
Apr 22, 2026
c430364
ci: install web deps before e2e wrapper
Apr 22, 2026
ebf5c62
Prefill CC preset defaults
Apr 22, 2026
c0992da
fix: prefer final codex web search query
Apr 22, 2026
dda8d60
Add compatible API preset model discovery
Apr 22, 2026
390db16
fix: make mobile timeline refresh recover missed events
Apr 22, 2026
fca6782
Fix qwen preset e2e mocks
Apr 22, 2026
957aa5f
Stabilize timeline reconnect afterTs test
Apr 22, 2026
ff653c3
feat: add postgres timeline text-tail cache
Apr 22, 2026
f805efc
fix: guard preset updates in session dialogs
Apr 22, 2026
50413e4
Align daemon badge with server heartbeat
Apr 22, 2026
56bfe92
fix: show timeline refresh while bootstrapping history
Apr 22, 2026
34b5fc0
fix: backfill timeline text tail from daemon history
Apr 22, 2026
eebcf8e
fix: paginate text-tail daemon backfill
Apr 22, 2026
87d3c1a
Show timeline history fetch progress
Apr 22, 2026
1d5422e
Guard missing timeline history status in component tests
Apr 22, 2026
a9f1a18
Raise upload size limit to 2GB
Apr 22, 2026
8361a43
Clear stale transport approval banners
Apr 22, 2026
c31040d
fix: auto-restart errored transport sessions
Apr 22, 2026
fb75157
test: cover timeline sqlite projection behavior
Apr 22, 2026
029a291
feat: add local timeline sqlite projection core
Apr 22, 2026
580a19e
fix: retry transient qwen transport errors
Apr 22, 2026
a37ca72
test: cover timeline sqlite projection cutover
Apr 22, 2026
0f0a8fa
fix(web): keep sent transport retries visible
Apr 22, 2026
126d7b5
test: align timeline store async mocks
Apr 22, 2026
898b8e1
Recover streaming timeline state after WS reconnect
Apr 22, 2026
def0219
fix: resolve lint errors, increase Browser rate limit to 240/10s
Apr 22, 2026
4d8892f
revert: restore unused imports/vars that were warning-only fixes
Apr 22, 2026
8322311
Revert "revert: restore unused imports/vars that were warning-only fi…
Apr 22, 2026
1f2e9d9
Reapply "revert: restore unused imports/vars that were warning-only f…
Apr 22, 2026
f3e50a2
Revert "Reapply "revert: restore unused imports/vars that were warnin…
Apr 22, 2026
09f44be
Reapply "revert: restore unused imports/vars that were warning-only f…
Apr 22, 2026
96e84ec
Revert "revert: restore unused imports/vars that were warning-only fi…
Apr 22, 2026
f2f34e1
Revert "fix: resolve lint errors, increase Browser rate limit to 240/…
Apr 22, 2026
f8aab87
fix: lint cleanup on 898b8e17 baseline
Apr 22, 2026
1c178a4
Gate timeline HTTP backfills to active sessions
Apr 22, 2026
a9ecf38
Update live server status from daemon events
Apr 22, 2026
0eaab64
Stabilize hook send content-type test
Apr 22, 2026
35d8748
Wire active session timeline gating
Apr 22, 2026
cc7a54f
Guard sub-session sync without type
Apr 22, 2026
9470a6e
fix(server): narrow bridge db before async sync
Apr 22, 2026
57712f4
Fix new-file patch previews
Apr 22, 2026
628fda9
test: shutdown timeline projection in truncate tests
Apr 22, 2026
10d2d0a
fix: suppress push when mobile websocket is connected
Apr 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ jobs:
- name: Prime tmux server (ensures socket dir exists)
run: tmux new-session -d -s init && tmux kill-session -t init
- run: npm ci
- name: Install web deps (active timeline refresh e2e wrapper invokes web vitest)
run: ./scripts/ci-npm-ci.sh web
- name: Run pipe-pane e2e tests
run: npx vitest run test/e2e/pipe-pane-stream.test.ts
- name: Run other e2e tests
Expand Down
11 changes: 11 additions & 0 deletions server/src/db/migrations/043_session_text_tail_cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Per-session cache of recent plain-text timeline events (user messages and
-- finalized assistant text), keyed by (server_id, session_name). Written by
-- upsertSessionTextTailCacheEvent / replaceSessionTextTailCache in
-- server/src/db/queries.ts, which bound it to the newest 50 items.
CREATE TABLE IF NOT EXISTS session_text_tail_cache (
server_id TEXT NOT NULL,
session_name TEXT NOT NULL,
-- JSON array of cache items, oldest first; '[]' when nothing cached yet.
events JSONB NOT NULL DEFAULT '[]'::jsonb,
-- Timestamp (ms) of the newest cached event; NULL when events is empty.
latest_ts BIGINT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (server_id, session_name)
);

-- Supports recency-ordered inspection/pruning of stale cache rows.
CREATE INDEX IF NOT EXISTS idx_session_text_tail_cache_updated_at
ON session_text_tail_cache (updated_at DESC);
199 changes: 199 additions & 0 deletions server/src/db/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,143 @@ export interface QuickData {
phrases: string[];
}

// Upper bound on cached text-tail items per session; the merge helpers below
// drop the oldest entries once this limit is exceeded.
export const SESSION_TEXT_TAIL_CACHE_LIMIT = 50;

// One cached timeline text event (a user message or finalized assistant text).
export interface SessionTextTailCacheItem {
  // Unique event id, used for de-duplication when merging.
  eventId: string;
  // Event timestamp in ms; cached lists are kept sorted ascending by ts.
  ts: number;
  type: 'user.message' | 'assistant.text';
  // Trimmed, non-empty message text.
  text: string;
  // Optional provenance metadata; only stored when non-empty after trimming.
  source?: string;
  confidence?: string;
}

// Raw row shape of session_text_tail_cache. `events` may arrive as a JSON
// string or as an already-parsed array depending on the driver.
interface DbSessionTextTailCacheRow {
  server_id: string;
  session_name: string;
  events: SessionTextTailCacheItem[] | string | null;
  latest_ts: number | null;
  updated_at: Date;
}

// Result of classifying a raw timeline event into a cacheable item.
interface ClassifiedSessionTextTailEvent {
  sessionName: string;
  item: SessionTextTailCacheItem;
}

/** Trim a candidate text value; null for non-strings and blank/whitespace-only strings. */
function normalizeSessionTextTailText(text: unknown): string | null {
  if (typeof text === 'string') {
    const cleaned = text.trim();
    if (cleaned.length > 0) return cleaned;
  }
  return null;
}

/** Type guard: true only for the two event types the text-tail cache stores. */
function isSessionTextTailType(type: unknown): type is SessionTextTailCacheItem['type'] {
  switch (type) {
    case 'user.message':
    case 'assistant.text':
      return true;
    default:
      return false;
  }
}

/**
 * Decode the `events` column into validated cache items.
 *
 * The column may hold a JSON string or a pre-parsed array depending on the
 * driver. Validation is all-or-nothing: any malformed entry invalidates the
 * entire row (returns []), forcing a rebuild from daemon history.
 */
function parseSessionTextTailCacheEvents(
  raw: SessionTextTailCacheItem[] | string | null | undefined,
): SessionTextTailCacheItem[] {
  let decoded: unknown = raw;
  if (typeof decoded === 'string') {
    try {
      decoded = JSON.parse(decoded);
    } catch {
      return [];
    }
  }
  if (!Array.isArray(decoded)) return [];

  const result: SessionTextTailCacheItem[] = [];
  for (const candidate of decoded) {
    const item = decodeSessionTextTailEntry(candidate);
    // One corrupt entry discards the whole cache row.
    if (!item) return [];
    result.push(item);
  }
  return result;
}

/** Validate a single raw cache entry; null when any shape check fails. */
function decodeSessionTextTailEntry(candidate: unknown): SessionTextTailCacheItem | null {
  if (!candidate || typeof candidate !== 'object') return null;
  const row = candidate as Record<string, unknown>;
  if (
    typeof row.eventId !== 'string'
    || typeof row.ts !== 'number'
    || !isSessionTextTailType(row.type)
    || typeof row.text !== 'string'
  ) {
    return null;
  }
  const text = normalizeSessionTextTailText(row.text);
  if (!text) return null;
  const item: SessionTextTailCacheItem = {
    eventId: row.eventId,
    ts: row.ts,
    type: row.type,
    text,
  };
  const source = typeof row.source === 'string' ? row.source.trim() : '';
  if (source) item.source = source;
  const confidence = typeof row.confidence === 'string' ? row.confidence.trim() : '';
  if (confidence) item.confidence = confidence;
  return item;
}

function mergeSessionTextTailCacheEvents(
existing: SessionTextTailCacheItem[],
incoming: SessionTextTailCacheItem,
): SessionTextTailCacheItem[] {
const deduped = new Map<string, SessionTextTailCacheItem>();
for (const item of existing) deduped.set(item.eventId, item);
deduped.set(incoming.eventId, incoming);
const merged = [...deduped.values()].sort((a, b) => {
if (a.ts !== b.ts) return a.ts - b.ts;
return a.eventId.localeCompare(b.eventId);
});
return merged.length > SESSION_TEXT_TAIL_CACHE_LIMIT
? merged.slice(merged.length - SESSION_TEXT_TAIL_CACHE_LIMIT)
: merged;
}

/**
 * Combine two cache lists. Later duplicates (by eventId) win, the result is
 * sorted ascending by (ts, eventId), and only the newest
 * SESSION_TEXT_TAIL_CACHE_LIMIT entries are kept.
 */
export function mergeSessionTextTailCacheItems(
  existing: SessionTextTailCacheItem[],
  incoming: SessionTextTailCacheItem[],
): SessionTextTailCacheItem[] {
  const byId = new Map<string, SessionTextTailCacheItem>();
  for (const item of [...existing, ...incoming]) {
    byId.set(item.eventId, item);
  }
  const ordered = [...byId.values()].sort(
    (a, b) => (a.ts - b.ts) || a.eventId.localeCompare(b.eventId),
  );
  // slice(-N) returns the whole (copied) list when already within the bound.
  return ordered.slice(-SESSION_TEXT_TAIL_CACHE_LIMIT);
}

/**
 * Classify a raw timeline event as a cacheable text-tail item.
 *
 * Returns null for anything that is not finalized user/assistant text:
 * missing session/event ids, non-finite timestamps, still-streaming
 * assistant chunks, or blank text. `source`/`confidence` are carried over
 * only when non-empty after trimming.
 */
export function classifySessionTextTailEvent(rawEvent: Record<string, unknown>): ClassifiedSessionTextTailEvent | null {
  const sessionName = typeof rawEvent.sessionId === 'string' ? rawEvent.sessionId : null;
  const eventId = typeof rawEvent.eventId === 'string' ? rawEvent.eventId : null;
  // Reject NaN/Infinity as well as non-numbers: a non-finite ts would corrupt
  // the (ts, eventId) ordering used by the merge helpers. Mirrors the
  // Number.isFinite guard applied to event.ts in routes/watch.ts.
  const ts = typeof rawEvent.ts === 'number' && Number.isFinite(rawEvent.ts) ? rawEvent.ts : null;
  const type = isSessionTextTailType(rawEvent.type) ? rawEvent.type : null;
  const payload = rawEvent.payload && typeof rawEvent.payload === 'object'
    ? rawEvent.payload as Record<string, unknown>
    : null;
  if (!sessionName || !eventId || ts === null || !type || !payload) return null;
  // Only finalized assistant text is cached; streaming deltas are skipped.
  if (type === 'assistant.text' && payload.streaming === true) return null;
  const text = normalizeSessionTextTailText(payload.text);
  if (!text) return null;
  const item: SessionTextTailCacheItem = { eventId, ts, type, text };
  if (typeof rawEvent.source === 'string' && rawEvent.source.trim()) item.source = rawEvent.source.trim();
  if (typeof rawEvent.confidence === 'string' && rawEvent.confidence.trim()) item.confidence = rawEvent.confidence.trim();
  return { sessionName, item };
}

export function collectSessionTextTailCacheItems(
sessionName: string,
rawEvents: unknown[],
): SessionTextTailCacheItem[] {
const items: SessionTextTailCacheItem[] = [];
for (const raw of rawEvents) {
if (!raw || typeof raw !== 'object') continue;
const classified = classifySessionTextTailEvent(raw as Record<string, unknown>);
if (!classified || classified.sessionName !== sessionName) continue;
items.push(classified.item);
}
return mergeSessionTextTailCacheItems([], items);
}

// ── Users ─────────────────────────────────────────────────────────────────

export async function createUser(db: Database, id: string): Promise<DbUser> {
Expand Down Expand Up @@ -542,6 +679,68 @@ export async function updateSession(
);
}

/**
 * Fold one raw timeline event into a session's text-tail cache row.
 *
 * No-op when the event does not classify as cacheable text. Runs inside a
 * transaction with SELECT ... FOR UPDATE so concurrent upserts for the same
 * session serialize instead of losing each other's events.
 */
export async function upsertSessionTextTailCacheEvent(
  db: Database,
  serverId: string,
  rawEvent: Record<string, unknown>,
): Promise<void> {
  const classified = classifySessionTextTailEvent(rawEvent);
  if (!classified) return;
  const { sessionName, item } = classified;
  await db.transaction(async (tx) => {
    // Lock the row (if any) so the read-merge-write below is atomic.
    const current = await tx.queryOne<Pick<DbSessionTextTailCacheRow, 'events'>>(
      `SELECT events
       FROM session_text_tail_cache
       WHERE server_id = $1 AND session_name = $2
       FOR UPDATE`,
      [serverId, sessionName],
    );
    const merged = mergeSessionTextTailCacheEvents(
      parseSessionTextTailCacheEvents(current?.events ?? null),
      item,
    );
    const latestTs = merged.at(-1)?.ts ?? null;
    await tx.execute(
      `INSERT INTO session_text_tail_cache (server_id, session_name, events, latest_ts, updated_at)
       VALUES ($1, $2, $3::jsonb, $4, NOW())
       ON CONFLICT (server_id, session_name)
       DO UPDATE SET events = EXCLUDED.events, latest_ts = EXCLUDED.latest_ts, updated_at = NOW()`,
      [serverId, sessionName, JSON.stringify(merged), latestTs],
    );
  });
}

/**
 * Read the cached text tail for one session, bounded to the newest
 * SESSION_TEXT_TAIL_CACHE_LIMIT items. Returns [] when nothing is cached
 * or the stored payload fails validation.
 */
export async function getSessionTextTailCache(
  db: Database,
  serverId: string,
  sessionName: string,
): Promise<SessionTextTailCacheItem[]> {
  const row = await db.queryOne<Pick<DbSessionTextTailCacheRow, 'events'>>(
    `SELECT events
     FROM session_text_tail_cache
     WHERE server_id = $1 AND session_name = $2`,
    [serverId, sessionName],
  );
  const parsed = parseSessionTextTailCacheEvents(row?.events ?? null);
  // slice(-N) is a plain copy when the list is already within the bound.
  return parsed.slice(-SESSION_TEXT_TAIL_CACHE_LIMIT);
}

/**
 * Overwrite a session's text-tail cache with the given items (de-duped,
 * sorted, and bounded before writing). latest_ts mirrors the newest item,
 * or NULL when the bounded list is empty.
 */
export async function replaceSessionTextTailCache(
  db: Database,
  serverId: string,
  sessionName: string,
  events: SessionTextTailCacheItem[],
): Promise<void> {
  const bounded = mergeSessionTextTailCacheItems([], events);
  const latestTs = bounded.at(-1)?.ts ?? null;
  await db.execute(
    `INSERT INTO session_text_tail_cache (server_id, session_name, events, latest_ts, updated_at)
     VALUES ($1, $2, $3::jsonb, $4, NOW())
     ON CONFLICT (server_id, session_name)
     DO UPDATE SET events = EXCLUDED.events, latest_ts = EXCLUDED.latest_ts, updated_at = NOW()`,
    [serverId, sessionName, JSON.stringify(bounded), latestTs],
  );
}

// ── Quick data ────────────────────────────────────────────────────────────

const EMPTY_QUICK_DATA: QuickData = { history: [], sessionHistory: {}, commands: [], phrases: [] };
Expand Down
106 changes: 105 additions & 1 deletion server/src/routes/watch.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
import { Hono } from 'hono';
import type { Env } from '../env.js';
import { getServersByUserId, getDbSessionsByServer, getSubSessionsByServer, getUserPref } from '../db/queries.js';
import {
getServersByUserId,
getDbSessionsByServer,
getSubSessionsByServer,
getUserPref,
getSessionTextTailCache,
collectSessionTextTailCacheItems,
mergeSessionTextTailCacheItems,
replaceSessionTextTailCache,
SESSION_TEXT_TAIL_CACHE_LIMIT,
} from '../db/queries.js';
import { requireAuth, resolveServerRole } from '../security/authorization.js';
import { WsBridge } from '../ws/bridge.js';
import { IMCODES_POD_HEADER } from '../../../shared/http-header-names.js';
import { getPodIdentity } from '../util/pod-identity.js';
import logger from '../util/logger.js';

export const watchRoutes = new Hono<{ Bindings: Env; Variables: { userId: string; role: string } }>();
// Daemon history paging for the text-tail backfill: at most 6 pages of up to
// 500 events each, with a 1.5s per-request timeout so a slow daemon cannot
// stall the /timeline/text-tail route.
const TEXT_TAIL_HISTORY_PAGE_LIMIT = 500;
const TEXT_TAIL_HISTORY_MAX_PAGES = 6;
const TEXT_TAIL_HISTORY_TIMEOUT_MS = 1500;

type WatchSessionState = 'working' | 'idle' | 'error' | 'stopped';

Expand Down Expand Up @@ -52,6 +65,61 @@ function titleForSubSession(sub: { id: string; label: string | null; type: strin
return sub.type || sub.id;
}

/**
 * Top up a session's cached text tail from daemon timeline history.
 *
 * Pages backwards through daemon history (via beforeTs) until the cache holds
 * SESSION_TEXT_TAIL_CACHE_LIMIT items, the daemon returns an empty/short
 * page, or TEXT_TAIL_HISTORY_MAX_PAGES is reached. Returns the merged list;
 * `cached` itself is never mutated. Throws if a daemon request fails or
 * times out — the caller treats that as non-fatal.
 */
async function backfillSessionTextTailFromDaemon(
  serverId: string,
  sessionName: string,
  cached: Awaited<ReturnType<typeof getSessionTextTailCache>>,
): Promise<Awaited<ReturnType<typeof getSessionTextTailCache>>> {
  let events = cached;
  let beforeTs: number | undefined;
  // Fingerprints of pages already seen; breaks the loop if the daemon keeps
  // returning the same page (possible because beforeTs overlaps by 1ms).
  const seenPages = new Set<string>();

  for (let page = 0; page < TEXT_TAIL_HISTORY_MAX_PAGES; page++) {
    if (events.length >= SESSION_TEXT_TAIL_CACHE_LIMIT) break;

    const response = await WsBridge.get(serverId).requestTimelineHistory({
      sessionName,
      limit: TEXT_TAIL_HISTORY_PAGE_LIMIT,
      timeoutMs: TEXT_TAIL_HISTORY_TIMEOUT_MS,
      // Only pass beforeTs once we have a boundary from a previous page.
      ...(beforeTs !== undefined ? { beforeTs } : {}),
    });
    const rawEvents = Array.isArray(response.events)
      ? response.events.filter((event): event is Record<string, unknown> => !!event && typeof event === 'object')
      : [];
    if (rawEvents.length === 0) break;

    // Cheap page identity: length plus first/last (eventId, ts) pairs.
    const fingerprint = JSON.stringify([
      rawEvents.length,
      rawEvents[0]?.eventId,
      rawEvents[0]?.ts,
      rawEvents.at(-1)?.eventId,
      rawEvents.at(-1)?.ts,
    ]);
    if (seenPages.has(fingerprint)) break;
    seenPages.add(fingerprint);

    const live = collectSessionTextTailCacheItems(sessionName, rawEvents);
    if (live.length > 0) {
      events = mergeSessionTextTailCacheItems(events, live);
    }

    // A short page means the daemon has no older history left.
    if (rawEvents.length < TEXT_TAIL_HISTORY_PAGE_LIMIT) break;

    // Find the oldest finite timestamp on this page to anchor the next one.
    let oldestTs: number | undefined;
    for (const event of rawEvents) {
      if (typeof event.ts !== 'number' || !Number.isFinite(event.ts)) continue;
      oldestTs = oldestTs === undefined ? event.ts : Math.min(oldestTs, event.ts);
    }
    if (oldestTs === undefined) break;

    // Keep a 1ms overlap on the page boundary so same-ts events are not
    // skipped when the next page is requested.
    beforeTs = oldestTs + 1;
  }

  return events;
}

function sanitizeWatchTimelineEvent(raw: unknown): {
eventId: string;
sessionId: string;
Expand Down Expand Up @@ -356,3 +424,39 @@ watchRoutes.get('/server/:id/timeline/history/full', requireAuth(), async (c) =>
return c.json({ error: 'relay_failed' }, 502);
}
});

// GET /server/:id/timeline/text-tail?sessionName=...
// Serves the cached user/assistant text tail for one session, opportunistically
// backfilled from daemon history. A backfill failure is non-fatal (the cached
// snapshot is still returned, logged at info level); only a cache read failure
// yields a 500.
watchRoutes.get('/server/:id/timeline/text-tail', requireAuth(), async (c) => {
  const userId = c.get('userId' as never) as string;
  const serverId = c.req.param('id')!;
  const role = await resolveServerRole(c.env.DB, serverId, userId);
  if (role === 'none') return c.json({ error: 'forbidden' }, 403);

  const sessionName = c.req.query('sessionName')?.trim();
  if (!sessionName) return c.json({ error: 'session_name_required' }, 400);

  try {
    const cached = await getSessionTextTailCache(c.env.DB, serverId, sessionName);
    let events = cached;
    try {
      events = await backfillSessionTextTailFromDaemon(serverId, sessionName, cached);
      // Persist only when the backfill changed something; the deep compare
      // avoids rewriting identical rows on every request.
      if (JSON.stringify(events) !== JSON.stringify(cached)) {
        await replaceSessionTextTailCache(c.env.DB, serverId, sessionName, events);
      }
    } catch (err) {
      // Daemon unreachable or timed out: degrade to the cached snapshot.
      logger.info({
        serverId,
        sessionName,
        err: err instanceof Error ? err.message : String(err),
      }, 'timeline.text-tail backfill skipped');
    }
    c.header(IMCODES_POD_HEADER, getPodIdentity());
    return c.json({ sessionName, events });
  } catch (err) {
    logger.warn({
      serverId,
      sessionName,
      err: err instanceof Error ? err.message : String(err),
    }, 'timeline.text-tail failed');
    return c.json({ error: 'cache_read_failed' }, 500);
  }
});
Loading
Loading