diff --git a/external/mcp/package.json b/external/mcp/package.json
index ef3a5fe15..94538da2d 100644
--- a/external/mcp/package.json
+++ b/external/mcp/package.json
@@ -41,7 +41,7 @@
     "@microsoft/teams.apps": "2.0.5",
     "@microsoft/teams.common": "2.0.5",
     "@microsoft/teams.dev": "2.0.5",
-    "@modelcontextprotocol/sdk": "^1.9.0"
+    "@modelcontextprotocol/sdk": "^1.25.0"
   },
   "peerDependenciesMeta": {
     "@microsoft/teams.dev": {
diff --git a/external/mcp/src/connection.ts b/external/mcp/src/connection.ts
index 501763ab3..89b360af2 100644
--- a/external/mcp/src/connection.ts
+++ b/external/mcp/src/connection.ts
@@ -1,7 +1,10 @@
 import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
+import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
+
+export type ServerTransport = SSEServerTransport | StreamableHTTPServerTransport;
 
 export interface IConnection {
   readonly id: number;
-  readonly transport: SSEServerTransport;
+  readonly transport: ServerTransport;
   readonly createdAt: Date;
 }
diff --git a/external/mcp/src/plugin.ts b/external/mcp/src/plugin.ts
index 3c763d300..1ef0eddd9 100644
--- a/external/mcp/src/plugin.ts
+++ b/external/mcp/src/plugin.ts
@@ -4,6 +4,8 @@ import { ServerOptions } from '@modelcontextprotocol/sdk/server/index.js';
+import { randomUUID } from 'node:crypto';
 import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
 import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
 import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
+import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
 import { CallToolResult } from '@modelcontextprotocol/sdk/types.js';
 import { jsonSchemaToZod } from 'json-schema-to-zod';
 
@@ -41,6 +43,28 @@ export type McpSSETransportOptions = {
   readonly path?: string;
 };
 
+/**
+ * MCP transport options for streamable-http
+ */
+export type McpStreamableHTTPTransportOptions = {
+  /**
+   * the transport type
+   */
+  readonly type: 'streamable-http';
+
+  /**
+   * the url path
+   * @default /mcp
+   */
+  readonly path?: string;
+
+  /**
+   * whether to use stateful sessions
+   * @default true
+   */
+  readonly stateful?: boolean;
+};
+
 /**
  * MCP transport options for stdio
  */
@@ -61,6 +85,14 @@ export type McpStdioTransportOptions = {
   readonly stdout?: Writable;
 };
 
+/**
+ * Union type for all MCP transport options
+ */
+export type McpTransportOptions =
+  | McpSSETransportOptions
+  | McpStreamableHTTPTransportOptions
+  | McpStdioTransportOptions;
+
 export type McpPluginOptions = ServerOptions & {
   /**
    * the MCP server name
@@ -83,7 +115,7 @@ export type McpPluginOptions = ServerOptions & {
    * the transport or transport options
    * @default sse
    */
-  readonly transport?: McpSSETransportOptions | McpStdioTransportOptions;
+  readonly transport?: McpTransportOptions;
 
   /**
    * the url to use for the local
@@ -121,9 +153,10 @@ export class McpPlugin implements IPlugin {
   protected id: number = -1;
   protected inspector: string;
   protected connections: Record = {};
-  protected transport: McpSSETransportOptions | McpStdioTransportOptions = {
+  protected transport: McpTransportOptions = {
     type: 'sse',
   };
+  protected httpSessions: Map<string, StreamableHTTPServerTransport> = new Map();
 
   constructor(options: McpServer | McpPluginOptions = {}) {
     this.inspector =
@@ -197,20 +230,31 @@ export class McpPlugin implements IPlugin {
       url: this.inspector,
     });
 
-    if (this.transport.type === 'sse') {
-      return this.onInitSSE(this.httpPlugin, this.transport);
+    switch (this.transport.type) {
+      case 'sse':
+        return this.onInitSSE(this.httpPlugin, this.transport);
+      case 'streamable-http':
+        return this.onInitStreamableHTTP(this.httpPlugin, this.transport);
+      case 'stdio':
+        return this.onInitStdio(this.transport);
     }
-
-    return this.onInitStdio(this.transport);
   }
 
   onStart({ port }: IPluginStartEvent) {
-    if (this.transport.type === 'sse') {
-      this.logger.info(
-        `listening at http://localhost:${port}${this.transport.path || '/mcp'}`,
-      );
-    } else {
-      this.logger.info('listening on stdin');
+    switch (this.transport.type) {
+      case 'sse':
+        this.logger.info(
+          `listening at http://localhost:${port}${this.transport.path || '/mcp'} (SSE)`,
+        );
+        break;
+      case 'streamable-http':
+        this.logger.info(
+          `listening at http://localhost:${port}${this.transport.path || '/mcp'} (Streamable HTTP)`,
+        );
+        break;
+      case 'stdio':
+        this.logger.info('listening on stdin');
+        break;
     }
   }
 
@@ -251,6 +295,123 @@ export class McpPlugin implements IPlugin {
     });
   }
 
+  /**
+   * Serve MCP over the Streamable HTTP transport by registering POST, GET
+   * and DELETE handlers on `options.path` (default `/mcp`).
+   *
+   * In stateful mode (the default) a POST without a known session id is served
+   * by a new transport that generates a session id; later requests are routed
+   * to that transport through the `mcp-session-id` header. In stateless mode
+   * no session id is issued and every POST is served by a fresh transport.
+   *
+   * @param http the http plugin used to register the routes
+   * @param options the streamable-http transport options
+   */
+  protected onInitStreamableHTTP(
+    http: HttpPlugin,
+    options: McpStreamableHTTPTransportOptions
+  ) {
+    const path = options.path || '/mcp';
+    const stateful = options.stateful !== false; // default to true
+
+    // POST handler - main request handler for MCP messages
+    http.post(path, async (req, res) => {
+      const sessionId = req.headers['mcp-session-id'] as string | undefined;
+      const existing = sessionId ? this.httpSessions.get(sessionId) : undefined;
+
+      // Known session: hand the request to the transport that owns it.
+      // req.body is forwarded so the transport still receives the payload when
+      // a body-parsing middleware has already consumed the request stream.
+      if (existing) {
+        await existing.handleRequest(req, res, req.body);
+        return;
+      }
+
+      // Unknown (expired or terminated) session id: answer 404 so the client
+      // re-initializes, instead of connecting a transport nobody can reach.
+      if (stateful && sessionId) {
+        res.status(404).json({
+          jsonrpc: '2.0',
+          error: {
+            code: -32001,
+            message: 'Session not found',
+          },
+          id: null,
+        });
+        return;
+      }
+
+      // The SDK treats an undefined generator as stateless mode, so stateful
+      // mode has to supply a generator that returns a real id.
+      const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
+        sessionIdGenerator: stateful ? () => randomUUID() : undefined,
+        // Register the session as soon as its id exists, so follow-up requests
+        // can find it even while the first response is still being written.
+        onsessioninitialized: (sid) => {
+          this.httpSessions.set(sid, transport);
+          this.logger.debug(`Session ${sid} created`);
+        },
+      });
+
+      // For stateful sessions, forget the transport once it closes
+      if (stateful) {
+        transport.onclose = () => {
+          const sid = transport.sessionId;
+          if (sid) {
+            this.httpSessions.delete(sid);
+            this.logger.debug(`Session ${sid} closed`);
+          }
+        };
+      }
+
+      // Connect to the MCP server
+      await this.server.connect(transport);
+
+      // Handle the initial request
+      await transport.handleRequest(req, res, req.body);
+    });
+
+    // GET handler - for SSE stream (reconnection in stateful mode)
+    http.get(path, async (req, res) => {
+      const sessionId = req.headers['mcp-session-id'] as string | undefined;
+
+      if (!sessionId || !this.httpSessions.has(sessionId)) {
+        res.status(400).json({
+          jsonrpc: '2.0',
+          error: {
+            code: -32000,
+            message: 'Bad Request: No valid session ID provided',
+          },
+          id: null,
+        });
+        return;
+      }
+
+      const transport = this.httpSessions.get(sessionId)!;
+      await transport.handleRequest(req, res);
+    });
+
+    // DELETE handler - for session termination
+    http.delete(path, async (req, res) => {
+      const sessionId = req.headers['mcp-session-id'] as string | undefined;
+
+      if (!sessionId || !this.httpSessions.has(sessionId)) {
+        res.status(400).json({
+          jsonrpc: '2.0',
+          error: {
+            code: -32000,
+            message: 'Bad Request: No valid session ID provided',
+          },
+          id: null,
+        });
+        return;
+      }
+
+      const transport = this.httpSessions.get(sessionId)!;
+      await transport.handleRequest(req, res);
+    });
+  }
+
   protected onToolCall(name: string, prompt: IChatPrompt) {
     return async (args: any): Promise => {
       try {
diff --git a/packages/ai/src/local-memory.ts b/packages/ai/src/local-memory.ts
index 928e2501b..540b2b1e8 100644
--- a/packages/ai/src/local-memory.ts
+++ b/packages/ai/src/local-memory.ts
@@ -45,9 +45,10 @@ export class
LocalMemory implements IMemory {
     }
 
     while (
-      len > (this.options.max || 100) ||
-      (this.messages[0].role === 'model' && this.messages[0].function_calls?.length) ||
-      this.messages[0].role === 'function'
+      len > 0 &&
+      (len > (this.options.max || 100) ||
+        (this.messages[0]?.role === 'model' && this.messages[0]?.function_calls?.length) ||
+        this.messages[0]?.role === 'function')
     ) {
       const removed = this.pop();
 
@@ -87,8 +88,9 @@ export class LocalMemory implements IMemory {
 
     let last = this.messages[end];
 
-    while ((last.role === 'model' && last.function_calls?.length) || last.role === 'function') {
+    while (last && ((last.role === 'model' && last.function_calls?.length) || last.role === 'function')) {
       end++;
+      if (end >= this.messages.length) break;
       last = this.messages[end];
     }
 
diff --git a/packages/openai/src/index.ts b/packages/openai/src/index.ts
index 66735df64..37e7b0d05 100644
--- a/packages/openai/src/index.ts
+++ b/packages/openai/src/index.ts
@@ -1,2 +1,3 @@
 export * from './chat';
 export * from './audio';
+export * from './responses';
diff --git a/packages/openai/src/responses.ts b/packages/openai/src/responses.ts
new file mode 100644
index 000000000..327194c87
--- /dev/null
+++ b/packages/openai/src/responses.ts
@@ -0,0 +1,552 @@
+import OpenAI, { AzureOpenAI } from 'openai';
+import { Fetch } from 'openai/core';
+
+import {
+  ChatSendOptions,
+  IChatModel,
+  LocalMemory,
+  Message,
+  ModelMessage,
+  FunctionMessage,
+  Function,
+  FunctionCall,
+} from '@microsoft/teams.ai';
+import { ConsoleLogger, ILogger } from '@microsoft/teams.common/logging';
+
+// Type aliases for OpenAI Responses API types
+type ResponseInputItem = OpenAI.Responses.ResponseInputItem;
+type ResponseFunctionToolCall = OpenAI.Responses.ResponseFunctionToolCall;
+type FunctionTool = OpenAI.Responses.FunctionTool;
+type Tool = OpenAI.Responses.Tool;
+type Response = OpenAI.Responses.Response;
+
+/**
+ * Request parameters that may be overridden per model or per call; the fields
+ * this model sets itself are omitted.
+ */
+export type ResponsesCreateParams = Omit<
+  OpenAI.Responses.ResponseCreateParams,
+  'model' | 'input' | 'instructions' | 'tools' | 'previous_response_id'
+>;
+
+/**
+ * Options for {@link OpenAIResponsesModel}.
+ */
+export type OpenAIResponsesModelOptions = {
+  readonly model: (string & {}) | OpenAI.Chat.ChatModel;
+  readonly apiKey?: string;
+  readonly baseUrl?: string;
+  readonly organization?: string;
+  readonly project?: string;
+  readonly headers?: { [key: string]: string };
+  readonly fetch?: Fetch;
+  readonly timeout?: number;
+  readonly requestOptions?: ResponsesCreateParams;
+  readonly logger?: ILogger;
+  /**
+   * Enable stateful mode where OpenAI manages conversation context.
+   * When true, uses previous_response_id to maintain conversation state.
+   * When false, sends full conversation history with each request.
+   * @default true
+   */
+  readonly stateful?: boolean;
+};
+
+/**
+ * Options for {@link OpenAIResponsesModel} when targeting Azure OpenAI.
+ */
+export type AzureOpenAIResponsesModelOptions = OpenAIResponsesModelOptions & {
+  /**
+   * Defaults to process.env['OPENAI_API_VERSION'].
+   */
+  apiVersion?: string;
+
+  /**
+   * Your Azure endpoint, including the resource, e.g. `https://example-resource.azure.openai.com/`
+   */
+  endpoint?: string;
+
+  /**
+   * A function that returns an access token for Microsoft Entra (formerly known as Azure Active Directory),
+   * which will be invoked on every request.
+   */
+  azureADTokenProvider?: () => Promise<string>;
+};
+
+/**
+ * OpenAI Responses API chat model implementation.
+ *
+ * The Responses API is stateful and manages conversation context automatically,
+ * making it simpler for complex multi-turn conversations with tools.
+ * Supports both stateful (recommended) and stateless modes.
+ */
+export class OpenAIResponsesModel implements IChatModel {
+  private readonly _openai: OpenAI;
+  private readonly _log: ILogger;
+  private readonly _stateful: boolean;
+
+  /**
+   * Create the model and its underlying client: an Azure OpenAI client when
+   * the options contain an `endpoint` key, a plain OpenAI client otherwise.
+   *
+   * @param options the model options
+   */
+  constructor(readonly options: OpenAIResponsesModelOptions | AzureOpenAIResponsesModelOptions) {
+    this._log =
+      options.logger ||
+      new ConsoleLogger(`@microsoft/teams.openai/responses/${this.options.model}`);
+    this._stateful = options.stateful ?? true;
+    this._openai =
+      'endpoint' in options
+        ? new AzureOpenAI({
+            apiKey: options.apiKey,
+            apiVersion: options.apiVersion,
+            endpoint: options.endpoint?.replace(/\/$/, ''),
+            deployment: options.model,
+            azureADTokenProvider: options.azureADTokenProvider,
+            baseURL: options.baseUrl?.replace(/\/$/, ''),
+            organization: options.organization,
+            project: options.project,
+            defaultHeaders: options.headers,
+            fetch: options.fetch,
+            timeout: options.timeout,
+          })
+        : new OpenAI({
+            apiKey: options.apiKey,
+            baseURL: options.baseUrl?.replace(/\/$/, ''),
+            organization: options.organization,
+            project: options.project,
+            defaultHeaders: options.headers,
+            fetch: options.fetch,
+            timeout: options.timeout,
+          });
+  }
+
+  /**
+   * Send a message to the model and resolve with its reply.
+   *
+   * Function calls carried by a model `input` are executed first; the request
+   * is then dispatched in stateful or stateless mode as configured.
+   *
+   * @param input the message to send
+   * @param options functions, memory, system prompt and request overrides
+   * @returns the model's reply
+   */
+  async send(
+    input: Message,
+    options: ChatSendOptions = {}
+  ): Promise<ModelMessage> {
+    const memory = options.messages || new LocalMemory();
+
+    // Execute any pending function calls first
+    const functionResults = await this._executeFunctions(input, options.functions);
+
+    if (this._stateful) {
+      return this._sendStateful(input, options, memory, functionResults);
+    } else {
+      return this._sendStateless(input, options, memory, functionResults);
+    }
+  }
+
+  /**
+   * Handle stateful conversation using OpenAI Responses API state management.
+   *
+   * When a previous response id is known and there is something new to send,
+   * only the new items (function outputs and/or the incoming message) are
+   * sent; otherwise the full history from memory is sent.
+   *
+   * @param input the message that triggered this call
+   * @param options functions, system prompt and request overrides
+   * @param memory the conversation memory
+   * @param functionResults results of the function calls found in `input`
+   * @returns the model's reply
+   */
+  private async _sendStateful(
+    input: Message,
+    options: ChatSendOptions,
+    memory: {
+      push: (m: Message) => Promise<unknown>;
+      values: () => Promise<Message[]>;
+      set?: (m: Message[]) => Promise<unknown>;
+    },
+    functionResults: FunctionMessage[]
+  ): Promise<ModelMessage> {
+    // Copy so that appending below never mutates an array owned by the memory
+    const messages = [...(await memory.values())];
+
+    // A model message passed as input (function-call round trip) names the
+    // response to continue; otherwise use the newest model message with an id
+    let previousResponseId: string | undefined =
+      input.role === 'model' ? (input as ModelMessage & { id?: string }).id : undefined;
+    for (let i = messages.length - 1; !previousResponseId && i >= 0; i--) {
+      const msg = messages[i] as ModelMessage & { id?: string };
+      if (msg.role === 'model' && msg.id) {
+        previousResponseId = msg.id;
+      }
+    }
+    this._log.debug(`Found previous response ID: ${previousResponseId}`);
+
+    // Push function results to memory
+    for (const result of functionResults) {
+      await memory.push(result);
+      messages.push(result);
+    }
+
+    // Items the server has not seen yet: function outputs and a non-model input
+    const freshItems: ResponseInputItem[] = functionResults.map(
+      (result): ResponseInputItem => ({
+        type: 'function_call_output',
+        call_id: result.function_id,
+        output: result.content || '',
+      })
+    );
+    if (input.role !== 'model') {
+      freshItems.push(...this._convertToResponsesFormat(input, [input]));
+    }
+
+    // Resending items already stored under previous_response_id would duplicate
+    // them, so the full history is only sent when there is nothing to continue
+    const responsesInput =
+      previousResponseId && freshItems.length > 0
+        ? freshItems
+        : this._convertToResponsesFormat(input, messages);
+
+    // Convert functions to tools format
+    const tools = this._convertFunctionsToTools(options.functions);
+
+    this._log.debug(`Making Responses API call (stateful) with input type: ${input.role}`);
+
+    // Make OpenAI Responses API call
+    const response = await this._openai.responses.create({
+      ...this.options.requestOptions,
+      ...options.request,
+      model: 'endpoint' in this.options ? '' : this.options.model,
+      input: responsesInput,
+      instructions: options.system?.content || undefined,
+      tools: tools.length > 0 ? tools : undefined,
+      previous_response_id: previousResponseId,
+    });
+
+    this._log.debug(`Response API returned with id: ${response.id}`);
+
+    // Convert the response and tag it with its id for the next call
+    const modelMessage = this._convertFromResponsesFormat(response);
+    (modelMessage as ModelMessage & { id?: string }).id = response.id;
+
+    // Keep just the latest response when memory can be replaced wholesale;
+    // otherwise append it so that its id is not lost
+    if (memory.set) {
+      await memory.set([modelMessage]);
+    } else {
+      await memory.push(modelMessage);
+    }
+
+    // Same function-calling and streaming handling as the stateless path
+    if (modelMessage.function_calls?.length) {
+      if (options.autoFunctionCalling !== false) {
+        this._log.debug(
+          `Response has ${modelMessage.function_calls.length} function calls, executing recursively`
+        );
+        return this.send(modelMessage, {
+          ...options,
+          messages: memory as any,
+        });
+      }
+
+      this._log.debug(
+        `Automatic function calling is disabled, skipping function call execution (total calls: ${modelMessage.function_calls.length})`
+      );
+    }
+
+    // Handle streaming if callback provided
+    if (options.onChunk && modelMessage.content) {
+      await options.onChunk(modelMessage.content);
+    }
+
+    this._log.debug('Stateful Responses API conversation completed');
+    return modelMessage;
+  }
+
+  /**
+   * Handle stateless conversation using standard OpenAI API pattern.
+   *
+   * @param input the message to send
+   * @param options functions, system prompt and request overrides
+   * @param memory the conversation memory
+   * @param functionResults results of the function calls found in `input`
+   * @returns the model's reply
+   */
+  private async _sendStateless(
+    input: Message,
+    options: ChatSendOptions,
+    memory: { push: (m: Message) => Promise<unknown>; values: () => Promise<Message[]> },
+    functionResults: FunctionMessage[]
+  ): Promise<ModelMessage> {
+    // Copy the history so that appending below never mutates the memory's array
+    const messages = [...(await memory.values())];
+    this._log.debug(`Retrieved ${messages.length} messages from memory`);
+
+    // Push current input to memory
+    await memory.push(input);
+    messages.push(input);
+
+    // Push function results to memory
+    for (const result of functionResults) {
+      await memory.push(result);
+      messages.push(result);
+    }
+
+    // Convert to Responses API format
+    const responsesInput = this._convertToResponsesFormat(input, messages);
+
+    // Convert functions to tools format
+    const tools = this._convertFunctionsToTools(options.functions);
+
+    this._log.debug(`Making Responses API call (stateless) with input type: ${input.role}`);
+
+    // Make OpenAI Responses API call (stateless - no previous_response_id)
+    const response = await this._openai.responses.create({
+      ...this.options.requestOptions,
+      ...options.request,
+      model: 'endpoint' in this.options ? '' : this.options.model,
+      input: responsesInput,
+      instructions: options.system?.content || undefined,
+      tools: tools.length > 0 ? tools : undefined,
+    });
+
+    this._log.debug(`Response API returned with id: ${response.id}`);
+
+    // Convert response to ModelMessage format
+    const modelMessage = this._convertFromResponsesFormat(response);
+
+    // If response has function calls, recursively execute them
+    if (modelMessage.function_calls?.length) {
+      if (options.autoFunctionCalling !== false) {
+        this._log.debug(
+          `Response has ${modelMessage.function_calls.length} function calls, executing recursively`
+        );
+        return this.send(modelMessage, {
+          ...options,
+          messages: memory as any,
+        });
+      } else {
+        this._log.debug(
+          `Automatic function calling is disabled, skipping function call execution (total calls: ${modelMessage.function_calls.length})`
+        );
+      }
+    }
+
+    // Push response to memory
+    await memory.push(modelMessage);
+
+    // Handle streaming if callback provided
+    if (options.onChunk && modelMessage.content) {
+      await options.onChunk(modelMessage.content);
+    }
+
+    this._log.debug('Stateless Responses API conversation completed');
+    return modelMessage;
+  }
+
+  /**
+   * Execute any pending function calls in the input message.
+   *
+   * Only a model message with `function_calls` triggers any work. Unknown
+   * functions and handler failures are reported back as error results instead
+   * of being thrown.
+   *
+   * @param input the message whose function calls should be executed
+   * @param functions the available functions, keyed by name
+   * @returns one function message per call, in call order
+   */
+  private async _executeFunctions(
+    input: Message,
+    functions?: Record<string, Function>
+  ): Promise<FunctionMessage[]> {
+    const functionResults: FunctionMessage[] = [];
+
+    if (input.role === 'model' && input.function_calls?.length) {
+      for (const call of input.function_calls) {
+        const log = this._log.child(`tools/${call.name}`);
+        const fn = functions?.[call.name];
+
+        if (!fn) {
+          functionResults.push({
+            role: 'function',
+            content: `Error: function ${call.name} not found`,
+            function_id: call.id,
+          });
+          continue;
+        }
+
+        try {
+          log.debug(call.arguments);
+          const output = await fn.handler(call.arguments);
+          const content = JSON.stringify(output);
+          log.debug(content);
+
+          functionResults.push({
+            role: 'function',
+            content,
+            function_id: call.id,
+          });
+        } catch (err) {
+          log.error(err);
+
+          let content = 'Error: Unknown error';
+          if (err instanceof Error) {
+            content = `Error: ${err.name} => ${err.message}`;
+          }
+
+          functionResults.push({
+            role: 'function',
+            content,
+            function_id: call.id,
+          });
+        }
+      }
+    }
+
+    return functionResults;
+  }
+
+  /**
+   * Convert messages to Responses API input format.
+   *
+   * Function results are not emitted on their own: each one is attached, as a
+   * `function_call_output`, directly after the model function call it answers.
+   *
+   * @param input the current message; appended when `messages` lacks it
+   * @param messages the conversation history
+   * @returns the input items for the Responses API
+   */
+  private _convertToResponsesFormat(
+    input: Message,
+    messages: Message[]
+  ): ResponseInputItem[] {
+    const inputList: ResponseInputItem[] = [];
+
+    // Build a map of function results by ID for efficient lookup
+    const resultsById = new Map<string, FunctionMessage>();
+    for (const msg of messages) {
+      if (msg.role === 'function') {
+        resultsById.set(msg.function_id, msg);
+      }
+    }
+
+    // Include all messages including current input
+    const allMessages = [...messages];
+    if (!messages.includes(input)) {
+      allMessages.push(input);
+    }
+
+    for (const message of allMessages) {
+      if (message.role === 'user') {
+        const content =
+          typeof message.content === 'string'
+            ? message.content
+            : message.content
+                .map((p) => (p.type === 'text' ? p.text : ''))
+                .join('\n');
+
+        inputList.push({
+          type: 'message',
+          role: 'user',
+          content,
+        });
+      } else if (message.role === 'system') {
+        inputList.push({
+          type: 'message',
+          role: 'system',
+          content: message.content,
+        });
+      } else if (message.role === 'model') {
+        // Text the model produced is kept even when it also requested function calls
+        if (message.content) {
+          inputList.push({
+            type: 'message',
+            role: 'assistant',
+            content: message.content,
+          });
+        }
+
+        // Add function calls and their results
+        for (const call of message.function_calls ?? []) {
+          inputList.push({
+            type: 'function_call',
+            call_id: call.id,
+            name: call.name,
+            arguments: JSON.stringify(call.arguments),
+          });
+
+          // Add the matching function result if available
+          const result = resultsById.get(call.id);
+          if (result) {
+            inputList.push({
+              type: 'function_call_output',
+              call_id: result.function_id,
+              output: result.content || '',
+            });
+          } else {
+            this._log.warn(`No associated result found for call id (${call.name} - ${call.id})`);
+          }
+        }
+      }
+      // FunctionMessage is handled as part of ModelMessage function calls above
+    }
+
+    return inputList;
+  }
+
+  /**
+   * Convert functions to Responses API tools format.
+   *
+   * @param functions the available functions, keyed by name
+   * @returns one function tool per entry; empty when there are none
+   */
+  private _convertFunctionsToTools(functions?: Record<string, Function>): Tool[] {
+    return Object.values(functions ?? {}).map(
+      (fn): FunctionTool => ({
+        type: 'function',
+        name: fn.name,
+        description: fn.description,
+        parameters: fn.parameters as Record<string, unknown>,
+        // Strict mode rejects schemas that do not set additionalProperties: false
+        // and list every property as required, which arbitrary function schemas
+        // are not guaranteed to do
+        strict: false,
+      })
+    );
+  }
+
+  /**
+   * Convert Responses API response to ModelMessage format.
+   *
+   * @param response the response returned by the Responses API
+   * @returns a model message carrying the response text and any function calls
+   */
+  private _convertFromResponsesFormat(response: Response): ModelMessage {
+    // Collect every function call item found in the response output
+    const calls: FunctionCall[] = [];
+    for (const item of response.output ?? []) {
+      if (item.type !== 'function_call') {
+        continue;
+      }
+
+      const { call_id, name, arguments: rawArguments } = item as ResponseFunctionToolCall;
+      calls.push({
+        id: call_id,
+        name,
+        arguments: rawArguments ? JSON.parse(rawArguments) : {},
+      });
+    }
+
+    return {
+      role: 'model',
+      // An empty output text is reported as undefined
+      content: response.output_text || undefined,
+      // undefined, not an empty array, when the model requested no calls
+      function_calls: calls.length > 0 ? calls : undefined,
+    };
+  }
+}