From ed59cd170a32f0590d0c030fb511e1209caacc7c Mon Sep 17 00:00:00 2001 From: Matt Apperson Date: Thu, 20 Nov 2025 18:27:20 -0500 Subject: [PATCH] fix: address PR #49 review feedback - Add type guard for safer response type checking - Refactor IIFE patterns in getMessage/getText to private helper methods - Add braces to single-line if statement for readability - Add validation before assigning finalResponse - Implement parallel tool calling with Promise.allSettled for better error handling --- src/lib/response-wrapper.ts | 83 ++++++++++++++++++++++++++---------- src/lib/tool-orchestrator.ts | 46 ++++++++++++++------ 2 files changed, 94 insertions(+), 35 deletions(-) diff --git a/src/lib/response-wrapper.ts b/src/lib/response-wrapper.ts index 95ad7a58..9596d0fb 100644 --- a/src/lib/response-wrapper.ts +++ b/src/lib/response-wrapper.ts @@ -70,6 +70,20 @@ export class ResponseWrapper { this.options = options; } + /** + * Type guard to check if a value is a non-streaming response + */ + private isNonStreamingResponse(value: unknown): value is models.OpenResponsesNonStreamingResponse { + return ( + value !== null && + typeof value === "object" && + "id" in value && + "object" in value && + "output" in value && + !("toReadableStream" in value) + ); + } + /** * Initialize the stream if not already started * This is idempotent - multiple calls will return the same promise @@ -265,21 +279,60 @@ export class ResponseWrapper { const value = newResult.value; if (value && typeof value === "object" && "toReadableStream" in value) { // It's a stream, consume it - const stream = new ReusableReadableStream(value as EventStream); + const streamValue = value as EventStream; + const stream = new ReusableReadableStream(streamValue); currentResponse = await consumeStreamForCompletion(stream); + } else if (this.isNonStreamingResponse(value)) { + currentResponse = value; } else { - currentResponse = value as models.OpenResponsesNonStreamingResponse; + throw new Error("Unexpected response type from API"); } currentRound++; } + // Validate the final response has required fields + if (!currentResponse || !currentResponse.id || !currentResponse.output) { + throw new Error("Invalid final response: missing required fields"); + } + + // Ensure the response is in a completed state (has output content) + if (!Array.isArray(currentResponse.output) || currentResponse.output.length === 0) { + throw new Error("Invalid final response: empty or invalid output"); + } + this.finalResponse = currentResponse; })(); return this.toolExecutionPromise; } + /** + * Internal helper to get the message after tool execution + */ + private async getMessageInternal(): Promise { + await this.executeToolsIfNeeded(); + + if (!this.finalResponse) { + throw new Error("Response not available"); + } + + return extractMessageFromResponse(this.finalResponse); + } + + /** + * Internal helper to get the text after tool execution + */ + private async getTextInternal(): Promise { + await this.executeToolsIfNeeded(); + + if (!this.finalResponse) { + throw new Error("Response not available"); + } + + return extractTextFromResponse(this.finalResponse); + } + /** * Get the completed message from the response. * This will consume the stream until completion, execute any tools, and extract the first message. @@ -290,16 +343,7 @@ export class ResponseWrapper { return this.messagePromise; } - this.messagePromise = (async (): Promise => { - await this.executeToolsIfNeeded(); - - if (!this.finalResponse) { - throw new Error("Response not available"); - } - - return extractMessageFromResponse(this.finalResponse); - })(); - + this.messagePromise = this.getMessageInternal(); return this.messagePromise; } @@ -312,16 +356,7 @@ export class ResponseWrapper { return this.textPromise; } - this.textPromise = (async () => { - await this.executeToolsIfNeeded(); - - if (!this.finalResponse) { - throw new Error("Response not available"); - } - - return extractTextFromResponse(this.finalResponse); - })(); - + this.textPromise = this.getTextInternal(); return this.textPromise; } @@ -501,7 +536,9 @@ export class ResponseWrapper { const consumer = this.reusableStream.createConsumer(); for await (const event of consumer) { - if (!("type" in event)) continue; + if (!("type" in event)) { + continue; + } // Transform responses events to chat-like format // This is a simplified transformation - you may need to adjust based on your needs diff --git a/src/lib/tool-orchestrator.ts b/src/lib/tool-orchestrator.ts index d5524bd7..5b0d6c9a 100644 --- a/src/lib/tool-orchestrator.ts +++ b/src/lib/tool-orchestrator.ts @@ -88,26 +88,23 @@ export async function executeToolLoop( break; } - // Execute all tool calls - const roundResults: ToolExecutionResult[] = []; - - for (const toolCall of toolCalls) { + // Execute all tool calls in parallel (parallel tool calling) + const toolCallPromises = toolCalls.map(async (toolCall) => { const tool = findToolByName(tools, toolCall.name); if (!tool) { // Tool not found in definitions - roundResults.push({ + return { toolCallId: toolCall.id, toolName: toolCall.name, result: null, error: new Error(`Tool "${toolCall.name}" not found in tool definitions`), - }); - continue; + } as ToolExecutionResult; } if (!hasExecuteFunction(tool)) { - // Tool has no execute function - skip - continue; + // Tool has no execute function - return null to filter out + return null; } // Build turn context @@ -117,9 +114,34 @@ export async function executeToolLoop( }; // Execute the tool - const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult); - roundResults.push(result); - } + return executeTool(tool, toolCall, turnContext, onPreliminaryResult); + }); + + // Wait for all tool executions to complete in parallel + const settledResults = await Promise.allSettled(toolCallPromises); + + // Process settled results, handling both fulfilled and rejected promises + const roundResults: ToolExecutionResult[] = []; + settledResults.forEach((settled, i) => { + const toolCall = toolCalls[i]; + if (!toolCall) return; + + if (settled.status === "fulfilled") { + if (settled.value !== null) { + roundResults.push(settled.value); + } + } else { + // Promise rejected - create error result + roundResults.push({ + toolCallId: toolCall.id, + toolName: toolCall.name, + result: null, + error: settled.reason instanceof Error + ? settled.reason + : new Error(String(settled.reason)), + }); + } + }); toolExecutionResults.push(...roundResults);