diff --git a/Releases/v2.4/.claude/VoiceServer/server.ts b/Releases/v2.4/.claude/VoiceServer/server.ts index 314eb9ff7..ba0a8a526 100755 --- a/Releases/v2.4/.claude/VoiceServer/server.ts +++ b/Releases/v2.4/.claude/VoiceServer/server.ts @@ -411,10 +411,123 @@ async function sendNotification( } } +// ═══════════════════════════════════════════════════════════════════════════ +// FIFO Message Queue - Prevents overlapping audio from parallel subagents +// ═══════════════════════════════════════════════════════════════════════════ +// Queue is in-memory only. Server restart clears pending messages. +// This is intentional - voice notifications are ephemeral and time-sensitive. + +interface QueuedMessage { + title: string; + message: string; + voiceEnabled: boolean; + voiceId: string | null; + voiceSettings?: Partial<VoiceSettings>; + seq: number; +} + +const MAX_QUEUE_SIZE = 100; +const LOG_PREVIEW_LENGTH = 30; + +const messageQueue: QueuedMessage[] = []; +let isProcessingQueue = false; +let lastAssignedSequence = 0; + +async function drainQueueInOrder(): Promise<void> { + while (messageQueue.length > 0) { + const item = messageQueue.shift(); + if (!item) break; + + const remaining = messageQueue.length; + console.log(`🔊 Playing #${item.seq} (${remaining} queued): "${item.message.slice(0, LOG_PREVIEW_LENGTH)}..."`); + + try { + await sendNotification( + item.title, + item.message, + item.voiceEnabled, + item.voiceId, + item.voiceSettings + ); + } catch (error) { + console.error('Queue processing error:', error); + } + } +} + +async function processQueue(): Promise<void> { + if (isProcessingQueue || messageQueue.length === 0) { + return; + } + + isProcessingQueue = true; + try { + await drainQueueInOrder(); + } finally { + isProcessingQueue = false; + } +} + +function enqueueMessage( + title: string, + message: string, + voiceEnabled: boolean, + voiceId: string | null, + voiceSettings?: Partial<VoiceSettings>, + seq?: number +): { position: number; queueDepth: number } | { error: string; queueDepth: number } { 
+ if (messageQueue.length >= MAX_QUEUE_SIZE) { + console.warn(`⚠️ Queue full (${MAX_QUEUE_SIZE}), rejecting message`); + return { error: 'Queue full, try again later', queueDepth: messageQueue.length }; + } + + const item: QueuedMessage = { + title, + message, + voiceEnabled, + voiceId, + voiceSettings, + seq: seq ?? ++lastAssignedSequence, + }; + + // Insert in sorted order to maintain FIFO + const insertIndex = messageQueue.findIndex(m => m.seq > item.seq); + if (insertIndex === -1) { + messageQueue.push(item); + } else { + messageQueue.splice(insertIndex, 0, item); + } + + const position = messageQueue.indexOf(item) + 1; + console.log(`📥 Queued #${item.seq} at position ${position}: "${message.slice(0, LOG_PREVIEW_LENGTH)}..."`); + + processQueue().catch(err => console.error('Queue error:', err)); + + return { position, queueDepth: messageQueue.length }; +} + +// ═══════════════════════════════════════════════════════════════════════════ + // Rate limiting const requestCounts = new Map(); const RATE_LIMIT = 10; const RATE_WINDOW = 60000; +const RATE_CLEANUP_INTERVAL = 5 * 60 * 1000; + +// Periodic cleanup of stale rate limit entries to prevent memory leak +setInterval(() => { + const now = Date.now(); + let cleaned = 0; + for (const [ip, record] of requestCounts) { + if (now > record.resetTime) { + requestCounts.delete(ip); + cleaned++; + } + } + if (cleaned > 0) { + console.log(`🧹 Rate limit cleanup: removed ${cleaned} stale entries`); + } +}, RATE_CLEANUP_INTERVAL); function checkRateLimit(ip: string): boolean { const now = Date.now(); @@ -462,15 +575,14 @@ const server = serve({ } if (url.pathname === "/notify" && req.method === "POST") { + const seq = ++lastAssignedSequence; // Capture before async for FIFO ordering try { const data = await req.json(); const title = data.title || "PAI Notification"; const message = data.message || "Task completed"; const voiceEnabled = data.voice_enabled !== false; - const voiceId = data.voice_id || data.voice_name || null; 
// Support both voice_id and voice_name + const voiceId = data.voice_id || data.voice_name || null; - // Accept prosody settings directly in request (for custom agents) - // Also accept volume at top level for convenience const voiceSettings: Partial<VoiceSettings> | undefined = data.voice_settings ? { ...data.voice_settings, volume: data.volume ?? data.voice_settings.volume } : data.volume !== undefined @@ -483,10 +595,22 @@ const server = serve({ console.log(`📨 Notification: "${title}" - "${message}" (voice: ${voiceEnabled}, voiceId: ${voiceId || DEFAULT_VOICE_ID})`); - await sendNotification(title, message, voiceEnabled, voiceId, voiceSettings); + const result = enqueueMessage(title, message, voiceEnabled, voiceId, voiceSettings, seq); + + if ('error' in result) { + return new Response( + JSON.stringify({ status: "error", message: result.error, queue_depth: result.queueDepth }), + { headers: { ...corsHeaders, "Content-Type": "application/json" }, status: 503 } + ); + } return new Response( - JSON.stringify({ status: "success", message: "Notification sent" }), + JSON.stringify({ + status: "success", + message: "Notification queued", + queue_position: result.position, + queue_depth: result.queueDepth, + }), { headers: { ...corsHeaders, "Content-Type": "application/json" }, status: 200 @@ -505,6 +629,7 @@ const server = serve({ } if (url.pathname === "/pai" && req.method === "POST") { + const seq = ++lastAssignedSequence; // Capture before async for FIFO ordering try { const data = await req.json(); const title = data.title || "PAI Assistant"; @@ -512,10 +637,22 @@ const server = serve({ console.log(`🤖 PAI notification: "${title}" - "${message}"`); - await sendNotification(title, message, true, null); + const result = enqueueMessage(title, message, true, null, undefined, seq); + + if ('error' in result) { + return new Response( + JSON.stringify({ status: "error", message: result.error, queue_depth: result.queueDepth }), + { headers: { ...corsHeaders, "Content-Type": 
"application/json" }, status: 503 } + ); + } return new Response( - JSON.stringify({ status: "success", message: "PAI notification sent" }), + JSON.stringify({ + status: "success", + message: "PAI notification queued", + queue_position: result.position, + queue_depth: result.queueDepth, + }), { headers: { ...corsHeaders, "Content-Type": "application/json" }, status: 200 @@ -540,7 +677,11 @@ const server = serve({ port: PORT, voice_system: "ElevenLabs", default_voice_id: DEFAULT_VOICE_ID, - api_key_configured: !!ELEVENLABS_API_KEY + api_key_configured: !!ELEVENLABS_API_KEY, + queue: { + depth: messageQueue.length, + is_processing: isProcessingQueue, + }, }), { headers: { ...corsHeaders, "Content-Type": "application/json" }, @@ -561,3 +702,4 @@ console.log(`🎙️ Using ElevenLabs TTS (default voice: ${DEFAULT_VOICE_ID})` console.log(`📡 POST to http://localhost:${PORT}/notify`); console.log(`🔒 Security: CORS restricted to localhost, rate limiting enabled`); console.log(`🔑 API Key: ${ELEVENLABS_API_KEY ? '✅ Configured' : '❌ Missing'}`); +console.log(`📋 Message Queue: FIFO enabled (sequential playback, no overlap)`);