Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 150 additions & 8 deletions Releases/v2.4/.claude/VoiceServer/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,123 @@ async function sendNotification(
}
}

// ═══════════════════════════════════════════════════════════════════════════
// FIFO Message Queue - Prevents overlapping audio from parallel subagents
// ═══════════════════════════════════════════════════════════════════════════
// Queue is in-memory only. Server restart clears pending messages.
// This is intentional - voice notifications are ephemeral and time-sensitive.

// One voice notification waiting for sequential playback (see drainQueueInOrder).
interface QueuedMessage {
  title: string;                            // Notification title forwarded to sendNotification
  message: string;                          // Body text that gets spoken/displayed
  voiceEnabled: boolean;                    // Whether TTS playback was requested by the caller
  voiceId: string | null;                   // ElevenLabs voice id; null means the server default
  voiceSettings?: Partial<ProsodySettings>; // Optional prosody overrides (e.g. volume) from the request
  seq: number;                              // Monotonic sequence number; queue is kept sorted by this
}

// Hard cap on pending messages; enqueueMessage rejects new work once reached.
const MAX_QUEUE_SIZE = 100;
// How many characters of the message body appear in queue log lines.
const LOG_PREVIEW_LENGTH = 30;

// Pending messages, kept sorted by seq (insertion happens in enqueueMessage).
const messageQueue: QueuedMessage[] = [];
// Re-entrancy guard: true while drainQueueInOrder is playing messages.
let isProcessingQueue = false;
// Last sequence number handed out; bumped per enqueue (or per request by handlers).
let lastAssignedSequence = 0;

/**
 * Plays every queued message, oldest first, one at a time.
 *
 * Each message is fully awaited via sendNotification before the next one is
 * taken, which is what prevents overlapping audio from parallel subagents.
 * A playback failure is logged and skipped so a single bad message cannot
 * stall the rest of the queue.
 */
async function drainQueueInOrder(): Promise<void> {
  for (let next = messageQueue.shift(); next; next = messageQueue.shift()) {
    const preview = next.message.slice(0, LOG_PREVIEW_LENGTH);
    console.log(`🔊 Playing #${next.seq} (${messageQueue.length} queued): "${preview}..."`);

    try {
      await sendNotification(
        next.title,
        next.message,
        next.voiceEnabled,
        next.voiceId,
        next.voiceSettings
      );
    } catch (error) {
      console.error('Queue processing error:', error);
    }
  }
}

/**
 * Starts draining the queue unless a drain is already in flight.
 *
 * isProcessingQueue makes concurrent calls no-ops while audio is playing;
 * the flag is released in `finally` so an unexpected throw inside the drain
 * can never leave the queue permanently locked.
 */
async function processQueue(): Promise<void> {
  if (isProcessingQueue) return;
  if (messageQueue.length === 0) return;

  isProcessingQueue = true;
  try {
    await drainQueueInOrder();
  } finally {
    isProcessingQueue = false;
  }
}

/**
 * Adds a voice notification to the FIFO queue and kicks off playback.
 *
 * Messages are inserted in ascending `seq` order, so a request whose sequence
 * number was captured earlier but whose body parsed later still plays in the
 * order it arrived. When `seq` is omitted, the next global sequence number is
 * assigned here.
 *
 * @param title         Notification title
 * @param message       Body text to speak/display
 * @param voiceEnabled  Whether TTS playback is requested
 * @param voiceId       ElevenLabs voice id, or null for the server default
 * @param voiceSettings Optional prosody overrides
 * @param seq           Pre-captured sequence number (handlers grab it before awaiting JSON)
 * @returns queue position + depth on success, or an error + depth when the queue is full
 */
function enqueueMessage(
  title: string,
  message: string,
  voiceEnabled: boolean,
  voiceId: string | null,
  voiceSettings?: Partial<ProsodySettings>,
  seq?: number
): { position: number; queueDepth: number } | { error: string; queueDepth: number } {
  // Bounded queue: shed load instead of growing without limit.
  if (messageQueue.length >= MAX_QUEUE_SIZE) {
    console.warn(`⚠️ Queue full (${MAX_QUEUE_SIZE}), rejecting message`);
    return { error: 'Queue full, try again later', queueDepth: messageQueue.length };
  }

  const sequence = seq ?? ++lastAssignedSequence;
  const entry: QueuedMessage = { title, message, voiceEnabled, voiceId, voiceSettings, seq: sequence };

  // Sorted insert by seq keeps FIFO even when bodies finish parsing out of order.
  let insertAt = messageQueue.findIndex(queued => queued.seq > sequence);
  if (insertAt === -1) {
    insertAt = messageQueue.length;
  }
  messageQueue.splice(insertAt, 0, entry);

  const position = insertAt + 1;
  console.log(`📥 Queued #${sequence} at position ${position}: "${message.slice(0, LOG_PREVIEW_LENGTH)}..."`);

  // Fire-and-forget: the drain runs in the background; rejections are logged.
  processQueue().catch(err => console.error('Queue error:', err));

  return { position, queueDepth: messageQueue.length };
}

// ═══════════════════════════════════════════════════════════════════════════

