diff --git a/README.md b/README.md index ca121362..98aa768d 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,9 @@ ngrok http 8081 Make note of the `Forwarding` URL. (e.g. `https://54c5-35-170-32-42.ngrok-free.app`) +cloudflared tunnel --url http://localhost:8081 + + ### Websocket URL Your server should now be accessible at the `Forwarding` URL when run, so set the `PUBLIC_URL` in `websocket-server/.env`. See `websocket-server/.env.example` for reference. @@ -107,3 +110,5 @@ Your server should now be accessible at the `Forwarding` URL when run, so set th # Additional Notes This repo isn't polished, and the security practices leave some to be desired. Please only use this as reference, and make sure to audit your app with security and engineering before deploying! + + diff --git a/webapp/components/checklist-and-config.tsx b/webapp/components/checklist-and-config.tsx index 21e47d5c..3929dad4 100644 --- a/webapp/components/checklist-and-config.tsx +++ b/webapp/components/checklist-and-config.tsx @@ -61,6 +61,7 @@ export default function ChecklistAndConfig({ // 2. Fetch numbers res = await fetch("/api/twilio/numbers"); + console.log("Fetched phone numbers:", res); if (!res.ok) throw new Error("Failed to fetch phone numbers"); const numbersData = await res.json(); if (Array.isArray(numbersData) && numbersData.length > 0) { diff --git a/websocket-server/src/configs/envs.ts b/websocket-server/src/configs/envs.ts new file mode 100644 index 00000000..431c9cf8 --- /dev/null +++ b/websocket-server/src/configs/envs.ts @@ -0,0 +1,61 @@ +import dotenv from "dotenv"; + +const NODE_ENV = process.env.NODE_ENV || "development"; +dotenv.config({ path: `.env.${NODE_ENV}` }); +dotenv.config(); + +const requiredEnvs: { + OPENAI_API_KEY: string; +} = { + OPENAI_API_KEY: process.env.OPENAI_API_KEY || "", +}; + + +for (const [key, value] of Object.entries(requiredEnvs)) { + if (!value) { + console.error(`❌ Missing required environment variable: ${key}`); + process.exit(1); + } +} + +export const envs: { + OPENAI_API_KEY: string; + OPENAI_MODEL: string; + PORT: string | number; + HOST: string; + PUBLIC_URL: string; + IS_PRODUCTION: boolean; + IS_DEVELOPMENT: boolean; + WS_HEARTBEAT_INTERVAL: string | number; + MAX_RECONNECT_ATTEMPTS: string | number; + TWILIO_ACCOUNT_SID: string; + TWILIO_AUTH_TOKEN: string; + CALENDLY_ACCESS_TOKEN: string; +} = { + + OPENAI_API_KEY: requiredEnvs.OPENAI_API_KEY, + OPENAI_MODEL: process.env.OPENAI_MODEL || "gpt-4o-realtime-preview-2024-12-17", + + + PORT: process.env.PORT || 8081, + HOST: process.env.HOST || "localhost", + PUBLIC_URL: process.env.PUBLIC_URL || `http://${process.env.HOST || "localhost"}:${process.env.PORT || 8081}`, + + + IS_PRODUCTION: NODE_ENV === "production", + IS_DEVELOPMENT: NODE_ENV === "development", + + + WS_HEARTBEAT_INTERVAL: process.env.WS_HEARTBEAT_INTERVAL || 30000, + MAX_RECONNECT_ATTEMPTS: process.env.MAX_RECONNECT_ATTEMPTS || 3, + + + + TWILIO_ACCOUNT_SID: process.env.TWILIO_ACCOUNT_SID || "", + TWILIO_AUTH_TOKEN: process.env.TWILIO_AUTH_TOKEN || "", + + + CALENDLY_ACCESS_TOKEN: process.env.CALENDLY_ACCESS_TOKEN || "", + +}; + diff --git a/websocket-server/src/configs/index.ts b/websocket-server/src/configs/index.ts new file mode 100644 index 00000000..3a9a31ff --- /dev/null +++ b/websocket-server/src/configs/index.ts @@ -0,0 +1 @@ +export * from "./envs"; \ No newline at end of file diff --git a/websocket-server/src/interfaces/index.ts b/websocket-server/src/interfaces/index.ts new file mode 100644 index 00000000..0a6fdb97 --- /dev/null +++ b/websocket-server/src/interfaces/index.ts @@ -0,0 +1,68 @@ +import { WebSocket } from "ws"; + +export interface Session { + twilioConn?: WebSocket; + frontendConn?: WebSocket; + modelConn?: WebSocket; + config?: any; + streamSid?: string; +} + +export interface FunctionCallItem { + name: string; + arguments: string; + call_id?: string; +} + +export interface FunctionSchema { + name: string; + type: "function"; + description?: string; + parameters: { + type: string; + properties: Record; + required: string[]; + }; +} + +export interface FunctionHandler { + schema: FunctionSchema; + handler: (args: any) => Promise; +} + + + +export enum ConnectionType { + TWILIO = 'twilio', + FRONTEND = 'frontend', + MODEL = 'model' +} + +export enum SessionEvent { + CONNECTION_ESTABLISHED = 'connection_established', + CONNECTION_CLOSED = 'connection_closed', + FUNCTION_CALL_STARTED = 'function_call_started', + FUNCTION_CALL_COMPLETED = 'function_call_completed', + FUNCTION_CALL_ERROR = 'function_call_error', + SESSION_RESET = 'session_reset', + ERROR = 'error' +} + +export interface SessionData { + twilioConn?: WebSocket; + frontendConn?: WebSocket; + modelConn?: WebSocket; + streamSid?: string; + saved_config?: any; + lastAssistantItem?: string; + responseStartTimestamp?: number; + latestMediaTimestamp?: number; + openAIApiKey?: string; +} + +export interface ConnectionStats { + totalConnections: number; + activeConnections: number; + connectionDuration: number; + lastActivity: Date; +} \ No newline at end of file diff --git a/websocket-server/src/server.ts b/websocket-server/src/server.ts index 75286971..abff853d 100644 --- a/websocket-server/src/server.ts +++ b/websocket-server/src/server.ts @@ -1,27 +1,17 @@ import express from "express"; import { WebSocketServer, WebSocket } from "ws"; import { IncomingMessage } from "http"; -import dotenv from "dotenv"; import http from "http"; import { readFileSync } from "fs"; import { join } from "path"; import cors from "cors"; -import { - handleCallConnection, - handleFrontendConnection, -} from "./sessionManager"; -import functions from "./functionHandlers"; -dotenv.config(); -const PORT = parseInt(process.env.PORT || "8081", 10); -const PUBLIC_URL = process.env.PUBLIC_URL || ""; -const OPENAI_API_KEY = process.env.OPENAI_API_KEY || ""; +import { WebSocketSessionManager } from "./services"; +import { envs } from "./configs"; +import functionHandlers from "./services/function-handlers"; -if (!OPENAI_API_KEY) { - console.error("OPENAI_API_KEY environment variable is required"); - process.exit(1); -} +const sessionManager = new WebSocketSessionManager(); const app = express(); app.use(cors()); @@ -34,21 +24,20 @@ const twimlPath = join(__dirname, "twiml.xml"); const twimlTemplate = readFileSync(twimlPath, "utf-8"); app.get("/public-url", (req, res) => { - res.json({ publicUrl: PUBLIC_URL }); + res.json({ publicUrl: envs.PUBLIC_URL }); }); + app.all("/twiml", (req, res) => { - const wsUrl = new URL(PUBLIC_URL); + const wsUrl = new URL(envs.PUBLIC_URL); wsUrl.protocol = "wss:"; wsUrl.pathname = `/call`; - const twimlContent = twimlTemplate.replace("{{WS_URL}}", wsUrl.toString()); res.type("text/xml").send(twimlContent); }); -// New endpoint to list available tools (schemas) app.get("/tools", (req, res) => { - res.json(functions.map((f) => f.schema)); + res.json(functionHandlers.map((f) => f.schema)); }); let currentCall: WebSocket | null = null; @@ -68,16 +57,17 @@ wss.on("connection", (ws: WebSocket, req: IncomingMessage) => { if (type === "call") { if (currentCall) currentCall.close(); currentCall = ws; - handleCallConnection(currentCall, OPENAI_API_KEY); + sessionManager.handleCallConnection(currentCall, envs.OPENAI_API_KEY); } else if (type === "logs") { if (currentLogs) currentLogs.close(); currentLogs = ws; - handleFrontendConnection(currentLogs); + sessionManager.handleFrontendConnection(currentLogs); } else { ws.close(); } }); -server.listen(PORT, () => { - console.log(`Server running on http://localhost:${PORT}`); -}); + +server.listen(envs.PORT, () => { + console.log(`Server running on http://localhost:${envs.PORT}`); +}); \ No newline at end of file diff --git a/websocket-server/src/services/function-handlers/calendly.ts b/websocket-server/src/services/function-handlers/calendly.ts new file mode 100644 index 00000000..b926093e --- /dev/null +++ b/websocket-server/src/services/function-handlers/calendly.ts @@ -0,0 +1,92 @@ +import { envs } from "../../configs"; +import { FunctionHandler } from "../../interfaces"; + +export const scheduleCalendlyMeeting: FunctionHandler = { + schema: { + name: "schedule_calendly_meeting", + type: "function", + description: "Schedule a meeting using Calendly", + parameters: { + type: "object", + properties: { + eventType: { + type: "string", + description: "Calendly event type URI (e.g., 'https://api.calendly.com/event_types/AAAAAAAAAAAAAAAA')" + }, + startTime: { + type: "string", + description: "Start time in ISO format (e.g., '2024-01-15T10:00:00Z')" + }, + inviteeEmail: { + type: "string", + description: "Email of the person being invited" + }, + inviteeName: { + type: "string", + description: "Name of the person being invited" + }, + timezone: { + type: "string", + description: "Timezone (e.g., 'America/Sao_Paulo')" + }, + }, + required: ["eventType", "startTime", "inviteeEmail", "inviteeName"] + } + }, + handler: async (args: { + eventType: string; + startTime: string; + inviteeEmail: string; + inviteeName: string; + timezone?: string; + questions?: Array<{ question: string; answer: string }>; + }) => { + try { + const response = await fetch('https://api.calendly.com/scheduled_events', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${envs.CALENDLY_ACCESS_TOKEN}` + }, + body: JSON.stringify({ + event_type: args.eventType, + start_time: args.startTime, + invitee: { + email: args.inviteeEmail, + name: args.inviteeName, + timezone: args.timezone || 'UTC' + }, + questions_and_responses: args.questions?.map(q => ({ + question: q.question, + response: q.answer + })) || [] + }) + }); + + if (!response.ok) { + const errorData = await response.json(); + throw new Error(`Calendly API Error: ${errorData.message || response.statusText}`); + } + const data = await response.json(); + return JSON.stringify({ + success: true, + eventId: data.resource.uri, + meetingLink: data.resource.location?.join_url || data.resource.location?.location, + startTime: data.resource.start_time, + endTime: data.resource.end_time, + status: data.resource.status, + invitee: { + name: args.inviteeName, + email: args.inviteeEmail + } + }); + } catch (error) { + console.error('Error creating Calendly meeting:', error); + return JSON.stringify({ + success: false, + error: error instanceof Error ? error.message : 'Unknown error occurred' + }); + } + } +}; + diff --git a/websocket-server/src/services/function-handlers/index.ts b/websocket-server/src/services/function-handlers/index.ts new file mode 100644 index 00000000..3cb7d9c0 --- /dev/null +++ b/websocket-server/src/services/function-handlers/index.ts @@ -0,0 +1,11 @@ +import { FunctionHandler } from "../../interfaces"; +import { getWeatherFromCoords } from "./weather"; +import { scheduleCalendlyMeeting } from "./calendly"; + +const functionHandlers: FunctionHandler[] = []; + +functionHandlers.push(getWeatherFromCoords); +functionHandlers.push(scheduleCalendlyMeeting); + + +export default functionHandlers; \ No newline at end of file diff --git a/websocket-server/src/functionHandlers.ts b/websocket-server/src/services/function-handlers/weather.ts similarity index 86% rename from websocket-server/src/functionHandlers.ts rename to websocket-server/src/services/function-handlers/weather.ts index 512a7af8..7e874a78 100644 --- a/websocket-server/src/functionHandlers.ts +++ b/websocket-server/src/services/function-handlers/weather.ts @@ -1,8 +1,7 @@ -import { FunctionHandler } from "./types"; +import { FunctionHandler } from "../../interfaces"; -const functions: FunctionHandler[] = []; -functions.push({ +export const getWeatherFromCoords: FunctionHandler = { schema: { name: "get_weather_from_coords", type: "function", @@ -28,6 +27,6 @@ functions.push({ const currentTemp = data.current?.temperature_2m; return JSON.stringify({ temp: currentTemp }); }, -}); +}; + -export default functions; diff --git a/websocket-server/src/services/index.ts b/websocket-server/src/services/index.ts new file mode 100644 index 00000000..c8a21db7 --- /dev/null +++ b/websocket-server/src/services/index.ts @@ -0,0 +1 @@ +export * from "./session-manager"; \ No newline at end of file diff --git a/websocket-server/src/services/session-manager.ts b/websocket-server/src/services/session-manager.ts new file mode 100644 index 00000000..c8346228 --- /dev/null +++ b/websocket-server/src/services/session-manager.ts @@ -0,0 +1,502 @@ +import { RawData, WebSocket } from "ws"; +import { EventEmitter } from "events"; +import { ConnectionStats, ConnectionType, SessionData, SessionEvent } from "../interfaces"; +import functionHandlers from "./function-handlers"; + + +export class WebSocketSessionManager extends EventEmitter { + private session: SessionData = {}; + private connectionStartTime?: Date; + private readonly maxReconnectAttempts = 3; + private reconnectAttempts = 0; + private reconnectTimer?: NodeJS.Timeout; + + constructor() { + super(); + this.setupErrorHandling(); + } + + private setupErrorHandling(): void { + this.on('error', (error) => { + console.error('SessionManager Error:', error); + }); + } + + public async handleCallConnection(ws: WebSocket, openAIApiKey: string): Promise { + try { + await this.cleanupConnection(this.session.twilioConn); + this.session.twilioConn = ws; + this.session.openAIApiKey = openAIApiKey; + this.connectionStartTime = new Date(); + + this.setupWebSocketHandlers(ws, ConnectionType.TWILIO); + this.emit(SessionEvent.CONNECTION_ESTABLISHED, ConnectionType.TWILIO); + } catch (error) { + this.emit(SessionEvent.ERROR, { type: 'call_connection', error }); + throw error; + } + } + + public async handleFrontendConnection(ws: WebSocket): Promise { + try { + await this.cleanupConnection(this.session.frontendConn); + this.session.frontendConn = ws; + + this.setupWebSocketHandlers(ws, ConnectionType.FRONTEND); + this.emit(SessionEvent.CONNECTION_ESTABLISHED, ConnectionType.FRONTEND); + } catch (error) { + this.emit(SessionEvent.ERROR, { type: 'frontend_connection', error }); + throw error; + } + } + + private setupWebSocketHandlers(ws: WebSocket, type: ConnectionType): void { + ws.on("message", (data: RawData) => { + try { + if (type === ConnectionType.TWILIO) { + this.handleTwilioMessage(data); + } else if (type === ConnectionType.FRONTEND) { + this.handleFrontendMessage(data); + } + } catch (error) { + this.emit(SessionEvent.ERROR, { type: 'message_handling', error, connectionType: type }); + } + }); + + ws.on("error", (error) => { + this.emit(SessionEvent.ERROR, { type: 'websocket_error', error, connectionType: type }); + this.handleConnectionError(type, error); + }); + + ws.on("close", (code, reason) => { + this.handleConnectionClose(type, code, reason?.toString()); + }); + } + + private handleConnectionError(type: ConnectionType, error: Error): void { + console.error(`${type} connection error:`, error); + + if (type === ConnectionType.MODEL && this.reconnectAttempts < this.maxReconnectAttempts) { + this.scheduleReconnect(); + } + } + + private handleConnectionClose(type: ConnectionType, code?: number, reason?: string): void { + console.log(`${type} connection closed. Code: ${code}, Reason: ${reason}`); + + switch (type) { + case ConnectionType.TWILIO: + this.cleanupConnection(this.session.modelConn); + this.cleanupConnection(this.session.twilioConn); + this.session.twilioConn = undefined; + this.session.modelConn = undefined; + this.resetSessionData(); + break; + + case ConnectionType.FRONTEND: + this.cleanupConnection(this.session.frontendConn); + this.session.frontendConn = undefined; + break; + + case ConnectionType.MODEL: + this.closeModel(); + break; + } + + this.emit(SessionEvent.CONNECTION_CLOSED, { type, code, reason }); + + if (!this.hasActiveConnections()) { + this.resetSession(); + } + } + + private scheduleReconnect(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + } + + const delay = Math.pow(2, this.reconnectAttempts) * 1000; // Exponential backoff + + this.reconnectTimer = setTimeout(() => { + this.reconnectAttempts++; + this.tryConnectModel(); + }, delay); + } + + private async handleFunctionCall(item: { name: string; arguments: string; call_id?: string }): Promise { + this.emit(SessionEvent.FUNCTION_CALL_STARTED, { name: item.name, call_id: item.call_id }); + try { + console.log("Handling function call:", item); + const fnDef = functionHandlers.find((f) => f.schema.name === item.name); + if (!fnDef) { + throw new Error(`No handler found for function: ${item.name}`); + } + let args: unknown; + try { + args = JSON.parse(item.arguments); + } catch { + const errorResult = { error: "Invalid JSON arguments for function call." }; + this.emit(SessionEvent.FUNCTION_CALL_ERROR, { name: item.name, error: errorResult }); + return JSON.stringify(errorResult); + } + + console.log("Calling function:", fnDef.schema.name, args); + const result = await fnDef.handler(args as any); + + this.emit(SessionEvent.FUNCTION_CALL_COMPLETED, { + name: item.name, + call_id: item.call_id, + result + }); + + return result; + } catch (err: any) { + console.error("Error running function:", err); + const errorResult = { + error: `Error running function ${item.name}: ${err.message}`, + }; + + this.emit(SessionEvent.FUNCTION_CALL_ERROR, { + name: item.name, + call_id: item.call_id, + error: errorResult + }); + + return JSON.stringify(errorResult); + } + } + + private handleTwilioMessage(data: RawData): void { + const msg = this.parseMessage(data); + if (!msg) return; + + switch (msg.event) { + case "start": + this.session.streamSid = msg.start.streamSid; + this.session.latestMediaTimestamp = 0; + this.session.lastAssistantItem = undefined; + this.session.responseStartTimestamp = undefined; + this.tryConnectModel(); + break; + + case "media": + this.session.latestMediaTimestamp = msg.media.timestamp; + if (this.isOpen(this.session.modelConn)) { + this.jsonSend(this.session.modelConn, { + type: "input_audio_buffer.append", + audio: msg.media.payload, + }); + } + break; + + case "close": + this.closeAllConnections(); + break; + } + } + + private handleFrontendMessage(data: RawData): void { + const msg = this.parseMessage(data); + if (!msg) return; + + if (this.isOpen(this.session.modelConn)) { + this.jsonSend(this.session.modelConn, msg); + } + + if (msg.type === "session.update") { + this.session.saved_config = msg.session; + } + } + + private async tryConnectModel(): Promise { + if (!this.session.twilioConn || !this.session.streamSid || !this.session.openAIApiKey) { + return; + } + + if (this.isOpen(this.session.modelConn)) { + return; + } + + try { + this.session.modelConn = new WebSocket( + "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-12-17", + { + headers: { + Authorization: `Bearer ${this.session.openAIApiKey}`, + "OpenAI-Beta": "realtime=v1", + }, + } + ); + + this.setupModelConnection(); + this.reconnectAttempts = 0; + } catch (error) { + console.error('Failed to connect to model:', error); + this.emit(SessionEvent.ERROR, { type: 'model_connection', error }); + throw error; + } + } + + private setupModelConnection(): void { + if (!this.session.modelConn) return; + + this.session.modelConn.on("open", () => { + const config = this.session.saved_config || {}; + this.jsonSend(this.session.modelConn, { + type: "session.update", + session: { + modalities: ["text", "audio"], + turn_detection: { type: "server_vad" }, + voice: "ash", + input_audio_transcription: { model: "whisper-1" }, + input_audio_format: "g711_ulaw", + output_audio_format: "g711_ulaw", + ...config, + }, + }); + this.emit(SessionEvent.CONNECTION_ESTABLISHED, ConnectionType.MODEL); + }); + + this.session.modelConn.on("message", (data: RawData) => this.handleModelMessage(data)); + this.session.modelConn.on("error", (error) => this.handleConnectionError(ConnectionType.MODEL, error)); + this.session.modelConn.on("close", (code, reason) => + this.handleConnectionClose(ConnectionType.MODEL, code, reason?.toString()) + ); + } + + private handleModelMessage(data: RawData): void { + const event = this.parseMessage(data); + if (!event) return; + + this.jsonSend(this.session.frontendConn, event); + + switch (event.type) { + case "input_audio_buffer.speech_started": + this.handleTruncation(); + break; + + case "response.audio.delta": + this.handleAudioDelta(event); + break; + + case "response.output_item.done": + this.handleOutputItemDone(event); + break; + } + } + + private handleAudioDelta(event: any): void { + if (this.session.twilioConn && this.session.streamSid) { + if (this.session.responseStartTimestamp === undefined) { + this.session.responseStartTimestamp = this.session.latestMediaTimestamp || 0; + } + + if (event.item_id) { + this.session.lastAssistantItem = event.item_id; + } + + this.jsonSend(this.session.twilioConn, { + event: "media", + streamSid: this.session.streamSid, + media: { payload: event.delta }, + }); + + this.jsonSend(this.session.twilioConn, { + event: "mark", + streamSid: this.session.streamSid, + }); + } + } + + private async handleOutputItemDone(event: any): Promise { + const { item } = event; + if (item.type === "function_call") { + try { + const output = await this.handleFunctionCall(item); + if (this.session.modelConn) { + this.jsonSend(this.session.modelConn, { + type: "conversation.item.create", + item: { + type: "function_call_output", + call_id: item.call_id, + output: JSON.stringify(output), + }, + }); + this.jsonSend(this.session.modelConn, { type: "response.create" }); + } + } catch (err) { + console.error("Error handling function call:", err); + this.emit(SessionEvent.ERROR, { type: 'function_call', error: err }); + } + } + } + + private handleTruncation(): void { + if (!this.session.lastAssistantItem || this.session.responseStartTimestamp === undefined) { + return; + } + + const elapsedMs = (this.session.latestMediaTimestamp || 0) - (this.session.responseStartTimestamp || 0); + const audio_end_ms = Math.max(0, elapsedMs); + + if (this.isOpen(this.session.modelConn)) { + this.jsonSend(this.session.modelConn, { + type: "conversation.item.truncate", + item_id: this.session.lastAssistantItem, + content_index: 0, + audio_end_ms, + }); + } + + if (this.session.twilioConn && this.session.streamSid) { + this.jsonSend(this.session.twilioConn, { + event: "clear", + streamSid: this.session.streamSid, + }); + } + + this.resetSessionData(); + } + + private resetSessionData(): void { + this.session.lastAssistantItem = undefined; + this.session.responseStartTimestamp = undefined; + } + + private closeModel(): void { + this.cleanupConnection(this.session.modelConn); + this.session.modelConn = undefined; + + if (!this.hasActiveConnections()) { + this.resetSession(); + } + } + + public async closeAllConnections(): Promise { + const closingPromises: Promise[] = []; + + if (this.session.twilioConn) { + closingPromises.push(this.cleanupConnection(this.session.twilioConn)); + this.session.twilioConn = undefined; + } + + if (this.session.modelConn) { + closingPromises.push(this.cleanupConnection(this.session.modelConn)); + this.session.modelConn = undefined; + } + + if (this.session.frontendConn) { + closingPromises.push(this.cleanupConnection(this.session.frontendConn)); + this.session.frontendConn = undefined; + } + + await Promise.all(closingPromises); + this.resetSession(); + } + + private resetSession(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = undefined; + } + + this.session = {}; + this.connectionStartTime = undefined; + this.reconnectAttempts = 0; + this.emit(SessionEvent.SESSION_RESET); + } + + private async cleanupConnection(ws?: WebSocket): Promise { + if (!this.isOpen(ws)) return; + + return new Promise((resolve) => { + ws.once('close', () => resolve()); + ws.close(); + + // Force close after timeout + setTimeout(() => { + if (ws.readyState !== WebSocket.CLOSED) { + ws.terminate(); + } + resolve(); + }, 5000); + }); + } + + private parseMessage(data: RawData): any { + try { + const message = JSON.parse(data.toString()); + return message; + } catch (error) { + console.warn('Failed to parse message:', error); + return null; + } + } + + private jsonSend(ws: WebSocket | undefined, obj: unknown): boolean { + if (!this.isOpen(ws)) return false; + try { + ws.send(JSON.stringify(obj)); + return true; + } catch (error) { + console.error('Failed to send JSON:', error); + this.emit(SessionEvent.ERROR, { type: 'json_send', error }); + return false; + } + } + + private isOpen(ws?: WebSocket): ws is WebSocket { + return !!ws && ws.readyState === WebSocket.OPEN; + } + + private hasActiveConnections(): boolean { + return !!(this.session.twilioConn || this.session.frontendConn || this.session.modelConn); + } + + public getSessionInfo(): Readonly { + return { ...this.session }; + } + + public isConnected(): boolean { + return this.hasActiveConnections(); + } + + public getConnectionStats(): ConnectionStats { + const activeConnections = [ + this.session.twilioConn, + this.session.frontendConn, + this.session.modelConn + ].filter(conn => this.isOpen(conn)).length; + + return { + totalConnections: activeConnections, + activeConnections, + connectionDuration: this.connectionStartTime + ? Date.now() - this.connectionStartTime.getTime() + : 0, + lastActivity: new Date() + }; + } + + public async healthCheck(): Promise<{ status: 'healthy' | 'unhealthy'; details: any }> { + const stats = this.getConnectionStats(); + const isHealthy = this.hasActiveConnections(); + return { + status: isHealthy ? 'healthy' : 'unhealthy', + details: { + connections: stats, + session: { + streamSid: !!this.session.streamSid, + hasApiKey: !!this.session.openAIApiKey, + hasConfig: !!this.session.saved_config + }, + reconnectAttempts: this.reconnectAttempts + } + }; + } + + public async destroy(): Promise { + this.removeAllListeners(); + await this.closeAllConnections(); + } +} \ No newline at end of file diff --git a/websocket-server/src/sessionManager.ts b/websocket-server/src/sessionManager.ts deleted file mode 100644 index 16a22766..00000000 --- a/websocket-server/src/sessionManager.ts +++ /dev/null @@ -1,287 +0,0 @@ -import { RawData, WebSocket } from "ws"; -import functions from "./functionHandlers"; - -interface Session { - twilioConn?: WebSocket; - frontendConn?: WebSocket; - modelConn?: WebSocket; - streamSid?: string; - saved_config?: any; - lastAssistantItem?: string; - responseStartTimestamp?: number; - latestMediaTimestamp?: number; - openAIApiKey?: string; -} - -let session: Session = {}; - -export function handleCallConnection(ws: WebSocket, openAIApiKey: string) { - cleanupConnection(session.twilioConn); - session.twilioConn = ws; - session.openAIApiKey = openAIApiKey; - - ws.on("message", handleTwilioMessage); - ws.on("error", ws.close); - ws.on("close", () => { - cleanupConnection(session.modelConn); - cleanupConnection(session.twilioConn); - session.twilioConn = undefined; - session.modelConn = undefined; - session.streamSid = undefined; - session.lastAssistantItem = undefined; - session.responseStartTimestamp = undefined; - session.latestMediaTimestamp = undefined; - if (!session.frontendConn) session = {}; - }); -} - -export function handleFrontendConnection(ws: WebSocket) { - cleanupConnection(session.frontendConn); - session.frontendConn = ws; - - ws.on("message", handleFrontendMessage); - ws.on("close", () => { - cleanupConnection(session.frontendConn); - session.frontendConn = undefined; - if (!session.twilioConn && !session.modelConn) session = {}; - }); -} - -async function handleFunctionCall(item: { name: string; arguments: string }) { - console.log("Handling function call:", item); - const fnDef = functions.find((f) => f.schema.name === item.name); - if (!fnDef) { - throw new Error(`No handler found for function: ${item.name}`); - } - - let args: unknown; - try { - args = JSON.parse(item.arguments); - } catch { - return JSON.stringify({ - error: "Invalid JSON arguments for function call.", - }); - } - - try { - console.log("Calling function:", fnDef.schema.name, args); - const result = await fnDef.handler(args as any); - return result; - } catch (err: any) { - console.error("Error running function:", err); - return JSON.stringify({ - error: `Error running function ${item.name}: ${err.message}`, - }); - } -} - -function handleTwilioMessage(data: RawData) { - const msg = parseMessage(data); - if (!msg) return; - - switch (msg.event) { - case "start": - session.streamSid = msg.start.streamSid; - session.latestMediaTimestamp = 0; - session.lastAssistantItem = undefined; - session.responseStartTimestamp = undefined; - tryConnectModel(); - break; - case "media": - session.latestMediaTimestamp = msg.media.timestamp; - if (isOpen(session.modelConn)) { - jsonSend(session.modelConn, { - type: "input_audio_buffer.append", - audio: msg.media.payload, - }); - } - break; - case "close": - closeAllConnections(); - break; - } -} - -function handleFrontendMessage(data: RawData) { - const msg = parseMessage(data); - if (!msg) return; - - if (isOpen(session.modelConn)) { - jsonSend(session.modelConn, msg); - } - - if (msg.type === "session.update") { - session.saved_config = msg.session; - } -} - -function tryConnectModel() { - if (!session.twilioConn || !session.streamSid || !session.openAIApiKey) - return; - if (isOpen(session.modelConn)) return; - - session.modelConn = new WebSocket( - "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-12-17", - { - headers: { - Authorization: `Bearer ${session.openAIApiKey}`, - "OpenAI-Beta": "realtime=v1", - }, - } - ); - - session.modelConn.on("open", () => { - const config = session.saved_config || {}; - jsonSend(session.modelConn, { - type: "session.update", - session: { - modalities: ["text", "audio"], - turn_detection: { type: "server_vad" }, - voice: "ash", - input_audio_transcription: { model: "whisper-1" }, - input_audio_format: "g711_ulaw", - output_audio_format: "g711_ulaw", - ...config, - }, - }); - }); - - session.modelConn.on("message", handleModelMessage); - session.modelConn.on("error", closeModel); - session.modelConn.on("close", closeModel); -} - -function handleModelMessage(data: RawData) { - const event = parseMessage(data); - if (!event) return; - - jsonSend(session.frontendConn, event); - - switch (event.type) { - case "input_audio_buffer.speech_started": - handleTruncation(); - break; - - case "response.audio.delta": - if (session.twilioConn && session.streamSid) { - if (session.responseStartTimestamp === undefined) { - session.responseStartTimestamp = session.latestMediaTimestamp || 0; - } - if (event.item_id) session.lastAssistantItem = event.item_id; - - jsonSend(session.twilioConn, { - event: "media", - streamSid: session.streamSid, - media: { payload: event.delta }, - }); - - jsonSend(session.twilioConn, { - event: "mark", - streamSid: session.streamSid, - }); - } - break; - - case "response.output_item.done": { - const { item } = event; - if (item.type === "function_call") { - handleFunctionCall(item) - .then((output) => { - if (session.modelConn) { - jsonSend(session.modelConn, { - type: "conversation.item.create", - item: { - type: "function_call_output", - call_id: item.call_id, - output: JSON.stringify(output), - }, - }); - jsonSend(session.modelConn, { type: "response.create" }); - } - }) - .catch((err) => { - console.error("Error handling function call:", err); - }); - } - break; - } - } -} - -function handleTruncation() { - if ( - !session.lastAssistantItem || - session.responseStartTimestamp === undefined - ) - return; - - const elapsedMs = - (session.latestMediaTimestamp || 0) - (session.responseStartTimestamp || 0); - const audio_end_ms = elapsedMs > 0 ? elapsedMs : 0; - - if (isOpen(session.modelConn)) { - jsonSend(session.modelConn, { - type: "conversation.item.truncate", - item_id: session.lastAssistantItem, - content_index: 0, - audio_end_ms, - }); - } - - if (session.twilioConn && session.streamSid) { - jsonSend(session.twilioConn, { - event: "clear", - streamSid: session.streamSid, - }); - } - - session.lastAssistantItem = undefined; - session.responseStartTimestamp = undefined; -} - -function closeModel() { - cleanupConnection(session.modelConn); - session.modelConn = undefined; - if (!session.twilioConn && !session.frontendConn) session = {}; -} - -function closeAllConnections() { - if (session.twilioConn) { - session.twilioConn.close(); - session.twilioConn = undefined; - } - if (session.modelConn) { - session.modelConn.close(); - session.modelConn = undefined; - } - if (session.frontendConn) { - session.frontendConn.close(); - session.frontendConn = undefined; - } - session.streamSid = undefined; - session.lastAssistantItem = undefined; - session.responseStartTimestamp = undefined; - session.latestMediaTimestamp = undefined; - session.saved_config = undefined; -} - -function cleanupConnection(ws?: WebSocket) { - if (isOpen(ws)) ws.close(); -} - -function parseMessage(data: RawData): any { - try { - return JSON.parse(data.toString()); - } catch { - return null; - } -} - -function jsonSend(ws: WebSocket | undefined, obj: unknown) { - if (!isOpen(ws)) return; - ws.send(JSON.stringify(obj)); -} - -function isOpen(ws?: WebSocket): ws is WebSocket { - return !!ws && ws.readyState === WebSocket.OPEN; -} diff --git a/websocket-server/src/types.ts b/websocket-server/src/types.ts deleted file mode 100644 index 6c544c91..00000000 --- a/websocket-server/src/types.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { WebSocket } from "ws"; - -export interface Session { - twilioConn?: WebSocket; - frontendConn?: WebSocket; - modelConn?: WebSocket; - config?: any; - streamSid?: string; -} - -export interface FunctionCallItem { - name: string; - arguments: string; - call_id?: string; -} - -export interface FunctionSchema { - name: string; - type: "function"; - description?: string; - parameters: { - type: string; - properties: Record; - required: string[]; - }; -} - -export interface FunctionHandler { - schema: FunctionSchema; - handler: (args: any) => Promise; -}