Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 196 additions & 21 deletions src/agent/runtime/attempt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ function selectObservationImage(
}

const MAX_REUSED_SESSION_MESSAGES = 64;
const RETRYABLE_MODEL_ERROR_MAX_RETRIES = 2;
const RETRYABLE_MODEL_ERROR_BASE_DELAY_MS = 1_000;

interface RuntimeModelInfo {
provider: string;
api: string;
model: string;
currentApp: string;
stepNo: number;
}

function isOpenAiLikeBaseUrl(baseUrl: string): boolean {
const lower = baseUrl.toLowerCase();
Expand All @@ -81,6 +91,32 @@ function shouldShowCodexCliHint(modelId: string, baseUrl: string): boolean {
return isOpenAiLikeBaseUrl(baseUrl) && isCodexCliCapableModelId(modelId);
}

function isOpenAiLikeRuntimeModel(modelInfo: Pick<RuntimeModelInfo, "provider" | "api" | "model">): boolean {
const haystack = `${modelInfo.provider} ${modelInfo.api} ${modelInfo.model}`.toLowerCase();
return haystack.includes("openai") || haystack.includes("codex") || haystack.includes("gpt-");
}

function hasRetryableServerErrorSignature(detail: string): boolean {
const lower = detail.toLowerCase();
if (
lower.includes("code=server_error")
|| lower.includes("\"code\":\"server_error\"")
|| lower.includes("type=server_error")
|| lower.includes("\"type\":\"server_error\"")
) {
return true;
}
return /\bstatus=(500|502|503|504)\b/i.test(detail)
|| /"(?:status|statuscode)"\s*:\s*(500|502|503|504)\b/i.test(detail);
}

function isRetryableUpstreamModelError(
detail: string,
modelInfo: Pick<RuntimeModelInfo, "provider" | "api" | "model">,
): boolean {
return isOpenAiLikeRuntimeModel(modelInfo) && hasRetryableServerErrorSignature(detail);
}

const PHONE_ONLY_TOOL_NAMES = new Set([
"tap",
"tap_element",
Expand Down Expand Up @@ -210,7 +246,7 @@ export async function runRuntimeAttempt(
);
const autoSkillRefiner = new AutoSkillRefiner(deps.config);
const reusedSessionMessages = session.reused ? loadReusedSessionMessages(session.path) : [];
let runtimeModelInfo = {
let runtimeModelInfo: RuntimeModelInfo = {
provider: "unknown",
api: "unknown",
model: profile.model,
Expand Down Expand Up @@ -389,6 +425,10 @@ export async function runRuntimeAttempt(

const recordModelResponseError = (source: string, error: unknown): string => {
const detail = formatDetailedError(error);
return recordFormattedModelResponseError(source, detail);
};

const recordFormattedModelResponseError = (source: string, detail: string): string => {
runtimeModelInfo = {
...runtimeModelInfo,
stepNo: ctx.stepCount,
Expand Down Expand Up @@ -426,7 +466,21 @@ export async function runRuntimeAttempt(
: requestedToolNames;
const tools = deps.buildPhoneAgentTools(ctx, availableToolNamesForRun);
const apiKey = auth.apiKey;
const turnFallbackTasks: Promise<void>[] = [];
const continuationTasks = new Set<Promise<void>>();
let retryableModelErrorCount = 0;

const trackContinuationTask = (task: Promise<void>): void => {
continuationTasks.add(task);
void task.finally(() => {
continuationTasks.delete(task);
});
};

const drainContinuationTasks = async (): Promise<void> => {
while (continuationTasks.size > 0) {
await Promise.allSettled([...continuationTasks]);
}
};

const maybeEscalateSecureSurfaceTakeover = async (): Promise<void> => {
if (ctx.finishMessage || ctx.failMessage) {
Expand Down Expand Up @@ -624,6 +678,8 @@ export async function runRuntimeAttempt(
const bridgeState: { lastAssistantMessage: PiAssistantMessage | null } = {
lastAssistantMessage: null,
};
const readLastBridgeAssistantMessage = (): PiAssistantMessage | null => bridgeState.lastAssistantMessage;
let bridgeRetryableModelErrorCount = 0;
let abortRequested = false;
let stopPollTimer: NodeJS.Timeout | null = null;
const unsubscribe = bridge.subscribeRaw((event) => {
Expand All @@ -647,7 +703,73 @@ export async function runRuntimeAttempt(
ctx.failMessage = "Task stopped by user.";
void bridge.abort().catch(() => {});
}, 250);
await bridge.prompt(`Task: ${request.task}`);
let promptText: string | null = `Task: ${request.task}`;
while (promptText && !ctx.finishMessage && !ctx.failMessage && !ctx.stopRequested()) {
bridgeState.lastAssistantMessage = null;
await bridge.prompt(promptText);
if (ctx.finishMessage || ctx.failMessage || ctx.stopRequested()) {
break;
}
const lastAssistantMessage = readLastBridgeAssistantMessage();
if (!lastAssistantMessage) {
break;
}
if (lastAssistantMessage.stopReason === "error") {
const errorPayload = {
message: lastAssistantMessage.errorMessage || lastAssistantMessage.stopReason,
errorMessage: lastAssistantMessage.errorMessage,
stopReason: lastAssistantMessage.stopReason,
};
const detail = formatDetailedError(errorPayload);
if (
bridgeRetryableModelErrorCount < RETRYABLE_MODEL_ERROR_MAX_RETRIES
&& isRetryableUpstreamModelError(detail, runtimeModelInfo)
) {
bridgeRetryableModelErrorCount += 1;
const retryAttempt = bridgeRetryableModelErrorCount;
const delayMs = RETRYABLE_MODEL_ERROR_BASE_DELAY_MS * (2 ** (retryAttempt - 1));
appendSessionEvent(
"model_response_retry_scheduled",
{
source: "pi_session_bridge",
provider: runtimeModelInfo.provider,
api: runtimeModelInfo.api,
model: runtimeModelInfo.model,
stepNo: ctx.stepCount,
currentApp: ctx.latestSnapshot?.currentApp ?? "unknown",
retryAttempt,
maxRetries: RETRYABLE_MODEL_ERROR_MAX_RETRIES,
delayMs,
},
`model_response_retry_scheduled attempt=${retryAttempt}/${RETRYABLE_MODEL_ERROR_MAX_RETRIES} delay_ms=${delayMs}`,
);
await sleep(delayMs);
if (ctx.finishMessage || ctx.failMessage || ctx.stopRequested()) {
break;
}
if (ctx.stepCount >= ctx.maxSteps) {
if (!completeBoundedCronRunIfNeeded(ctx)) {
ctx.failMessage = `Max steps reached (${ctx.maxSteps})`;
}
break;
}
promptText = `Step ${ctx.stepCount + 1}: continue executing the task.`;
continue;
}
ctx.failMessage = recordFormattedModelResponseError("pi_session_bridge", detail);
break;
}
if (lastAssistantMessage.stopReason === "aborted") {
ctx.failMessage = recordModelResponseError("pi_session_bridge", {
message: lastAssistantMessage.errorMessage || lastAssistantMessage.stopReason,
errorMessage: lastAssistantMessage.errorMessage,
stopReason: lastAssistantMessage.stopReason,
});
break;
}
bridgeRetryableModelErrorCount = 0;
break;
}
} finally {
if (stopPollTimer) {
clearInterval(stopPollTimer);
Expand Down Expand Up @@ -1014,36 +1136,91 @@ export async function runRuntimeAttempt(
if (assistantMessage.role !== "assistant") {
return;
}
if (assistantMessage.stopReason === "error" || assistantMessage.stopReason === "aborted") {
if (assistantMessage.stopReason === "error") {
const errorPayload = {
message: assistantMessage.errorMessage || assistantMessage.stopReason,
errorMessage: assistantMessage.errorMessage,
stopReason: assistantMessage.stopReason,
};
const detail = formatDetailedError(errorPayload);
if (
retryableModelErrorCount < RETRYABLE_MODEL_ERROR_MAX_RETRIES
&& isRetryableUpstreamModelError(detail, runtimeModelInfo)
) {
retryableModelErrorCount += 1;
const retryAttempt = retryableModelErrorCount;
const delayMs = RETRYABLE_MODEL_ERROR_BASE_DELAY_MS * (2 ** (retryAttempt - 1));
appendSessionEvent(
"model_response_retry_scheduled",
{
source: "legacy_agent_core",
provider: runtimeModelInfo.provider,
api: runtimeModelInfo.api,
model: runtimeModelInfo.model,
stepNo: ctx.stepCount,
currentApp: ctx.latestSnapshot?.currentApp ?? "unknown",
retryAttempt,
maxRetries: RETRYABLE_MODEL_ERROR_MAX_RETRIES,
delayMs,
},
`model_response_retry_scheduled attempt=${retryAttempt}/${RETRYABLE_MODEL_ERROR_MAX_RETRIES} delay_ms=${delayMs}`,
);
trackContinuationTask((async () => {
try {
await sleep(delayMs);
if (ctx.finishMessage || ctx.failMessage || ctx.stopRequested()) {
return;
}
checkContinuation();
if (!ctx.finishMessage && !ctx.failMessage && !ctx.stopRequested()) {
await agent.waitForIdle();
}
} catch (error) {
if (!ctx.finishMessage && !ctx.failMessage) {
ctx.failMessage = recordModelResponseError("legacy_agent_core_retry", error);
}
}
})());
return;
}
ctx.failMessage = recordFormattedModelResponseError("legacy_agent_core", detail);
return;
}
if (assistantMessage.stopReason === "aborted") {
ctx.failMessage = recordModelResponseError("legacy_agent_core", {
message: assistantMessage.errorMessage || assistantMessage.stopReason,
errorMessage: assistantMessage.errorMessage,
stopReason: assistantMessage.stopReason,
});
return;
}
retryableModelErrorCount = 0;
const hasToolCall = assistantMessage.content.some((item) => item.type === "toolCall");
if (!hasToolCall && !ctx.finishMessage && !ctx.failMessage) {
const fallbackTask = (async () => {
const parsed = deps.parseTextualToolFallback(assistantMessage, ctx.task);
if (!parsed) {
ctx.failMessage = "Model response did not include a tool call.";
return;
}
const fallbackTool = tools.find((item) => item.name === parsed.toolName);
if (!fallbackTool) {
ctx.failMessage = `Model textual fallback resolved unknown tool '${parsed.toolName}'.`;
return;
}
try {
const parsed = deps.parseTextualToolFallback(assistantMessage, ctx.task);
if (!parsed) {
ctx.failMessage = "Model response did not include a tool call.";
return;
}
const fallbackTool = tools.find((item) => item.name === parsed.toolName);
if (!fallbackTool) {
ctx.failMessage = `Model textual fallback resolved unknown tool '${parsed.toolName}'.`;
return;
}
await fallbackTool.execute(`text-fallback-${Date.now()}`, parsed.params);
checkContinuation();
if (!ctx.finishMessage && !ctx.failMessage && !ctx.stopRequested()) {
await agent.waitForIdle();
}
} catch (error) {
ctx.failMessage = `Textual tool fallback execution error: ${(error as Error).message}`;
return;
if (!ctx.finishMessage && !ctx.failMessage) {
ctx.failMessage = `Textual tool fallback execution error: ${(error as Error).message}`;
}
}
checkContinuation();
})();
turnFallbackTasks.push(fallbackTask);
trackContinuationTask(fallbackTask);
return;
}
checkContinuation();
Expand All @@ -1053,9 +1230,7 @@ export async function runRuntimeAttempt(
console.log(`[OpenPocket][agent-core] starting task: ${request.task}`);
await agent.prompt(`Task: ${request.task}`);
await agent.waitForIdle();
if (turnFallbackTasks.length > 0) {
await Promise.allSettled(turnFallbackTasks);
}
await drainContinuationTasks();
const agentStateError = (agent as { state?: { error?: string } }).state?.error;
if (!ctx.finishMessage && !ctx.failMessage && typeof agentStateError === "string" && agentStateError.trim()) {
ctx.failMessage = recordModelResponseError("legacy_agent_core_state", agentStateError);
Expand Down
Loading
Loading