// Rate limiting
// Per-IP request counters; a record's window ends at resetTime (ms epoch).
const requestCounts = new Map<string, { count: number; resetTime: number }>();
const RATE_LIMIT = 10;                        // max requests per IP per window
const RATE_WINDOW = 60000;                    // window length in ms (1 minute)
const RATE_CLEANUP_INTERVAL = 5 * 60 * 1000;  // sweep cadence for stale entries (5 min)

// Periodic cleanup of stale rate limit entries to prevent memory leak
// (without it, every distinct client IP would stay in the Map forever).
// NOTE(review): this interval keeps a timer registered for the process
// lifetime; presumably fine since the HTTP server runs forever — confirm
// whether `.unref()` is wanted for graceful-shutdown scenarios.
setInterval(() => {
  const now = Date.now();
  let cleaned = 0;
  // Remove any record whose window has already expired.
  for (const [ip, record] of requestCounts) {
    if (now > record.resetTime) {
      requestCounts.delete(ip);
      cleaned++;
    }
  }
  if (cleaned > 0) {
    console.log(`🧹 Rate limit cleanup: removed ${cleaned} stale entries`);
  }
}, RATE_CLEANUP_INTERVAL);

function checkRateLimit(ip: string): boolean {
const now = Date.now();
Expand Down Expand Up @@ -462,15 +575,14 @@ const server = serve({
}

if (url.pathname === "/notify" && req.method === "POST") {
const seq = ++lastAssignedSequence; // Capture before async for FIFO ordering
try {
const data = await req.json();
const title = data.title || "PAI Notification";
const message = data.message || "Task completed";
const voiceEnabled = data.voice_enabled !== false;
const voiceId = data.voice_id || data.voice_name || null; // Support both voice_id and voice_name
const voiceId = data.voice_id || data.voice_name || null;

// Accept prosody settings directly in request (for custom agents)
// Also accept volume at top level for convenience
const voiceSettings: Partial<ProsodySettings> | undefined = data.voice_settings
? { ...data.voice_settings, volume: data.volume ?? data.voice_settings.volume }
: data.volume !== undefined
Expand All @@ -483,10 +595,22 @@ const server = serve({

console.log(`📨 Notification: "${title}" - "${message}" (voice: ${voiceEnabled}, voiceId: ${voiceId || DEFAULT_VOICE_ID})`);

await sendNotification(title, message, voiceEnabled, voiceId, voiceSettings);
const result = enqueueMessage(title, message, voiceEnabled, voiceId, voiceSettings, seq);

if ('error' in result) {
return new Response(
JSON.stringify({ status: "error", message: result.error, queue_depth: result.queueDepth }),
{ headers: { ...corsHeaders, "Content-Type": "application/json" }, status: 503 }
);
}

return new Response(
JSON.stringify({ status: "success", message: "Notification sent" }),
JSON.stringify({
status: "success",
message: "Notification queued",
queue_position: result.position,
queue_depth: result.queueDepth,
}),
{
headers: { ...corsHeaders, "Content-Type": "application/json" },
status: 200
Expand All @@ -505,17 +629,30 @@ const server = serve({
}

if (url.pathname === "/pai" && req.method === "POST") {
const seq = ++lastAssignedSequence; // Capture before async for FIFO ordering
try {
const data = await req.json();
const title = data.title || "PAI Assistant";
const message = data.message || "Task completed";

console.log(`🤖 PAI notification: "${title}" - "${message}"`);

await sendNotification(title, message, true, null);
const result = enqueueMessage(title, message, true, null, undefined, seq);

if ('error' in result) {
return new Response(
JSON.stringify({ status: "error", message: result.error, queue_depth: result.queueDepth }),
{ headers: { ...corsHeaders, "Content-Type": "application/json" }, status: 503 }
);
}

return new Response(
JSON.stringify({ status: "success", message: "PAI notification sent" }),
JSON.stringify({
status: "success",
message: "PAI notification queued",
queue_position: result.position,
queue_depth: result.queueDepth,
}),
{
headers: { ...corsHeaders, "Content-Type": "application/json" },
status: 200
Expand All @@ -540,7 +677,11 @@ const server = serve({
port: PORT,
voice_system: "ElevenLabs",
default_voice_id: DEFAULT_VOICE_ID,
api_key_configured: !!ELEVENLABS_API_KEY
api_key_configured: !!ELEVENLABS_API_KEY,
queue: {
depth: messageQueue.length,
is_processing: isProcessingQueue,
},
}),
{
headers: { ...corsHeaders, "Content-Type": "application/json" },
Expand All @@ -561,3 +702,4 @@ console.log(`🎙️ Using ElevenLabs TTS (default voice: ${DEFAULT_VOICE_ID})`
console.log(`📡 POST to http://localhost:${PORT}/notify`);
console.log(`🔒 Security: CORS restricted to localhost, rate limiting enabled`);
console.log(`🔑 API Key: ${ELEVENLABS_API_KEY ? '✅ Configured' : '❌ Missing'}`);
console.log(`📋 Message Queue: FIFO enabled (sequential playback, no overlap)`);