From e401e386c387e2d400360c2644ddbfc669f183f7 Mon Sep 17 00:00:00 2001 From: tsubasakong Date: Fri, 13 Mar 2026 11:41:24 -0700 Subject: [PATCH] Add retries for transient Codex server errors --- src/agent/runtime/attempt.ts | 217 +++++++++++++++++++++++++--- test/runtime-seams.test.mjs | 264 +++++++++++++++++++++++++++++++++++ 2 files changed, 460 insertions(+), 21 deletions(-) diff --git a/src/agent/runtime/attempt.ts b/src/agent/runtime/attempt.ts index a1d3b54..5665230 100644 --- a/src/agent/runtime/attempt.ts +++ b/src/agent/runtime/attempt.ts @@ -66,6 +66,16 @@ function selectObservationImage( } const MAX_REUSED_SESSION_MESSAGES = 64; +const RETRYABLE_MODEL_ERROR_MAX_RETRIES = 2; +const RETRYABLE_MODEL_ERROR_BASE_DELAY_MS = 1_000; + +interface RuntimeModelInfo { + provider: string; + api: string; + model: string; + currentApp: string; + stepNo: number; +} function isOpenAiLikeBaseUrl(baseUrl: string): boolean { const lower = baseUrl.toLowerCase(); @@ -81,6 +91,32 @@ function shouldShowCodexCliHint(modelId: string, baseUrl: string): boolean { return isOpenAiLikeBaseUrl(baseUrl) && isCodexCliCapableModelId(modelId); } +function isOpenAiLikeRuntimeModel(modelInfo: Pick<RuntimeModelInfo, "provider" | "api" | "model">): boolean { + const haystack = `${modelInfo.provider} ${modelInfo.api} ${modelInfo.model}`.toLowerCase(); + return haystack.includes("openai") || haystack.includes("codex") || haystack.includes("gpt-"); +} + +function hasRetryableServerErrorSignature(detail: string): boolean { + const lower = detail.toLowerCase(); + if ( + lower.includes("code=server_error") + || lower.includes("\"code\":\"server_error\"") + || lower.includes("type=server_error") + || lower.includes("\"type\":\"server_error\"") + ) { + return true; + } + return /\bstatus=(500|502|503|504)\b/i.test(detail) + || /"(?:status|statuscode)"\s*:\s*(500|502|503|504)\b/i.test(detail); +} + +function isRetryableUpstreamModelError( + detail: string, + modelInfo: Pick<RuntimeModelInfo, "provider" | "api" | "model">, +): boolean { + return isOpenAiLikeRuntimeModel(modelInfo) && 
hasRetryableServerErrorSignature(detail); +} + const PHONE_ONLY_TOOL_NAMES = new Set([ "tap", "tap_element", @@ -210,7 +246,7 @@ export async function runRuntimeAttempt( ); const autoSkillRefiner = new AutoSkillRefiner(deps.config); const reusedSessionMessages = session.reused ? loadReusedSessionMessages(session.path) : []; - let runtimeModelInfo = { + let runtimeModelInfo: RuntimeModelInfo = { provider: "unknown", api: "unknown", model: profile.model, @@ -389,6 +425,10 @@ export async function runRuntimeAttempt( const recordModelResponseError = (source: string, error: unknown): string => { const detail = formatDetailedError(error); + return recordFormattedModelResponseError(source, detail); + }; + + const recordFormattedModelResponseError = (source: string, detail: string): string => { runtimeModelInfo = { ...runtimeModelInfo, stepNo: ctx.stepCount, @@ -426,7 +466,21 @@ export async function runRuntimeAttempt( : requestedToolNames; const tools = deps.buildPhoneAgentTools(ctx, availableToolNamesForRun); const apiKey = auth.apiKey; - const turnFallbackTasks: Promise<void>[] = []; + const continuationTasks = new Set<Promise<void>>(); + let retryableModelErrorCount = 0; + + const trackContinuationTask = (task: Promise<void>): void => { + continuationTasks.add(task); + void task.finally(() => { + continuationTasks.delete(task); + }); + }; + + const drainContinuationTasks = async (): Promise<void> => { + while (continuationTasks.size > 0) { + await Promise.allSettled([...continuationTasks]); + } + }; const maybeEscalateSecureSurfaceTakeover = async (): Promise => { if (ctx.finishMessage || ctx.failMessage) { @@ -624,6 +678,8 @@ export async function runRuntimeAttempt( const bridgeState: { lastAssistantMessage: PiAssistantMessage | null } = { lastAssistantMessage: null, }; + const readLastBridgeAssistantMessage = (): PiAssistantMessage | null => bridgeState.lastAssistantMessage; + let bridgeRetryableModelErrorCount = 0; let abortRequested = false; let stopPollTimer: NodeJS.Timeout | null = null; const 
unsubscribe = bridge.subscribeRaw((event) => { @@ -647,7 +703,73 @@ export async function runRuntimeAttempt( ctx.failMessage = "Task stopped by user."; void bridge.abort().catch(() => {}); }, 250); - await bridge.prompt(`Task: ${request.task}`); + let promptText: string | null = `Task: ${request.task}`; + while (promptText && !ctx.finishMessage && !ctx.failMessage && !ctx.stopRequested()) { + bridgeState.lastAssistantMessage = null; + await bridge.prompt(promptText); + if (ctx.finishMessage || ctx.failMessage || ctx.stopRequested()) { + break; + } + const lastAssistantMessage = readLastBridgeAssistantMessage(); + if (!lastAssistantMessage) { + break; + } + if (lastAssistantMessage.stopReason === "error") { + const errorPayload = { + message: lastAssistantMessage.errorMessage || lastAssistantMessage.stopReason, + errorMessage: lastAssistantMessage.errorMessage, + stopReason: lastAssistantMessage.stopReason, + }; + const detail = formatDetailedError(errorPayload); + if ( + bridgeRetryableModelErrorCount < RETRYABLE_MODEL_ERROR_MAX_RETRIES + && isRetryableUpstreamModelError(detail, runtimeModelInfo) + ) { + bridgeRetryableModelErrorCount += 1; + const retryAttempt = bridgeRetryableModelErrorCount; + const delayMs = RETRYABLE_MODEL_ERROR_BASE_DELAY_MS * (2 ** (retryAttempt - 1)); + appendSessionEvent( + "model_response_retry_scheduled", + { + source: "pi_session_bridge", + provider: runtimeModelInfo.provider, + api: runtimeModelInfo.api, + model: runtimeModelInfo.model, + stepNo: ctx.stepCount, + currentApp: ctx.latestSnapshot?.currentApp ?? 
"unknown", + retryAttempt, + maxRetries: RETRYABLE_MODEL_ERROR_MAX_RETRIES, + delayMs, + }, + `model_response_retry_scheduled attempt=${retryAttempt}/${RETRYABLE_MODEL_ERROR_MAX_RETRIES} delay_ms=${delayMs}`, + ); + await sleep(delayMs); + if (ctx.finishMessage || ctx.failMessage || ctx.stopRequested()) { + break; + } + if (ctx.stepCount >= ctx.maxSteps) { + if (!completeBoundedCronRunIfNeeded(ctx)) { + ctx.failMessage = `Max steps reached (${ctx.maxSteps})`; + } + break; + } + promptText = `Step ${ctx.stepCount + 1}: continue executing the task.`; + continue; + } + ctx.failMessage = recordFormattedModelResponseError("pi_session_bridge", detail); + break; + } + if (lastAssistantMessage.stopReason === "aborted") { + ctx.failMessage = recordModelResponseError("pi_session_bridge", { + message: lastAssistantMessage.errorMessage || lastAssistantMessage.stopReason, + errorMessage: lastAssistantMessage.errorMessage, + stopReason: lastAssistantMessage.stopReason, + }); + break; + } + bridgeRetryableModelErrorCount = 0; + break; + } } finally { if (stopPollTimer) { clearInterval(stopPollTimer); @@ -1014,7 +1136,57 @@ export async function runRuntimeAttempt( if (assistantMessage.role !== "assistant") { return; } - if (assistantMessage.stopReason === "error" || assistantMessage.stopReason === "aborted") { + if (assistantMessage.stopReason === "error") { + const errorPayload = { + message: assistantMessage.errorMessage || assistantMessage.stopReason, + errorMessage: assistantMessage.errorMessage, + stopReason: assistantMessage.stopReason, + }; + const detail = formatDetailedError(errorPayload); + if ( + retryableModelErrorCount < RETRYABLE_MODEL_ERROR_MAX_RETRIES + && isRetryableUpstreamModelError(detail, runtimeModelInfo) + ) { + retryableModelErrorCount += 1; + const retryAttempt = retryableModelErrorCount; + const delayMs = RETRYABLE_MODEL_ERROR_BASE_DELAY_MS * (2 ** (retryAttempt - 1)); + appendSessionEvent( + "model_response_retry_scheduled", + { + source: 
"legacy_agent_core", + provider: runtimeModelInfo.provider, + api: runtimeModelInfo.api, + model: runtimeModelInfo.model, + stepNo: ctx.stepCount, + currentApp: ctx.latestSnapshot?.currentApp ?? "unknown", + retryAttempt, + maxRetries: RETRYABLE_MODEL_ERROR_MAX_RETRIES, + delayMs, + }, + `model_response_retry_scheduled attempt=${retryAttempt}/${RETRYABLE_MODEL_ERROR_MAX_RETRIES} delay_ms=${delayMs}`, + ); + trackContinuationTask((async () => { + try { + await sleep(delayMs); + if (ctx.finishMessage || ctx.failMessage || ctx.stopRequested()) { + return; + } + checkContinuation(); + if (!ctx.finishMessage && !ctx.failMessage && !ctx.stopRequested()) { + await agent.waitForIdle(); + } + } catch (error) { + if (!ctx.finishMessage && !ctx.failMessage) { + ctx.failMessage = recordModelResponseError("legacy_agent_core_retry", error); + } + } + })()); + return; + } + ctx.failMessage = recordFormattedModelResponseError("legacy_agent_core", detail); + return; + } + if (assistantMessage.stopReason === "aborted") { ctx.failMessage = recordModelResponseError("legacy_agent_core", { message: assistantMessage.errorMessage || assistantMessage.stopReason, errorMessage: assistantMessage.errorMessage, @@ -1022,28 +1194,33 @@ export async function runRuntimeAttempt( }); return; } + retryableModelErrorCount = 0; const hasToolCall = assistantMessage.content.some((item) => item.type === "toolCall"); if (!hasToolCall && !ctx.finishMessage && !ctx.failMessage) { const fallbackTask = (async () => { - const parsed = deps.parseTextualToolFallback(assistantMessage, ctx.task); - if (!parsed) { - ctx.failMessage = "Model response did not include a tool call."; - return; - } - const fallbackTool = tools.find((item) => item.name === parsed.toolName); - if (!fallbackTool) { - ctx.failMessage = `Model textual fallback resolved unknown tool '${parsed.toolName}'.`; - return; - } try { + const parsed = deps.parseTextualToolFallback(assistantMessage, ctx.task); + if (!parsed) { + ctx.failMessage = "Model 
response did not include a tool call."; + return; + } + const fallbackTool = tools.find((item) => item.name === parsed.toolName); + if (!fallbackTool) { + ctx.failMessage = `Model textual fallback resolved unknown tool '${parsed.toolName}'.`; + return; + } await fallbackTool.execute(`text-fallback-${Date.now()}`, parsed.params); + checkContinuation(); + if (!ctx.finishMessage && !ctx.failMessage && !ctx.stopRequested()) { + await agent.waitForIdle(); + } } catch (error) { - ctx.failMessage = `Textual tool fallback execution error: ${(error as Error).message}`; - return; + if (!ctx.finishMessage && !ctx.failMessage) { + ctx.failMessage = `Textual tool fallback execution error: ${(error as Error).message}`; + } } - checkContinuation(); })(); - turnFallbackTasks.push(fallbackTask); + trackContinuationTask(fallbackTask); return; } checkContinuation(); @@ -1053,9 +1230,7 @@ export async function runRuntimeAttempt( console.log(`[OpenPocket][agent-core] starting task: ${request.task}`); await agent.prompt(`Task: ${request.task}`); await agent.waitForIdle(); - if (turnFallbackTasks.length > 0) { - await Promise.allSettled(turnFallbackTasks); - } + await drainContinuationTasks(); const agentStateError = (agent as { state?: { error?: string } }).state?.error; if (!ctx.finishMessage && !ctx.failMessage && typeof agentStateError === "string" && agentStateError.trim()) { ctx.failMessage = recordModelResponseError("legacy_agent_core_state", agentStateError); diff --git a/test/runtime-seams.test.mjs b/test/runtime-seams.test.mjs index 9c6b748..bae5e29 100644 --- a/test/runtime-seams.test.mjs +++ b/test/runtime-seams.test.mjs @@ -124,6 +124,27 @@ function makeSnapshot(overrides = {}) { }; } +function createAssistantErrorMessage(errorMessage, model, stopReason = "error") { + return { + role: "assistant", + content: [], + api: model?.api ?? "openai-codex-responses", + provider: model?.provider ?? "openai-codex", + model: model?.id ?? 
"mock-model", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason, + errorMessage, + timestamp: Date.now(), + }; +} + test("runRuntimeTask keeps busy rejection contract", async () => { const result = await runRuntimeTask( { @@ -262,6 +283,102 @@ test("runRuntimeAttempt uses pi_session_bridge backend when configured", async ( assert.equal(agentFactoryCalls, 0); }); +test("runRuntimeAttempt retries retryable pi_session_bridge Codex server errors instead of failing immediately", async () => { + const runtime = createRuntimeWithApiKey(); + runtime.config.agent.runtimeBackend = "pi_session_bridge"; + + let promptCalls = 0; + const promptTexts = []; + const listeners = new Set(); + const deps = createAttemptDeps(runtime); + deps.piSessionBridgeFactory = async () => ({ + sessionId: "pi-bridge-session-retry", + sessionFile: "/tmp/pi-bridge-session-retry.jsonl", + prompt: async (text) => { + promptCalls += 1; + promptTexts.push(text); + const message = promptCalls === 1 + ? createAssistantErrorMessage( + 'Codex error: {"type":"error","error":{"type":"server_error","code":"server_error","message":"An error occurred while processing your request. 
Please include the request ID req-pi-bridge-retry-1 in your message.","param":null},"sequence_number":2}', + runtime.config.models[runtime.config.defaultModel], + ) + : { + role: "assistant", + content: [{ type: "text", text: "finish(message=\"pi-bridge-retried-ok\")" }], + stopReason: "stop", + timestamp: Date.now(), + }; + for (const listener of listeners) { + listener({ + type: "turn_end", + message, + toolResults: [], + }); + } + }, + abort: async () => {}, + dispose: () => {}, + subscribeRaw: (listener) => { + listeners.add(listener); + return () => listeners.delete(listener); + }, + subscribeNormalized: (_listener) => () => {}, + }); + + const outcome = await runRuntimeAttempt(deps, { + task: "pi-session-bridge retry transient codex server error", + availableToolNames: ["finish"], + }); + + assert.equal(outcome.result.ok, true); + assert.match(outcome.result.message, /pi-bridge-retried-ok/); + assert.equal(promptCalls, 2); + assert.equal(promptTexts[0], "Task: pi-session-bridge retry transient codex server error"); + assert.equal(promptTexts[1], "Step 1: continue executing the task."); +}); + +test("runRuntimeAttempt does not retry non-retryable pi_session_bridge Codex model errors", async () => { + const runtime = createRuntimeWithApiKey(); + runtime.config.agent.runtimeBackend = "pi_session_bridge"; + + let promptCalls = 0; + const listeners = new Set(); + const deps = createAttemptDeps(runtime); + deps.piSessionBridgeFactory = async () => ({ + sessionId: "pi-bridge-session-no-retry", + sessionFile: "/tmp/pi-bridge-session-no-retry.jsonl", + prompt: async () => { + promptCalls += 1; + for (const listener of listeners) { + listener({ + type: "turn_end", + message: createAssistantErrorMessage( + 'Codex error: {"type":"error","error":{"type":"invalid_request_error","code":"context_length_exceeded","message":"Context length exceeded.","param":null},"sequence_number":2}', + runtime.config.models[runtime.config.defaultModel], + ), + toolResults: [], + }); + } + 
}, + abort: async () => {}, + dispose: () => {}, + subscribeRaw: (listener) => { + listeners.add(listener); + return () => listeners.delete(listener); + }, + subscribeNormalized: (_listener) => () => {}, + }); + + const outcome = await runRuntimeAttempt(deps, { + task: "pi-session-bridge do not retry invalid request errors", + availableToolNames: ["finish"], + }); + + assert.equal(outcome.result.ok, false); + assert.match(outcome.result.message, /context_length_exceeded/); + assert.equal(promptCalls, 1); +}); + test("runRuntimeAttempt falls back to legacy backend when phone-only tools are requested", async () => { const runtime = createRuntimeWithApiKey(); runtime.config.agent.runtimeBackend = "pi_session_bridge"; @@ -383,3 +500,150 @@ test("runRuntimeAttempt treats bounded cron step budget exhaustion as a normal c assert.match(outcome.result.message, /scheduled run window/i); assert.doesNotMatch(outcome.result.message, /Max steps reached/i); }); + +test("runRuntimeAttempt retries retryable Codex server errors instead of failing immediately", async () => { + const runtime = createRuntimeWithApiKey(); + runtime.adb = { + queryLaunchablePackages: () => [], + resolveDeviceId: () => "emulator-5554", + captureScreenSnapshot: () => makeSnapshot({ currentApp: "com.twitter.android" }), + executeAction: async () => "ok", + }; + + let followUpCalls = 0; + runtime.agentFactory = (options) => { + const listeners = new Set(); + let idlePromise = Promise.resolve(); + + const emit = (event) => { + for (const listener of listeners) { + listener(event); + } + }; + + return { + followUp() { + followUpCalls += 1; + idlePromise = (async () => { + if (options.transformContext) { + await options.transformContext([]); + } + const finishTool = options.initialState?.tools?.find((item) => item.name === "finish"); + if (!finishTool) { + throw new Error("finish tool not found"); + } + await finishTool.execute("tc-retry-finish", { + thought: "retry after transient codex error", + message: 
"retried ok", + }); + emit({ + type: "turn_end", + message: { + role: "assistant", + content: [{ + type: "toolCall", + id: "tc-retry-finish", + name: "finish", + arguments: { + thought: "retry after transient codex error", + message: "retried ok", + }, + }], + stopReason: "toolUse", + timestamp: Date.now(), + }, + toolResults: [], + }); + })(); + }, + subscribe(listener) { + listeners.add(listener); + }, + async prompt() { + idlePromise = (async () => { + if (options.transformContext) { + await options.transformContext([]); + } + emit({ + type: "turn_end", + message: createAssistantErrorMessage( + 'Codex error: {"type":"error","error":{"type":"server_error","code":"server_error","message":"An error occurred while processing your request. Please include the request ID req-codex-retry-1 in your message.","param":null},"sequence_number":2}', + options.initialState?.model, + ), + toolResults: [], + }); + })(); + await idlePromise; + }, + async waitForIdle() { + await idlePromise; + }, + abort() {}, + }; + }; + + const outcome = await runRuntimeAttempt(createAttemptDeps(runtime), { + task: "Retry transient codex server error", + availableToolNames: ["finish"], + }); + + assert.equal(outcome.result.ok, true); + assert.match(outcome.result.message, /retried ok/); + assert.equal(followUpCalls, 1); +}); + +test("runRuntimeAttempt does not retry non-retryable Codex model errors", async () => { + const runtime = createRuntimeWithApiKey(); + runtime.adb = { + queryLaunchablePackages: () => [], + resolveDeviceId: () => "emulator-5554", + captureScreenSnapshot: () => makeSnapshot({ currentApp: "com.twitter.android" }), + executeAction: async () => "ok", + }; + + let followUpCalls = 0; + runtime.agentFactory = (options) => { + const listeners = new Set(); + let idlePromise = Promise.resolve(); + + return { + followUp() { + followUpCalls += 1; + }, + subscribe(listener) { + listeners.add(listener); + }, + async prompt() { + idlePromise = (async () => { + if 
(options.transformContext) { + await options.transformContext([]); + } + for (const listener of listeners) { + listener({ + type: "turn_end", + message: createAssistantErrorMessage( + 'Codex error: {"type":"error","error":{"type":"invalid_request_error","code":"context_length_exceeded","message":"Context length exceeded.","param":null},"sequence_number":2}', + options.initialState?.model, + ), + toolResults: [], + }); + } + })(); + await idlePromise; + }, + async waitForIdle() { + await idlePromise; + }, + abort() {}, + }; + }; + + const outcome = await runRuntimeAttempt(createAttemptDeps(runtime), { + task: "Do not retry invalid request errors", + availableToolNames: ["finish"], + }); + + assert.equal(outcome.result.ok, false); + assert.match(outcome.result.message, /context_length_exceeded/); + assert.equal(followUpCalls, 0); +});