Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions src/lib/response-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ export class ResponseWrapper {
this.options = options;
}

/**
* Type guard to check if a value is a non-streaming response
*/
private isNonStreamingResponse(value: unknown): value is models.OpenResponsesNonStreamingResponse {
return (
value !== null &&
typeof value === "object" &&
"id" in value &&
"object" in value &&
"output" in value &&
!("toReadableStream" in value)
);
}

/**
* Initialize the stream if not already started
* This is idempotent - multiple calls will return the same promise
Expand Down Expand Up @@ -284,19 +298,57 @@ export class ResponseWrapper {
value as EventStream<models.OpenResponsesStreamEvent>,
);
currentResponse = await consumeStreamForCompletion(stream);
} else if (this.isNonStreamingResponse(value)) {
currentResponse = value;
} else {
currentResponse = value as models.OpenResponsesNonStreamingResponse;
throw new Error("Unexpected response type from API");
}

currentRound++;
}

// Validate the final response has required fields
if (!currentResponse || !currentResponse.id || !currentResponse.output) {
throw new Error("Invalid final response: missing required fields");
}

// Ensure the response is in a completed state (has output content)
if (!Array.isArray(currentResponse.output) || currentResponse.output.length === 0) {
throw new Error("Invalid final response: empty or invalid output");
}

this.finalResponse = currentResponse;
})();

return this.toolExecutionPromise;
}

/**
* Internal helper to get the message after tool execution
*/
private async getMessageInternal(): Promise<models.AssistantMessage> {
await this.executeToolsIfNeeded();

if (!this.finalResponse) {
throw new Error("Response not available");
}

return extractMessageFromResponse(this.finalResponse);
}

/**
* Internal helper to get the text after tool execution
*/
private async getTextInternal(): Promise<string> {
await this.executeToolsIfNeeded();

if (!this.finalResponse) {
throw new Error("Response not available");
}

return extractTextFromResponse(this.finalResponse);
}

/**
* Get the completed message from the response.
* This will consume the stream until completion, execute any tools, and extract the first message.
Expand All @@ -307,16 +359,7 @@ export class ResponseWrapper {
return this.messagePromise;
}

this.messagePromise = (async (): Promise<models.AssistantMessage> => {
await this.executeToolsIfNeeded();

if (!this.finalResponse) {
throw new Error('Response not available');
}

return extractMessageFromResponse(this.finalResponse);
})();

this.messagePromise = this.getMessageInternal();
return this.messagePromise;
}

Expand All @@ -329,16 +372,7 @@ export class ResponseWrapper {
return this.textPromise;
}

this.textPromise = (async () => {
await this.executeToolsIfNeeded();

if (!this.finalResponse) {
throw new Error('Response not available');
}

return extractTextFromResponse(this.finalResponse);
})();

this.textPromise = this.getTextInternal();
return this.textPromise;
}

Expand Down
46 changes: 34 additions & 12 deletions src/lib/tool-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,23 @@ export async function executeToolLoop(
break;
}

// Execute all tool calls
const roundResults: ToolExecutionResult[] = [];

for (const toolCall of toolCalls) {
// Execute all tool calls in parallel (parallel tool calling)
const toolCallPromises = toolCalls.map(async (toolCall) => {
const tool = findToolByName(tools, toolCall.name);

if (!tool) {
// Tool not found in definitions
roundResults.push({
return {
toolCallId: toolCall.id,
toolName: toolCall.name,
result: null,
error: new Error(`Tool "${toolCall.name}" not found in tool definitions`),
});
continue;
} as ToolExecutionResult;
}

if (!hasExecuteFunction(tool)) {
// Tool has no execute function - skip
continue;
// Tool has no execute function - return null to filter out
return null;
}

// Build turn context
Expand All @@ -109,9 +106,34 @@ export async function executeToolLoop(
};

// Execute the tool
const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult);
roundResults.push(result);
}
return executeTool(tool, toolCall, turnContext, onPreliminaryResult);
});

// Wait for all tool executions to complete in parallel
const settledResults = await Promise.allSettled(toolCallPromises);

// Process settled results, handling both fulfilled and rejected promises
const roundResults: ToolExecutionResult[] = [];
settledResults.forEach((settled, i) => {
const toolCall = toolCalls[i];
if (!toolCall) return;

if (settled.status === "fulfilled") {
if (settled.value !== null) {
roundResults.push(settled.value);
}
} else {
// Promise rejected - create error result
roundResults.push({
toolCallId: toolCall.id,
toolName: toolCall.name,
result: null,
error: settled.reason instanceof Error
? settled.reason
: new Error(String(settled.reason)),
});
}
});

toolExecutionResults.push(...roundResults);

Expand Down