Skip to content
Merged

Dev #13

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/agent/providers/cursor-headless-stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { ToolCallEvent } from '../../../shared/agent-message.js';

type CursorRecord = Record<string, unknown>;

export interface CursorSessionInitEvent {
Expand Down
50 changes: 43 additions & 7 deletions src/agent/providers/qwen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const execFileAsync = promisify(execFile);
const QWEN_BIN = 'qwen';
const TRANSIENT_RETRY_DELAY_MS = 250;
const TRANSIENT_RETRY_MAX_ATTEMPTS = 1;
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

/**
* Auth types accepted by the qwen CLI's `--auth-type` flag.
Expand Down Expand Up @@ -66,6 +67,16 @@ function resolveCliAuthType(settings: string | Record<string, unknown> | undefin
return QWEN_CLI_AUTH_TYPES.has(selected) ? selected : undefined;
}

function isUuid(value: string | undefined): value is string {
return typeof value === 'string' && UUID_PATTERN.test(value);
}

function extractSyntheticApiError(text: string | undefined): string | undefined {
if (typeof text !== 'string') return undefined;
const match = text.trim().match(/^\[API Error:\s*(.+)\]$/i);
return match?.[1]?.trim() || undefined;
}

interface QwenSessionState {
cwd: string;
started: boolean;
Expand Down Expand Up @@ -255,17 +266,22 @@ export class QwenProvider implements TransportProvider {
async createSession(config: SessionConfig): Promise<string> {
const sessionId = config.bindExistingKey ?? config.sessionKey;
const existing = this.sessions.get(sessionId);
const qwenConversationId = existing?.qwenConversationId
?? (isUuid(config.resumeId) ? config.resumeId : undefined)
?? (isUuid(config.bindExistingKey) ? config.bindExistingKey : undefined)
?? (isUuid(config.sessionKey) ? config.sessionKey : undefined)
?? randomUUID();
this.sessions.set(sessionId, {
cwd: normalizeTransportCwd(config.cwd) ?? existing?.cwd ?? normalizeTransportCwd(process.cwd())!,
started: !!(config.bindExistingKey || config.skipCreate || existing?.started),
started: !!(config.resumeId || config.bindExistingKey || config.skipCreate || existing?.started),
description: config.description ?? existing?.description,
model: typeof config.agentId === 'string' ? config.agentId : existing?.model,
env: config.env ?? existing?.env,
effort: config.effort ?? existing?.effort ?? DEFAULT_TRANSPORT_EFFORT,
settings: config.settings ?? existing?.settings,
settingsDir: existing?.settingsDir,
settingsPath: existing?.settingsPath,
qwenConversationId: existing?.qwenConversationId ?? sessionId,
qwenConversationId,
child: existing?.child ?? null,
currentMessageId: existing?.currentMessageId ?? null,
currentText: existing?.currentText ?? '',
Expand Down Expand Up @@ -371,7 +387,7 @@ export class QwenProvider implements TransportProvider {
settings: undefined,
settingsDir: undefined,
settingsPath: undefined,
qwenConversationId: sessionId,
qwenConversationId: randomUUID(),
child: null,
currentMessageId: null,
currentText: '',
Expand Down Expand Up @@ -456,7 +472,7 @@ export class QwenProvider implements TransportProvider {
|| state.emittedToolSignatures.size > 0;
};

const maybeRetryTransientError = async (messageText: string, details?: unknown): Promise<boolean> => {
const maybeRetryTransientError = async (messageText: string, _details?: unknown): Promise<boolean> => {
if (retryScheduled || transientRetryBudget <= 0) return false;
if (sawVisibleTurnProgress()) return false;
if (!this.isRetryableTransientError(messageText)) return false;
Expand All @@ -471,9 +487,11 @@ export class QwenProvider implements TransportProvider {
const emitError = (messageText: string, details?: unknown): void => {
if (sawError || completed) return;
sawError = true;
const code = state.cancelled ? 'CANCELLED' : PROVIDER_ERROR_CODES.PROVIDER_ERROR;
const recoverable = state.cancelled ? true : false;
this.errorCallbacks.forEach((cb) => cb(sessionId, this.makeError(code, messageText, recoverable, details)));
const errorCode = state.cancelled
? PROVIDER_ERROR_CODES.CANCELLED
: (this.isAuthFailureMessage(messageText) ? PROVIDER_ERROR_CODES.AUTH_FAILED : PROVIDER_ERROR_CODES.PROVIDER_ERROR);
const recoverable = errorCode === PROVIDER_ERROR_CODES.CANCELLED;
this.errorCallbacks.forEach((cb) => cb(sessionId, this.makeError(errorCode, messageText, recoverable, details)));
};

const emitComplete = (text: string, messageId?: string, metadata?: Record<string, unknown>): void => {
Expand Down Expand Up @@ -656,6 +674,13 @@ export class QwenProvider implements TransportProvider {
}
const finalText = collectAssistantText(payload.message?.content);
if (finalText) {
const syntheticApiError = extractSyntheticApiError(finalText);
if (syntheticApiError) {
void maybeRetryTransientError(syntheticApiError, payload).then((retried) => {
if (!retried) emitError(syntheticApiError, payload);
});
return;
}
state.pendingFinalText = finalText;
state.pendingFinalMetadata = {
...(state.model || payload.message?.model ? { model: state.model ?? payload.message?.model } : {}),
Expand Down Expand Up @@ -696,6 +721,13 @@ export class QwenProvider implements TransportProvider {
});
return;
}
const syntheticApiError = extractSyntheticApiError(payload.result);
if (syntheticApiError) {
void maybeRetryTransientError(syntheticApiError, payload).then((retried) => {
if (!retried) emitError(syntheticApiError, payload);
});
return;
}
const resultText = typeof payload.result === 'string' && payload.result.trim()
? payload.result
: state.pendingFinalText;
Expand Down Expand Up @@ -793,6 +825,10 @@ export class QwenProvider implements TransportProvider {
return /premature close|fetch failed|connection error|socket hang up|econnreset|etimedout|network error/i.test(message);
}

private isAuthFailureMessage(message: string): boolean {
return /invalid access token|token expired|unauthorized|authentication failed|401\b/i.test(message);
}

private emitStatus(sessionId: string, state: QwenSessionState, status: ProviderStatusUpdate): void {
const signature = JSON.stringify({
status: status.status,
Expand Down
9 changes: 8 additions & 1 deletion src/agent/qwen-runtime-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ async function readAuthStatus(): Promise<{ authType: QwenAuthType | null; authLi
}
}

function isKnownAuthType(authType: QwenAuthType): boolean {
return authType !== QWEN_AUTH_TYPES.UNKNOWN;
}

function detectAuthTypeFromSettings(settings: QwenSettings | null): QwenAuthType {
const selectedType = settings?.security?.auth?.selectedType;
if (selectedType === QWEN_AUTH_TYPES.OAUTH) return QWEN_AUTH_TYPES.OAUTH;
Expand Down Expand Up @@ -124,7 +128,10 @@ export async function getQwenRuntimeConfig(force = false): Promise<QwenRuntimeCo

const settings = await readSettings();
const status = await readAuthStatus();
const authType = status?.authType ?? detectAuthTypeFromSettings(settings);
const authTypeFromSettings = detectAuthTypeFromSettings(settings);
const authType = isKnownAuthType(authTypeFromSettings)
? authTypeFromSettings
: (status?.authType ?? authTypeFromSettings);
const availableModels = getAvailableModelsFromSettings(settings, authType);

const value = {
Expand Down
4 changes: 0 additions & 4 deletions src/agent/transport-runtime-assembly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ export function buildProviderContextPayload(
provider: TransportProvider,
input: TransportRuntimeAssemblyInput,
): ProviderContextPayload {
const namespace = input.namespace ?? {
scope: 'personal',
projectId: 'transport-default',
};
const { supportClass, authority } = resolveTransportDispatchAuthority(provider, input);
const sanitizedRecall = {
startupMemory: authority.authoritySource === 'processed_local' ? input.startupMemory : undefined,
Expand Down
2 changes: 1 addition & 1 deletion src/context/embedding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Lazy-loaded on first call — subsequent calls reuse the pipeline.
*/

import { EMBEDDING_MODEL, EMBEDDING_DTYPE, EMBEDDING_DIM, cosineSimilarity } from '../../shared/embedding-config.js';
import { EMBEDDING_MODEL, EMBEDDING_DTYPE, EMBEDDING_DIM } from '../../shared/embedding-config.js';
import logger from '../util/logger.js';

// Re-export shared constants for backward compatibility with existing imports
Expand Down
52 changes: 50 additions & 2 deletions src/daemon/command-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { timelineEmitter } from './timeline-emitter.js';
import { timelineStore } from './timeline-store.js';
import type { MemoryContextTimelinePayload } from '../shared/timeline/types.js';
import { emitSessionInlineError } from './session-error.js';
import { enqueueResend, getResendEntries, getResendCount, clearResend } from './transport-resend-queue.js';
import { enqueueResend, getResendEntries, clearResend } from './transport-resend-queue.js';
import {
startSubSession,
stopSubSession,
Expand Down Expand Up @@ -78,7 +78,6 @@ import { getSavedP2pConfig, upsertSavedP2pConfig } from '../store/p2p-config-sto
import { getProcessedProjectionStats, queryPendingContextEvents, queryProcessedProjections, recordMemoryHits } from '../store/context-store.js';
import {
isKnownTestProjectName,
isKnownTestSessionLike,
isKnownTestSessionName,
} from '../../shared/test-session-guard.js';
import {
Expand Down Expand Up @@ -1844,6 +1843,23 @@ async function handleSend(cmd: Record<string, unknown>, serverLink: ServerLink):
const status = isLegacy ? 'accepted_legacy' : 'accepted';
timelineEmitter.emit(sessionName, 'command.ack', { commandId: effectiveId, status });
emitCommandAckReliable(serverLink, { commandId: effectiveId, sessionName, status });
// Best-effort resume for sessions that failed to launch or whose runtime
// vanished outside the provider reconnect path. The resend queue drains on
// successful relaunch, so the queued user message still delivers.
void runExclusiveSessionRelaunch(sessionName, async () => {
try {
await resumeTransportRuntimeAfterLoss(record);
} catch (err) {
logger.error({ err, sessionName }, 'auto-resume after missing transport runtime failed');
const resumeErr = err instanceof Error ? err.message : String(err);
timelineEmitter.emit(
sessionName,
'assistant.text',
{ text: `⚠️ Auto-resume failed: ${resumeErr}. Restart the session manually to recover.`, streaming: false, memoryExcluded: true },
{ source: 'daemon', confidence: 'high' },
);
}
});
return;
}
if (transportRuntime && !transportRuntime.providerSessionId) {
Expand Down Expand Up @@ -3054,6 +3070,38 @@ async function handleSubSessionStart(cmd: Record<string, unknown>, serverLink: S
} catch { /* not connected */ }
} catch (e: unknown) {
logger.error({ err: e, id, type }, 'subsession.start failed (transport)');
const now = Date.now();
const errMsg = e instanceof Error ? e.message : String(e);
const existing = getSession(sessionName);
const errorRecord: SessionRecord = {
name: sessionName,
projectName: existing?.projectName ?? sessionName,
role: existing?.role ?? 'w1',
agentType: type,
projectDir: existing?.projectDir ?? ((cwd as string) || process.cwd()),
state: 'error',
restarts: existing?.restarts ?? 0,
restartTimestamps: existing?.restartTimestamps ?? [],
createdAt: existing?.createdAt ?? now,
updatedAt: now,
runtimeType: 'transport',
providerId: type,
...(description ? { description } : {}),
...(ccPreset ? { ccPreset } : {}),
...(effort ? { effort } : {}),
...(parentSession ? { parentSession } : {}),
...(cmd.requestedModel || cmd.model
? { requestedModel: ((cmd.requestedModel as string | undefined) ?? (cmd.model as string | undefined)) }
: {}),
userCreated: true,
};
upsertSession(errorRecord);
timelineEmitter.emit(
sessionName,
'session.state',
{ state: 'error', error: errMsg },
{ source: 'daemon', confidence: 'high' },
);
}
return;
}
Expand Down
15 changes: 0 additions & 15 deletions src/daemon/p2p-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,6 @@ export interface P2pRun {

const activeRuns = new Map<string, P2pRun>();

const P2P_REMINDER_TEMPLATES: Record<string, string> = {
en: enLocale.p2p.final_original_request_reminder,
'zh-CN': zhCNLocale.p2p.final_original_request_reminder,
'zh-TW': zhTWLocale.p2p.final_original_request_reminder,
ja: jaLocale.p2p.final_original_request_reminder,
ko: koLocale.p2p.final_original_request_reminder,
es: esLocale.p2p.final_original_request_reminder,
ru: ruLocale.p2p.final_original_request_reminder,
};

function buildOriginalRequestReminder(userText: string, locale?: string): string {
const template = P2P_REMINDER_TEMPLATES[locale ?? ''] ?? P2P_REMINDER_TEMPLATES.en;
return template.replace('{{request}}', userText);
}

const P2P_POST_SUMMARY_EXECUTE_TEMPLATES: Record<string, string> = {
en: enLocale.p2p.post_summary_execute_prompt,
'zh-CN': zhCNLocale.p2p.post_summary_execute_prompt,
Expand Down
Loading
Loading