From 4d417442f301e99510a4805edbb71ad6a5cad0f2 Mon Sep 17 00:00:00 2001 From: "IM.codes" Date: Thu, 23 Apr 2026 11:01:16 +0800 Subject: [PATCH 1/5] fix: recover failed cursor and copilot subsessions --- src/daemon/command-handler.ts | 49 +++++++++++++++++++ .../command-handler-transport-queue.test.ts | 38 ++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/src/daemon/command-handler.ts b/src/daemon/command-handler.ts index 979c3f9e7..342358d06 100644 --- a/src/daemon/command-handler.ts +++ b/src/daemon/command-handler.ts @@ -1844,6 +1844,23 @@ async function handleSend(cmd: Record, serverLink: ServerLink): const status = isLegacy ? 'accepted_legacy' : 'accepted'; timelineEmitter.emit(sessionName, 'command.ack', { commandId: effectiveId, status }); emitCommandAckReliable(serverLink, { commandId: effectiveId, sessionName, status }); + // Best-effort resume for sessions that failed to launch or whose runtime + // vanished outside the provider reconnect path. The resend queue drains on + // successful relaunch, so the queued user message still delivers. + void runExclusiveSessionRelaunch(sessionName, async () => { + try { + await resumeTransportRuntimeAfterLoss(record); + } catch (err) { + logger.error({ err, sessionName }, 'auto-resume after missing transport runtime failed'); + const resumeErr = err instanceof Error ? err.message : String(err); + timelineEmitter.emit( + sessionName, + 'assistant.text', + { text: `⚠️ Auto-resume failed: ${resumeErr}. Restart the session manually to recover.`, streaming: false, memoryExcluded: true }, + { source: 'daemon', confidence: 'high' }, + ); + } + }); return; } if (transportRuntime && !transportRuntime.providerSessionId) { @@ -3054,6 +3071,38 @@ async function handleSubSessionStart(cmd: Record, serverLink: S } catch { /* not connected */ } } catch (e: unknown) { logger.error({ err: e, id, type }, 'subsession.start failed (transport)'); + const now = Date.now(); + const errMsg = e instanceof Error ? e.message : String(e); + const existing = getSession(sessionName); + const errorRecord: SessionRecord = { + name: sessionName, + projectName: existing?.projectName ?? sessionName, + role: existing?.role ?? 'w1', + agentType: type, + projectDir: existing?.projectDir ?? ((cwd as string) || process.cwd()), + state: 'error', + restarts: existing?.restarts ?? 0, + restartTimestamps: existing?.restartTimestamps ?? [], + createdAt: existing?.createdAt ?? now, + updatedAt: now, + runtimeType: 'transport', + providerId: type, + ...(description ? { description } : {}), + ...(ccPreset ? { ccPreset } : {}), + ...(effort ? { effort } : {}), + ...(parentSession ? { parentSession } : {}), + ...(cmd.requestedModel || cmd.model + ? { requestedModel: ((cmd.requestedModel as string | undefined) ?? (cmd.model as string | undefined)) } + : {}), + userCreated: true, + }; + upsertSession(errorRecord); + timelineEmitter.emit( + sessionName, + 'session.state', + { state: 'error', error: errMsg }, + { source: 'daemon', confidence: 'high' }, + ); } return; } diff --git a/test/daemon/command-handler-transport-queue.test.ts b/test/daemon/command-handler-transport-queue.test.ts index fd776b6cc..6168e0fa1 100644 --- a/test/daemon/command-handler-transport-queue.test.ts +++ b/test/daemon/command-handler-transport-queue.test.ts @@ -585,6 +585,12 @@ describe('handleWebCommand transport queue behavior', () => { status: 'accepted', session: 'deck_transport_brain', }); + expect(stopTransportRuntimeSessionMock).toHaveBeenCalledWith('deck_transport_brain'); + expect(launchTransportSessionMock).toHaveBeenCalledWith(expect.objectContaining({ + name: 'deck_transport_brain', + agentType: 'claude-code-sdk', + projectName: 'transport', + })); // 2. NO user.message timeline event — the agent hasn't seen this message // yet, it's sitting in the daemon's resend queue. Emitting a @@ -648,6 +654,38 @@ describe('handleWebCommand transport queue behavior', () => { clearAllResend(); }); + it('persists a transport error record when subsession.start fails before runtime creation', async () => { + launchTransportSessionMock.mockRejectedValueOnce(new Error('provider bootstrap failed')); + + handleWebCommand({ + type: 'subsession.start', + id: 'cursor_fail', + sessionType: 'cursor-headless', + cwd: '/tmp/project', + parentSession: 'deck_proj_brain', + requestedModel: 'gpt-5.4', + }, serverLink as any); + await flushAsync(); + + expect(upsertSessionMock).toHaveBeenCalledWith(expect.objectContaining({ + name: 'deck_sub_cursor_fail', + agentType: 'cursor-headless', + projectDir: '/tmp/project', + runtimeType: 'transport', + providerId: 'cursor-headless', + state: 'error', + parentSession: 'deck_proj_brain', + requestedModel: 'gpt-5.4', + userCreated: true, + })); + expect(emitMock).toHaveBeenCalledWith( + 'deck_sub_cursor_fail', + 'session.state', + expect.objectContaining({ state: 'error', error: 'provider bootstrap failed' }), + expect.objectContaining({ source: 'daemon' }), + ); + }); + it('tracks supervision task intents while offline so Auto still follows the resent turn', async () => { const { clearAllResend } = await import('../../src/daemon/transport-resend-queue.js'); clearAllResend(); From 1df351e4854d375120e92f064caf2db284de8de8 Mon Sep 17 00:00:00 2001 From: "IM.codes" Date: Thu, 23 Apr 2026 11:49:46 +0800 Subject: [PATCH 2/5] fix: surface qwen auth failures --- src/agent/providers/qwen.ts | 48 +++++++++++++++++++++---- test/agent/qwen-provider.test.ts | 61 ++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 9 deletions(-) diff --git a/src/agent/providers/qwen.ts b/src/agent/providers/qwen.ts index 3e03a18ac..4cc1f5b8f 100644 --- a/src/agent/providers/qwen.ts +++ b/src/agent/providers/qwen.ts @@ -34,6 +34,7 @@ const execFileAsync = promisify(execFile); const QWEN_BIN = 'qwen'; const TRANSIENT_RETRY_DELAY_MS = 250; const TRANSIENT_RETRY_MAX_ATTEMPTS = 1; +const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; /** * Auth types accepted by the qwen CLI's `--auth-type` flag. @@ -66,6 +67,16 @@ function resolveCliAuthType(settings: string | Record | undefin return QWEN_CLI_AUTH_TYPES.has(selected) ? selected : undefined; } +function isUuid(value: string | undefined): value is string { + return typeof value === 'string' && UUID_PATTERN.test(value); +} + +function extractSyntheticApiError(text: string | undefined): string | undefined { + if (typeof text !== 'string') return undefined; + const match = text.trim().match(/^\[API Error:\s*(.+)\]$/i); + return match?.[1]?.trim() || undefined; +} + interface QwenSessionState { cwd: string; started: boolean; @@ -255,9 +266,14 @@ export class QwenProvider implements TransportProvider { async createSession(config: SessionConfig): Promise { const sessionId = config.bindExistingKey ?? config.sessionKey; const existing = this.sessions.get(sessionId); + const qwenConversationId = existing?.qwenConversationId + ?? (isUuid(config.resumeId) ? config.resumeId : undefined) + ?? (isUuid(config.bindExistingKey) ? config.bindExistingKey : undefined) + ?? (isUuid(config.sessionKey) ? config.sessionKey : undefined) + ?? randomUUID(); this.sessions.set(sessionId, { cwd: normalizeTransportCwd(config.cwd) ?? existing?.cwd ?? normalizeTransportCwd(process.cwd())!, - started: !!(config.bindExistingKey || config.skipCreate || existing?.started), + started: !!(config.resumeId || config.bindExistingKey || config.skipCreate || existing?.started), description: config.description ?? existing?.description, model: typeof config.agentId === 'string' ? config.agentId : existing?.model, env: config.env ?? existing?.env, @@ -265,7 +281,7 @@ export class QwenProvider implements TransportProvider { settings: config.settings ?? existing?.settings, settingsDir: existing?.settingsDir, settingsPath: existing?.settingsPath, - qwenConversationId: existing?.qwenConversationId ?? sessionId, + qwenConversationId, child: existing?.child ?? null, currentMessageId: existing?.currentMessageId ?? null, currentText: existing?.currentText ?? '', @@ -371,7 +387,7 @@ export class QwenProvider implements TransportProvider { settings: undefined, settingsDir: undefined, settingsPath: undefined, - qwenConversationId: sessionId, + qwenConversationId: randomUUID(), child: null, currentMessageId: null, currentText: '', @@ -471,9 +487,11 @@ export class QwenProvider implements TransportProvider { const emitError = (messageText: string, details?: unknown): void => { if (sawError || completed) return; sawError = true; - const code = state.cancelled ? 'CANCELLED' : PROVIDER_ERROR_CODES.PROVIDER_ERROR; - const recoverable = state.cancelled ? true : false; - this.errorCallbacks.forEach((cb) => cb(sessionId, this.makeError(code, messageText, recoverable, details))); + const errorCode = state.cancelled + ? PROVIDER_ERROR_CODES.CANCELLED + : (this.isAuthFailureMessage(messageText) ? PROVIDER_ERROR_CODES.AUTH_FAILED : PROVIDER_ERROR_CODES.PROVIDER_ERROR); + const recoverable = errorCode === PROVIDER_ERROR_CODES.CANCELLED; + this.errorCallbacks.forEach((cb) => cb(sessionId, this.makeError(errorCode, messageText, recoverable, details))); }; const emitComplete = (text: string, messageId?: string, metadata?: Record): void => { @@ -656,6 +674,13 @@ export class QwenProvider implements TransportProvider { } const finalText = collectAssistantText(payload.message?.content); if (finalText) { + const syntheticApiError = extractSyntheticApiError(finalText); + if (syntheticApiError) { + void maybeRetryTransientError(syntheticApiError, payload).then((retried) => { + if (!retried) emitError(syntheticApiError, payload); + }); + return; + } state.pendingFinalText = finalText; state.pendingFinalMetadata = { ...(state.model || payload.message?.model ? { model: state.model ?? payload.message?.model } : {}), @@ -696,6 +721,13 @@ export class QwenProvider implements TransportProvider { }); return; } + const syntheticApiError = extractSyntheticApiError(payload.result); + if (syntheticApiError) { + void maybeRetryTransientError(syntheticApiError, payload).then((retried) => { + if (!retried) emitError(syntheticApiError, payload); + }); + return; + } const resultText = typeof payload.result === 'string' && payload.result.trim() ? payload.result : state.pendingFinalText; @@ -793,6 +825,10 @@ export class QwenProvider implements TransportProvider { return /premature close|fetch failed|connection error|socket hang up|econnreset|etimedout|network error/i.test(message); } + private isAuthFailureMessage(message: string): boolean { + return /invalid access token|token expired|unauthorized|authentication failed|401\b/i.test(message); + } + private emitStatus(sessionId: string, state: QwenSessionState, status: ProviderStatusUpdate): void { const signature = JSON.stringify({ status: status.status, diff --git a/test/agent/qwen-provider.test.ts b/test/agent/qwen-provider.test.ts index 93a0c8953..c3325eb3f 100644 --- a/test/agent/qwen-provider.test.ts +++ b/test/agent/qwen-provider.test.ts @@ -71,6 +71,8 @@ import { TransportSessionRuntime } from '../../src/agent/transport-session-runti import type { ToolCallEvent } from '../../src/agent/transport-provider.js'; import type { ProviderContextPayload } from '../../shared/context-types.js'; +const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + function lastSpawn() { const entry = childProcessMock.spawned.at(-1); if (!entry) throw new Error('No spawned qwen process'); @@ -360,14 +362,15 @@ describe('QwenProvider', () => { expect(first.file === 'qwen' || /node(\.exe)?$/i.test(first.file)).toBe(true); expect(first.cwd).toBe('/tmp/project'); expect(first.args).toContain('--session-id'); - expect(first.args).toContain('sess-1'); + const firstSessionId = first.args[first.args.indexOf('--session-id') + 1]; + expect(firstSessionId).toMatch(UUID_PATTERN); expect(first.args).not.toContain('--resume'); expect(first.args).toContain('--append-system-prompt'); expect(first.args).toContain('Be concise'); expect(first.args).toContain('--model'); expect(first.args).toContain('qwen3-coder-plus'); - first.child.stdout.write(`${JSON.stringify({ type: 'system', subtype: 'session_start', session_id: 'sess-1' })}\n`); + first.child.stdout.write(`${JSON.stringify({ type: 'system', subtype: 'session_start', session_id: firstSessionId })}\n`); first.child.stdout.write(`${JSON.stringify({ type: 'stream_event', event: { type: 'message_start', message: { id: 'msg-1' } } })}\n`); first.child.stdout.write(`${JSON.stringify({ type: 'stream_event', event: { type: 'content_block_delta', delta: { type: 'text_delta', text: 'Hel' } } })}\n`); first.child.stdout.write(`${JSON.stringify({ type: 'stream_event', event: { type: 'content_block_delta', delta: { type: 'text_delta', text: 'lo' } } })}\n`); @@ -381,10 +384,27 @@ describe('QwenProvider', () => { await provider.send('sess-1', 'again'); const second = lastSpawn(); expect(second.args).toContain('--resume'); - expect(second.args).toContain('sess-1'); + expect(second.args).toContain(firstSessionId); expect(second.args).not.toContain('--session-id'); }); + it('uses a provided UUID resumeId when restoring qwen sessions', async () => { + const provider = new QwenProvider(); + await provider.connect({}); + const resumeId = 'dd09c62c-4cb2-41be-a2f7-a5682760e3b1'; + await provider.createSession({ + sessionKey: 'sess-resume-id', + cwd: '/tmp/project', + resumeId, + }); + + await provider.send('sess-resume-id', 'hello'); + const run = lastSpawn(); + expect(run.args).toContain('--resume'); + expect(run.args).toContain(resumeId); + expect(run.args).not.toContain('--session-id'); + }); + it('maps normalized payloads into qwen CLI prompt/system arguments', async () => { const provider = new QwenProvider(); await provider.connect({}); @@ -708,6 +728,41 @@ describe('QwenProvider', () => { expect(errors).toEqual(['bad request']); }); + it('surfaces qwen synthetic API auth failures as AUTH_FAILED errors', async () => { + const provider = new QwenProvider(); + await provider.connect({}); + await provider.createSession({ sessionKey: 'sess-auth-fail', cwd: '/tmp/project' }); + + const errors: Array<{ code: string; message: string }> = []; + const completed: string[] = []; + provider.onError((_sid, err) => errors.push({ code: err.code, message: err.message })); + provider.onComplete((_sid, msg) => completed.push(msg.content)); + + await provider.send('sess-auth-fail', 'hello'); + const run = lastSpawn(); + run.child.stdout.write(`${JSON.stringify({ + type: 'assistant', + message: { + id: 'msg-auth-fail', + content: [{ type: 'text', text: '[API Error: 401 invalid access token or token expired]' }], + }, + })}\n`); + run.child.stdout.write(`${JSON.stringify({ + type: 'result', + is_error: false, + result: '[API Error: 401 invalid access token or token expired]', + })}\n`); + run.child.emit('close', 0, null); + await flushIO(); + await flushIO(); + + expect(completed).toEqual([]); + expect(errors).toEqual([{ + code: 'AUTH_FAILED', + message: '401 invalid access token or token expired', + }]); + }); + it('retries a transient Premature close once before surfacing an error', async () => { const provider = new QwenProvider(); await provider.connect({}); From 8cab7de8d50388228d47142c585867c9fcab2893 Mon Sep 17 00:00:00 2001 From: "IM.codes" Date: Thu, 23 Apr 2026 13:22:48 +0800 Subject: [PATCH 3/5] fix: align qwen runtime config with settings --- src/agent/providers/cursor-headless-stream.ts | 2 - src/agent/providers/qwen.ts | 2 +- src/agent/qwen-runtime-config.ts | 9 +++- src/agent/transport-runtime-assembly.ts | 4 -- src/context/embedding.ts | 2 +- src/daemon/command-handler.ts | 3 +- src/daemon/p2p-orchestrator.ts | 15 ------ src/daemon/timeline-projection-worker.ts | 1 - test/agent/qwen-runtime-config.test.ts | 49 +++++++++++++++++++ 9 files changed, 60 insertions(+), 27 deletions(-) diff --git a/src/agent/providers/cursor-headless-stream.ts b/src/agent/providers/cursor-headless-stream.ts index cab0bd244..74c8e9a65 100644 --- a/src/agent/providers/cursor-headless-stream.ts +++ b/src/agent/providers/cursor-headless-stream.ts @@ -1,5 +1,3 @@ -import type { ToolCallEvent } from '../../../shared/agent-message.js'; - type CursorRecord = Record; export interface CursorSessionInitEvent { diff --git a/src/agent/providers/qwen.ts b/src/agent/providers/qwen.ts index 4cc1f5b8f..541a00222 100644 --- a/src/agent/providers/qwen.ts +++ b/src/agent/providers/qwen.ts @@ -472,7 +472,7 @@ export class QwenProvider implements TransportProvider { || state.emittedToolSignatures.size > 0; }; - const maybeRetryTransientError = async (messageText: string, details?: unknown): Promise => { + const maybeRetryTransientError = async (messageText: string, _details?: unknown): Promise => { if (retryScheduled || transientRetryBudget <= 0) return false; if (sawVisibleTurnProgress()) return false; if (!this.isRetryableTransientError(messageText)) return false; diff --git a/src/agent/qwen-runtime-config.ts b/src/agent/qwen-runtime-config.ts index 88d5f35a2..e3ac65d81 100644 --- a/src/agent/qwen-runtime-config.ts +++ b/src/agent/qwen-runtime-config.ts @@ -87,6 +87,10 @@ async function readAuthStatus(): Promise<{ authType: QwenAuthType | null; authLi } } +function isKnownAuthType(authType: QwenAuthType): boolean { + return authType !== QWEN_AUTH_TYPES.UNKNOWN; +} + function detectAuthTypeFromSettings(settings: QwenSettings | null): QwenAuthType { const selectedType = settings?.security?.auth?.selectedType; if (selectedType === QWEN_AUTH_TYPES.OAUTH) return QWEN_AUTH_TYPES.OAUTH; @@ -124,7 +128,10 @@ export async function getQwenRuntimeConfig(force = false): Promise(); -const P2P_REMINDER_TEMPLATES: Record = { - en: enLocale.p2p.final_original_request_reminder, - 'zh-CN': zhCNLocale.p2p.final_original_request_reminder, - 'zh-TW': zhTWLocale.p2p.final_original_request_reminder, - ja: jaLocale.p2p.final_original_request_reminder, - ko: koLocale.p2p.final_original_request_reminder, - es: esLocale.p2p.final_original_request_reminder, - ru: ruLocale.p2p.final_original_request_reminder, -}; - -function buildOriginalRequestReminder(userText: string, locale?: string): string { - const template = P2P_REMINDER_TEMPLATES[locale ?? ''] ?? P2P_REMINDER_TEMPLATES.en; - return template.replace('{{request}}', userText); -} - const P2P_POST_SUMMARY_EXECUTE_TEMPLATES: Record = { en: enLocale.p2p.post_summary_execute_prompt, 'zh-CN': zhCNLocale.p2p.post_summary_execute_prompt, diff --git a/src/daemon/timeline-projection-worker.ts b/src/daemon/timeline-projection-worker.ts index 4151590d5..9903d5388 100644 --- a/src/daemon/timeline-projection-worker.ts +++ b/src/daemon/timeline-projection-worker.ts @@ -8,7 +8,6 @@ import type { ProjectionSessionMeta, ProjectionWorkerEnvelope, ProjectionWorkerRequestType, - ProjectionWorkerRequestMap, ProjectionWorkerResponse, } from './timeline-projection-types.js'; diff --git a/test/agent/qwen-runtime-config.test.ts b/test/agent/qwen-runtime-config.test.ts index 1015ade7f..ab2d79fd1 100644 --- a/test/agent/qwen-runtime-config.test.ts +++ b/test/agent/qwen-runtime-config.test.ts @@ -109,4 +109,53 @@ describe('getQwenRuntimeConfig', () => { expect(config.authType).toBe(QWEN_AUTH_TYPES.OAUTH); expect(config.authLimit).toBe('Up to 1,000 requests/day'); }); + + it('prefers settings auth/model resolution over misleading auth status output', async () => { + childProcessMock.execFile.mockImplementation((...args: any[]) => { + const cb = args.at(-1); + cb?.(null, [ + 'Authentication Method: Alibaba Cloud Coding Plan (Incomplete)', + 'Issue: API key not found in environment or settings', + ].join('\n'), ''); + return {} as never; + }); + fsMock.readFile.mockResolvedValue(JSON.stringify({ + security: { auth: { selectedType: 'openai' } }, + tokenPlan: { region: 'china' }, + model: { name: 'qwen3.6-plus' }, + modelProviders: { + openai: [ + { + id: 'qwen3.6-plus', + envKey: 'BAILIAN_TOKEN_PLAN_API_KEY', + baseUrl: 'https://token-plan.cn-beijing.maas.aliyuncs.com/compatible-mode/v1', + }, + { + id: 'glm-5', + envKey: 'BAILIAN_TOKEN_PLAN_API_KEY', + baseUrl: 'https://token-plan.cn-beijing.maas.aliyuncs.com/compatible-mode/v1', + }, + { + id: 'MiniMax-M2.5', + envKey: 'BAILIAN_TOKEN_PLAN_API_KEY', + baseUrl: 'https://token-plan.cn-beijing.maas.aliyuncs.com/compatible-mode/v1', + }, + { + id: 'deepseek-v3.2', + envKey: 'BAILIAN_TOKEN_PLAN_API_KEY', + baseUrl: 'https://token-plan.cn-beijing.maas.aliyuncs.com/compatible-mode/v1', + }, + ], + }, + })); + + const config = await getQwenRuntimeConfig(true); + expect(config.authType).toBe(QWEN_AUTH_TYPES.API_KEY); + expect(config.availableModels).toEqual([ + 'qwen3.6-plus', + 'glm-5', + 'MiniMax-M2.5', + 'deepseek-v3.2', + ]); + }); }); From d98e7e309c7815f8e163262e242f98227ba84766 Mon Sep 17 00:00:00 2001 From: "IM.codes" Date: Thu, 23 Apr 2026 15:13:51 +0800 Subject: [PATCH 4/5] fix: sync timeline projection incrementally --- src/daemon/timeline-projection-worker.ts | 125 +++++++++++++++++++++-- test/daemon/timeline-projection.test.ts | 69 ++++++++++++- 2 files changed, 182 insertions(+), 12 deletions(-) diff --git a/src/daemon/timeline-projection-worker.ts b/src/daemon/timeline-projection-worker.ts index 9903d5388..c58b8ee7d 100644 --- a/src/daemon/timeline-projection-worker.ts +++ b/src/daemon/timeline-projection-worker.ts @@ -1,6 +1,6 @@ import { parentPort, workerData } from 'node:worker_threads'; import { createRequire } from 'node:module'; -import { mkdirSync, statSync, existsSync, readFileSync } from 'node:fs'; +import { mkdirSync, statSync, existsSync, readFileSync, openSync, readSync, closeSync } from 'node:fs'; import { dirname, join } from 'node:path'; import { homedir } from 'node:os'; import type { TimelineEvent, TimelineEventType } from './timeline-event.js'; @@ -175,6 +175,45 @@ function parseLinesAscending(sessionId: string): TimelineEvent[] { return events; } +function parseAppendedEvents(sessionId: string, startOffset: number, endOffset: number): TimelineEvent[] | null { + const filePath = sessionFilePath(sessionId); + if (!existsSync(filePath)) return []; + if (endOffset <= startOffset) return []; + + let fd: number | null = null; + try { + fd = openSync(filePath, 'r'); + const expectedLength = endOffset - startOffset; + const buf = Buffer.alloc(expectedLength); + let totalRead = 0; + while (totalRead < expectedLength) { + const bytesRead = readSync(fd, buf, totalRead, expectedLength - totalRead, startOffset + totalRead); + if (bytesRead <= 0) break; + totalRead += bytesRead; + } + if (totalRead !== expectedLength) return null; + + const raw = buf.toString('utf8'); + if (!raw.trim()) return []; + + const events: TimelineEvent[] = []; + for (const line of raw.split('\n')) { + if (!line) continue; + try { + const event = JSON.parse(line) as TimelineEvent; + if (event.sessionId === sessionId) events.push(event); + } catch { + return null; + } + } + return events; + } catch { + return null; + } finally { + if (fd !== null) closeSync(fd); + } +} + function extractTextAndStreaming(event: TimelineEvent): { text: string | null; streaming: number } { const text = typeof event.payload?.text === 'string' ? event.payload.text : null; const streaming = event.payload?.streaming === true ? 1 : 0; @@ -249,29 +288,89 @@ async function rebuildSessionInternal(sessionId: string): Promise { return promise; } -async function ensureFreshSession(sessionId: string): Promise { - const meta = readSessionMeta(sessionId); +function markSessionReady( + sessionId: string, + meta: ProjectionSessionMeta | null, + fileMeta: { exists: boolean; size: number; mtimeMs: number }, +): void { + upsertSessionMeta(sessionId, { + lastProjectedAppendOrdinal: meta?.lastProjectedAppendOrdinal ?? 0, + sourceFileSizeBytes: fileMeta.size, + sourceFileMtimeMs: fileMeta.mtimeMs, + status: fileMeta.exists ? 'ready' : 'missing', + lastRebuiltAt: meta?.lastRebuiltAt ?? null, + }); +} + +async function syncSessionDelta(sessionId: string, meta: ProjectionSessionMeta): Promise { const fileMeta = currentFileMeta(sessionId); if (!fileMeta.exists) { deleteSessionRows(sessionId); return false; } - if (!meta) { + + if (fileMeta.size === meta.sourceFileSizeBytes) { + // JSONL is append-only. If only mtime drifted, just refresh the tracked + // source metadata instead of rebuilding the full projection. + if (fileMeta.mtimeMs !== meta.sourceFileMtimeMs) { + markSessionReady(sessionId, meta, fileMeta); + } + return true; + } + + if (fileMeta.size < meta.sourceFileSizeBytes || fileMeta.mtimeMs < meta.sourceFileMtimeMs) { await rebuildSessionInternal(sessionId); return true; } - if (meta.status !== 'ready' - || meta.projectionVersion !== PROJECTION_VERSION - || meta.sourceFileSizeBytes !== fileMeta.size - || meta.sourceFileMtimeMs !== fileMeta.mtimeMs) { + + const appendedEvents = parseAppendedEvents(sessionId, meta.sourceFileSizeBytes, fileMeta.size); + if (appendedEvents === null) { + await rebuildSessionInternal(sessionId); + return true; + } + + if (appendedEvents.length === 0) { + markSessionReady(sessionId, meta, fileMeta); + return true; + } + + const database = ensureDb(); + runInTransaction(() => { + let appendOrdinal = meta.lastProjectedAppendOrdinal; + for (const event of appendedEvents) { + appendOrdinal += 1; + insertProjectedEvent(database, sessionId, appendOrdinal, event); + } upsertSessionMeta(sessionId, { - lastProjectedAppendOrdinal: meta.lastProjectedAppendOrdinal, + lastProjectedAppendOrdinal: appendOrdinal, sourceFileSizeBytes: fileMeta.size, sourceFileMtimeMs: fileMeta.mtimeMs, - status: 'stale', + status: 'ready', lastRebuiltAt: meta.lastRebuiltAt, }); + }); + writesSinceCheckpoint += appendedEvents.length; + maybeCheckpoint(); + return true; +} + +async function ensureFreshSession(sessionId: string): Promise { + const meta = readSessionMeta(sessionId); + const fileMeta = currentFileMeta(sessionId); + if (!fileMeta.exists) { + deleteSessionRows(sessionId); + return false; + } + if (!meta) { await rebuildSessionInternal(sessionId); + return true; + } + if (meta.status !== 'ready' || meta.projectionVersion !== PROJECTION_VERSION) { + await rebuildSessionInternal(sessionId); + return true; + } + if (meta.sourceFileSizeBytes !== fileMeta.size || meta.sourceFileMtimeMs !== fileMeta.mtimeMs) { + await syncSessionDelta(sessionId, meta); } return true; } @@ -311,7 +410,11 @@ async function handleRecordAppendedEvent(event: TimelineEvent): Promise || fileMeta.size !== meta.sourceFileSizeBytes + appendedBytes || fileMeta.mtimeMs < meta.sourceFileMtimeMs ) { - await rebuildSessionInternal(event.sessionId); + if (meta && meta.status === 'ready' && meta.projectionVersion === PROJECTION_VERSION) { + await syncSessionDelta(event.sessionId, meta); + } else { + await rebuildSessionInternal(event.sessionId); + } return true; } const database = ensureDb(); diff --git a/test/daemon/timeline-projection.test.ts b/test/daemon/timeline-projection.test.ts index cccdac52e..5c235a1c6 100644 --- a/test/daemon/timeline-projection.test.ts +++ b/test/daemon/timeline-projection.test.ts @@ -1,10 +1,14 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; -import { appendFileSync, mkdirSync, mkdtempSync, rmSync } from 'node:fs'; +import { appendFileSync, mkdirSync, mkdtempSync, rmSync, utimesSync } from 'node:fs'; import { join } from 'node:path'; import { tmpdir } from 'node:os'; +import { createRequire } from 'node:module'; import type { TimelineEvent } from '../../src/daemon/timeline-event.js'; +const require = createRequire(import.meta.url); +const { DatabaseSync } = require('node:sqlite') as typeof import('node:sqlite'); + const originalHome = process.env.HOME; const originalUserProfile = process.env.USERPROFILE; const originalDbPath = process.env.IMCODES_TIMELINE_PROJECTION_DB_PATH; @@ -60,6 +64,25 @@ describe('timeline projection', () => { return { timelineProjection, timelineStore }; } + function readSessionMeta(sessionId: string): { lastRebuiltAt: number | null; status: string; lastProjectedAppendOrdinal: number } { + const db = new DatabaseSync(dbPath!, { readonly: true }); + try { + const row = db.prepare(` + SELECT last_rebuilt_at, status, last_projected_append_ordinal + FROM timeline_projection_sessions + WHERE session_id = ? + `).get(sessionId) as Record | undefined; + if (!row) throw new Error(`missing projection session row for ${sessionId}`); + return { + lastRebuiltAt: typeof row.last_rebuilt_at === 'number' ? row.last_rebuilt_at : null, + status: String(row.status), + lastProjectedAppendOrdinal: Number(row.last_projected_append_ordinal), + }; + } finally { + db.close(); + } + } + it('preserves append order for equal-ts events and honors afterTs / beforeTs exclusivity', async () => { const { timelineProjection, timelineStore } = await loadModules(); const sessionId = 'projection_order'; @@ -125,4 +148,48 @@ describe('timeline projection', () => { const rebuiltFromAuthoritative = await timelineProjection.queryHistory({ sessionId, limit: 10 }); expect(rebuiltFromAuthoritative?.map((event) => event.seq)).toEqual([2, 3]); }); + + it('incrementally syncs appended JSONL tails without forcing a full rebuild', async () => { + const { timelineProjection, timelineStore } = await loadModules(); + const sessionId = 'projection_incremental_tail'; + const timelineFile = timelineStore.filePath(sessionId); + mkdirSync(join(tempHome!, '.imcodes', 'timeline'), { recursive: true }); + + timelineStore.append(makeEvent(sessionId, 1, 'assistant.text', { text: 'one' }, 1000)); + timelineStore.append(makeEvent(sessionId, 2, 'assistant.text', { text: 'two' }, 1001)); + await timelineProjection.rebuildSession(sessionId); + + const before = readSessionMeta(sessionId); + appendFileSync(timelineFile, `${JSON.stringify(makeEvent(sessionId, 3, 'assistant.text', { text: 'three' }, 1002))}\n`); + + const synced = await timelineStore.readPreferred(sessionId, { limit: 10 }); + const after = readSessionMeta(sessionId); + + expect(synced.map((event) => event.seq)).toEqual([1, 2, 3]); + expect(after.status).toBe('ready'); + expect(after.lastProjectedAppendOrdinal).toBe(3); + expect(after.lastRebuiltAt).toBe(before.lastRebuiltAt); + }); + + it('does not rebuild when only timeline file mtime changes', async () => { + const { timelineProjection, timelineStore } = await loadModules(); + const sessionId = 'projection_mtime_only'; + const timelineFile = timelineStore.filePath(sessionId); + + timelineStore.append(makeEvent(sessionId, 1, 'assistant.text', { text: 'one' }, 1000)); + timelineStore.append(makeEvent(sessionId, 2, 'assistant.text', { text: 'two' }, 1001)); + await timelineProjection.rebuildSession(sessionId); + + const before = readSessionMeta(sessionId); + const bumpedAt = new Date(Date.now() + 5_000); + utimesSync(timelineFile, bumpedAt, bumpedAt); + + const read = await timelineStore.readPreferred(sessionId, { limit: 10 }); + const after = readSessionMeta(sessionId); + + expect(read.map((event) => event.seq)).toEqual([1, 2]); + expect(after.status).toBe('ready'); + expect(after.lastProjectedAppendOrdinal).toBe(2); + expect(after.lastRebuiltAt).toBe(before.lastRebuiltAt); + }); }); From 323ad4b8ca9a2863333e9b59db7406d00308443e Mon Sep 17 00:00:00 2001 From: "IM.codes" Date: Thu, 23 Apr 2026 15:41:54 +0800 Subject: [PATCH 5/5] test: stabilize timeline projection fallback cleanup --- .../daemon/timeline-store.projection-fallback.test.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test/daemon/timeline-store.projection-fallback.test.ts b/test/daemon/timeline-store.projection-fallback.test.ts index d9791bd1e..9cb191338 100644 --- a/test/daemon/timeline-store.projection-fallback.test.ts +++ b/test/daemon/timeline-store.projection-fallback.test.ts @@ -17,6 +17,15 @@ vi.mock('../../src/daemon/timeline-projection.js', () => ({ timelineProjection: projectionMocks, })); +vi.mock('../../src/util/logger.js', () => ({ + default: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + describe('timeline-store projection fallbacks', () => { const originalHome = process.env.HOME; const originalUserProfile = process.env.USERPROFILE; @@ -29,7 +38,7 @@ describe('timeline-store projection fallbacks', () => { else process.env.HOME = originalHome; if (originalUserProfile === undefined) delete process.env.USERPROFILE; else process.env.USERPROFILE = originalUserProfile; - if (tempHome) rmSync(tempHome, { recursive: true, force: true }); + if (tempHome) rmSync(tempHome, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }); tempHome = null; });