diff --git a/examples-backend/product-roadmap-backend/package.json b/examples-backend/product-roadmap-backend/package.json index 8d783e48..68da7864 100644 --- a/examples-backend/product-roadmap-backend/package.json +++ b/examples-backend/product-roadmap-backend/package.json @@ -17,6 +17,7 @@ "@mastra/libsql": "^0.14.1", "@mastra/loggers": "^0.10.11", "@mastra/memory": "^0.15.1", + "@mastra/voice-openai": "^0.11.6", "zod": "^3.25.76", "zod-to-json-schema": "^3.24.6" }, diff --git a/examples-backend/product-roadmap-backend/pnpm-lock.yaml b/examples-backend/product-roadmap-backend/pnpm-lock.yaml index 51e52df6..e19ff0dc 100644 --- a/examples-backend/product-roadmap-backend/pnpm-lock.yaml +++ b/examples-backend/product-roadmap-backend/pnpm-lock.yaml @@ -12,7 +12,7 @@ importers: specifier: ^2.0.23 version: 2.0.30(zod@3.25.76) '@mastra/core': - specifier: 0.16.0 + specifier: ^0.16.0 version: 0.16.0(openapi-types@12.1.3)(react@19.1.1)(zod@3.25.76) '@mastra/libsql': specifier: ^0.14.1 @@ -23,6 +23,9 @@ importers: '@mastra/memory': specifier: ^0.15.1 version: 0.15.1(@mastra/core@0.16.0(openapi-types@12.1.3)(react@19.1.1)(zod@3.25.76))(react@19.1.1)(zod@3.25.76) + '@mastra/voice-openai': + specifier: ^0.11.6 + version: 0.11.6(@mastra/core@0.16.0(openapi-types@12.1.3)(react@19.1.1)(zod@3.25.76))(ws@8.18.3)(zod@3.25.76) zod: specifier: ^3.25.76 version: 3.25.76 @@ -551,6 +554,12 @@ packages: '@mastra/core': '>=0.16.3-0 <0.17.0-0' zod: ^3.25.0 || ^4.0.0 + '@mastra/voice-openai@0.11.6': + resolution: {integrity: sha512-Nss/Jqeg+1TTaRIa+G3Y2UoNEtCSt9UYCOict1SvCegqA2D7ZP2rguzpQFhFFaPwA53jqUmEEd/2jBVLfjl/Yg==} + peerDependencies: + '@mastra/core': '>=0.15.3-0 <0.19.0-0' + zod: ^3.25.0 || ^4.0.0 + '@modelcontextprotocol/sdk@1.18.0': resolution: {integrity: sha512-JvKyB6YwS3quM+88JPR0axeRgvdDu3Pv6mdZUy+w4qVkCzGgumb9bXG/TmtDRQv+671yaofVfXSQmFLlWU5qPQ==} engines: {node: '>=18'} @@ -2328,6 +2337,18 @@ packages: resolution: {integrity: sha512-YgBpdJHPyQ2UE5x+hlSXcnejzAvD0b22U2OuAP+8OnlJT+PjWPxtgmGqKKc+RgTM63U9gN0YzrYc71R2WT/hTA==} engines: {node: '>=18'} + openai@5.23.1: + resolution: {integrity: sha512-APxMtm5mln4jhKhAr0d5zP9lNsClx4QwJtg8RUvYSSyxYCTHLNJnLEcSHbJ6t0ori8Pbr9HZGfcPJ7LEy73rvQ==} + hasBin: true + peerDependencies: + ws: ^8.18.0 + zod: ^3.23.8 + peerDependenciesMeta: + ws: + optional: true + zod: + optional: true + openapi-types@12.1.3: resolution: {integrity: sha512-N4YtSYJqghVu4iek2ZUvcN/0aqH1kRDuNqzcycDxhOUpg7GdvLa2F3DgS6yBNhInhv2r/6I0Flkn7CqL8+nIcw==} @@ -3535,6 +3556,14 @@ snapshots: '@mastra/core': 0.16.0(openapi-types@12.1.3)(react@19.1.1)(zod@3.25.76) zod: 3.25.76 + '@mastra/voice-openai@0.11.6(@mastra/core@0.16.0(openapi-types@12.1.3)(react@19.1.1)(zod@3.25.76))(ws@8.18.3)(zod@3.25.76)': + dependencies: + '@mastra/core': 0.16.0(openapi-types@12.1.3)(react@19.1.1)(zod@3.25.76) + openai: 5.23.1(ws@8.18.3)(zod@3.25.76) + zod: 3.25.76 + transitivePeerDependencies: + - ws + '@modelcontextprotocol/sdk@1.18.0': dependencies: ajv: 6.12.6 @@ -5491,6 +5520,11 @@ snapshots: is-inside-container: 1.0.0 wsl-utils: 0.1.0 + openai@5.23.1(ws@8.18.3)(zod@3.25.76): + optionalDependencies: + ws: 8.18.3 + zod: 3.25.76 + openapi-types@12.1.3: {} p-map@7.0.3: {} diff --git a/examples-backend/product-roadmap-backend/src/mastra/apiRegistry.ts b/examples-backend/product-roadmap-backend/src/mastra/apiRegistry.ts index 59b97658..4a36a5ad 100644 --- a/examples-backend/product-roadmap-backend/src/mastra/apiRegistry.ts +++ b/examples-backend/product-roadmap-backend/src/mastra/apiRegistry.ts @@ -7,6 +7,7 @@ import { import 
{ z } from 'zod'; import { zodToJsonSchema } from 'zod-to-json-schema'; import { createSSEStream, streamJSONEvent } from '../utils/streamUtils'; +import { handleVoiceStream } from './voiceStreamHandler'; export const ChatThreadSchema = z.object({ id: z.string(), @@ -278,4 +279,12 @@ export const apiRoutes = [ } }, }), + + // -------------------- Voice API -------------------- + + // Voice transcription to workflow (streaming) + registerApiRoute('/voice/stream', { + method: 'POST', + handler: handleVoiceStream, + }), ]; diff --git a/examples-backend/product-roadmap-backend/src/mastra/voiceStreamHandler.ts b/examples-backend/product-roadmap-backend/src/mastra/voiceStreamHandler.ts new file mode 100644 index 00000000..26f4aeed --- /dev/null +++ b/examples-backend/product-roadmap-backend/src/mastra/voiceStreamHandler.ts @@ -0,0 +1,142 @@ +import { Context } from 'hono'; +import { Readable } from 'stream'; +import { createSSEStream, streamJSONEvent } from '../utils/streamUtils'; +import { chatWorkflow } from './workflows/chatWorkflow'; +import { OpenAIVoice } from '@mastra/voice-openai'; + +export const voiceProvider = new OpenAIVoice({ + speechModel: { apiKey: process.env.OPENAI_API_KEY!, name: 'tts-1' }, + listeningModel: { + apiKey: process.env.OPENAI_API_KEY!, + name: 'whisper-1', + }, +}); + +/** + * Create workflow input data from the voice streaming parameters + */ +function createWorkflowInput( + baseInput: { + prompt: string; + additionalContext?: unknown; + temperature?: number; + maxTokens?: number; + systemPrompt?: string; + resourceId?: string; + threadId?: string; + }, + controller: ReadableStreamDefaultController, + isStreaming: boolean = true, + isVoice: boolean = false +) { + return { + ...baseInput, + streamController: isStreaming ? controller : undefined, + isVoice, + }; +} + +/** + * Handle voice streaming request + * Transcribes audio, then streams the LLM response back + */ +export async function handleVoiceStream(c: Context) { + try { + const form = await c.req.formData(); + const audioFile = form.get('audio') as File; + const additionalContext = form.get('context') as string | null; + const settings = form.get('settings') as string | null; + + let parsedAdditionalContext: unknown = undefined; + let parsedSettings: { + temperature?: number; + maxTokens?: number; + systemPrompt?: string; + resourceId?: string; + threadId?: string; + } = {}; + + // Parse additional context if provided + if (additionalContext) { + try { + parsedAdditionalContext = JSON.parse(additionalContext); + } catch { + // leave undefined if not valid JSON + } + } + + // Parse voice settings if provided + if (settings) { + try { + parsedSettings = JSON.parse(settings); + } catch { + // use empty object if not valid JSON + } + } + + if (!audioFile) { + return c.json({ error: 'audio required' }, 400); + } + + // Convert audio file to buffer and then to stream + const buf = Buffer.from(await audioFile.arrayBuffer()); + + // Transcribe the audio + const transcription = await voiceProvider.listen(Readable.from(buf), { + filetype: 'webm', + }); + + // Create SSE stream for real-time response + return createSSEStream(async (controller) => { + // Emit the transcription in the format that Cedar OS voice streaming expects + console.log('Emitting voice transcription:', transcription); + streamJSONEvent(controller, 'transcription', { + type: 'transcription', + transcription: transcription, + }); + + // Start the chat workflow with the transcription + const run = await chatWorkflow.createRunAsync(); + const result = 
await run.start({
+				inputData: createWorkflowInput(
+					{
+						prompt: transcription,
+						additionalContext: parsedAdditionalContext ?? additionalContext,
+						temperature: parsedSettings.temperature,
+						maxTokens: parsedSettings.maxTokens,
+						systemPrompt: parsedSettings.systemPrompt,
+						resourceId: parsedSettings.resourceId,
+						threadId: parsedSettings.threadId,
+					},
+					controller,
+					true,
+					true
+				),
+			});
+
+			if (result.status !== 'success') {
+				console.error('Workflow failed:', result.status);
+				streamJSONEvent(controller, 'error', {
+					type: 'error',
+					error: `Workflow failed: ${result.status}`,
+				});
+			}
+
+			// Emit completion event
+			console.log('Voice stream completed successfully');
+			streamJSONEvent(controller, 'done', {
+				type: 'done',
+				completedItems: [],
+			});
+
+			// The workflow handles streaming the response through the controller
+			// No need to manually close here as the workflow will handle completion
+		});
+	} catch (error) {
+		console.error('Voice stream error:', error);
+		return c.json(
+			{ error: error instanceof Error ? error.message : 'Internal error' },
+			500
+		);
+	}
+}
diff --git a/examples-backend/product-roadmap-backend/src/mastra/voiceUtils.ts b/examples-backend/product-roadmap-backend/src/mastra/voiceUtils.ts
new file mode 100644
index 00000000..5620e276
--- /dev/null
+++ b/examples-backend/product-roadmap-backend/src/mastra/voiceUtils.ts
@@ -0,0 +1,34 @@
+import { OpenAIVoice } from '@mastra/voice-openai';
+import { streamAudioFromText } from '../utils/streamUtils';
+
+export const voiceProvider = new OpenAIVoice({
+	speechModel: { apiKey: process.env.OPENAI_API_KEY!, name: 'tts-1' },
+	listeningModel: {
+		apiKey: process.env.OPENAI_API_KEY!,
+		name: 'whisper-1',
+	},
+});
+
+export function createSpeakFunction() {
+	return (t: string, options?: Record<string, unknown>) =>
+		voiceProvider.speak(
+			t,
+			options as { speaker?: string; speed?: number }
+		) as unknown as Promise<NodeJS.ReadableStream>;
+}
+
+export async function handleVoiceOutput(
+	streamController: ReadableStreamDefaultController,
+	pendingText: string,
+	options: { voice?: string; speed?: number; eventType?: string } = {}
+) {
+	if (!pendingText) return;
+
+	const speakFn = createSpeakFunction();
+	await streamAudioFromText(streamController, speakFn, pendingText, {
+		voice: 'alloy',
+		speed: 1.0,
+		eventType: 'audio',
+		...options,
+	});
+}
diff --git a/examples-backend/product-roadmap-backend/src/mastra/workflows/chatWorkflow.ts b/examples-backend/product-roadmap-backend/src/mastra/workflows/chatWorkflow.ts
index 30d8d771..b02e2b59 100644
--- a/examples-backend/product-roadmap-backend/src/mastra/workflows/chatWorkflow.ts
+++ b/examples-backend/product-roadmap-backend/src/mastra/workflows/chatWorkflow.ts
@@ -8,6 +8,7 @@ import { z } from 'zod';
 import { productRoadmapAgent } from '../agents/productRoadmapAgent';
 import { handleTextStreamV2, streamJSONEvent } from '../../utils/streamUtils';
 import { RuntimeContext } from '@mastra/core/runtime-context';
+import { handleVoiceOutput } from '../voiceUtils';
 
 // ---------------------------------------------
 // Mastra nested streaming – emit placeholder events
@@ -162,6 +163,8 @@ export const ChatInputSchema = z.object({
 	resourceId: z.string().optional(),
 	threadId: z.string().optional(),
 	streamController: z.any().optional(),
+	// Voice support
+	isVoice: z.boolean().optional(),
 	// For structured output
 	output: z.any().optional(),
 });
@@ -212,6 +215,7 @@ const buildAgentContext = createStep({
 		resourceId,
 		threadId,
 		additionalContext,
+		isVoice,
 	} = inputData;
 
 	const message = prompt;
@@ -225,6 +229,7 @@
const buildAgentContext = createStep({ streamController, resourceId, threadId, + isVoice, }; return result; @@ -260,6 +265,7 @@ const callAgent = createStep({ resourceId, threadId, additionalContext, + isVoice, } = inputData; const runtimeContext = new RuntimeContext(); @@ -280,16 +286,35 @@ const callAgent = createStep({ ); let finalText = ''; + let pendingText = ''; for await (const chunk of streamResult.fullStream) { if (chunk.type === 'text-delta') { finalText += chunk.payload.text; - await handleTextStreamV2(chunk.payload.text, streamController); + + if (isVoice && streamController) { + // Accumulate text for voice synthesis + pendingText += chunk.payload.text; + } else { + // Regular text streaming + await handleTextStreamV2(chunk.payload.text, streamController); + } } else if (chunk.type === 'tool-result' || chunk.type === 'tool-call') { + // Handle any pending text before tool events for voice + if (isVoice && streamController && pendingText) { + await handleVoiceOutput(streamController, pendingText); + pendingText = ''; + } + streamJSONEvent(streamController, chunk.type, chunk); } } + // Handle any remaining pending text for voice + if (isVoice && streamController && pendingText) { + await handleVoiceOutput(streamController, pendingText); + } + return { content: finalText }; }, }); diff --git a/examples-backend/product-roadmap-backend/src/utils/streamUtils.ts b/examples-backend/product-roadmap-backend/src/utils/streamUtils.ts index 2f864350..4351f3a1 100644 --- a/examples-backend/product-roadmap-backend/src/utils/streamUtils.ts +++ b/examples-backend/product-roadmap-backend/src/utils/streamUtils.ts @@ -73,3 +73,71 @@ export async function handleTextStreamV2( return chunk; } + +// ------------------- Voice-Specific Event Types ------------------- + +/** + * Voice transcription result event. + * Sent at the beginning of voice streams to show what was heard. + */ +export interface TranscriptionEvent { + type: 'transcription'; + transcription: string; +} + +/** + * Audio output event. + * Contains base64-encoded audio data for voice responses. + */ +export interface AudioEvent { + type: 'audio'; + audioData: string; // base64 encoded + audioFormat: 'audio/mpeg'; + content: string; // original text that was spoken +} + +/** + * Generate audio from provided text using a speak function and emit an 'audio' event via SSE. + * The speak function should return a NodeJS.ReadableStream of audio data. 
+ */
+export async function streamAudioFromText(
+	controller: ReadableStreamDefaultController,
+	// Accept either Node.js Readable or Web ReadableStream for broader compatibility
+	speakFn: (
+		text: string,
+		options?: Record<string, unknown>
+	) => Promise<NodeJS.ReadableStream | ReadableStream<Uint8Array>>,
+	text: string,
+	options: { voice?: string; speed?: number; eventType?: string } = {}
+) {
+	const { voice = 'alloy', speed = 1.0, eventType = 'audio' } = options;
+	const speechStream = await speakFn(text, { voice, speed });
+
+	// Convert stream to buffer for response (support Web ReadableStream and Node Readable)
+	let audioResponse: Buffer;
+	if (typeof (speechStream as ReadableStream).getReader === 'function') {
+		// Web ReadableStream
+		const reader = (speechStream as ReadableStream).getReader();
+		const parts: Uint8Array[] = [];
+		for (;;) {
+			const { value, done } = await reader.read();
+			if (done) break;
+			if (value) parts.push(value);
+		}
+		audioResponse = Buffer.concat(parts.map((u8) => Buffer.from(u8)));
+	} else {
+		// Node Readable
+		const chunks: Buffer[] = [];
+		for await (const chunk of speechStream as unknown as NodeJS.ReadableStream) {
+			chunks.push(Buffer.from(chunk as Buffer));
+		}
+		audioResponse = Buffer.concat(chunks);
+	}
+
+	streamJSONEvent(controller, eventType, {
+		type: eventType as 'audio',
+		audioData: audioResponse.toString('base64'),
+		audioFormat: 'audio/mpeg',
+		content: text,
+	});
+}
diff --git a/packages/cedar-os/src/store/agentConnection/AgentConnectionTypes.ts b/packages/cedar-os/src/store/agentConnection/AgentConnectionTypes.ts
index 8f15c0b2..46da3236 100644
--- a/packages/cedar-os/src/store/agentConnection/AgentConnectionTypes.ts
+++ b/packages/cedar-os/src/store/agentConnection/AgentConnectionTypes.ts
@@ -55,7 +55,15 @@ export type StreamEvent =
 	| { type: 'error'; error: Error }
 	| { type: 'metadata'; data: unknown };
 
+export type VoiceStreamEvent =
+	| StreamEvent
+	| { type: 'transcription'; transcription: string }
+	| { type: 'audio'; audioData: string; audioFormat?: string; content: string };
+
 export type StreamHandler = (event: StreamEvent) => void | Promise<void>;
+export type VoiceStreamHandler = (
+	event: VoiceStreamEvent
+) => void | Promise<void>;
 
 export interface StreamResponse {
 	abort: () => void;
@@ -223,6 +231,11 @@ export interface ProviderImplementation<
 		handler: StreamHandler
 	) => StreamResponse;
 	voiceLLM: (params: VoiceParams, config: TConfig) => Promise<VoiceLLMResponse>;
+	voiceStreamLLM?: (
+		params: VoiceParams,
+		config: TConfig,
+		handler: VoiceStreamHandler
+	) => StreamResponse;
 	handleResponse: (response: Response) => Promise<LLMResponse>;
 }
 
diff --git a/packages/cedar-os/src/store/agentConnection/agentConnectionSlice.ts b/packages/cedar-os/src/store/agentConnection/agentConnectionSlice.ts
index 20034509..008888c2 100644
--- a/packages/cedar-os/src/store/agentConnection/agentConnectionSlice.ts
+++ b/packages/cedar-os/src/store/agentConnection/agentConnectionSlice.ts
@@ -15,6 +15,7 @@ import type {
 	StructuredResponseType,
 	VoiceLLMResponse,
 	VoiceParams,
+	VoiceStreamHandler,
 } from '@/store/agentConnection/AgentConnectionTypes';
 import { getProviderImplementation } from '@/store/agentConnection/providers/index';
 import type { CedarStore } from '@/store/CedarOSTypes';
@@ -97,6 +98,12 @@ export interface AgentConnectionSlice {
 	// Voice LLM method
 	voiceLLM: (params: VoiceParams) => Promise<VoiceLLMResponse>;
 
+	// Voice streaming LLM method
+	voiceStreamLLM: (
+		params: VoiceParams,
+		handler: VoiceStreamHandler
+	) => StreamResponse;
+
 	// High-level methods that use callLLM/streamLLM
 	sendMessage: <
 		T extends Record<string, unknown> = Record<string, unknown>,
@@ -134,7 +141,7 @@
export interface AgentConnectionSlice { // Create a typed version of the slice that knows about the provider export type TypedAgentConnectionSlice = Omit< AgentConnectionSlice, - 'callLLM' | 'streamLLM' | 'callLLMStructured' | 'voiceLLM' + 'callLLM' | 'streamLLM' | 'callLLMStructured' | 'voiceLLM' | 'voiceStreamLLM' > & { callLLM: (params: GetParamsForConfig) => Promise; callLLMStructured: ( @@ -145,6 +152,10 @@ export type TypedAgentConnectionSlice = Omit< handler: StreamHandler ) => StreamResponse; voiceLLM: (params: VoiceParams) => Promise; + voiceStreamLLM: ( + params: VoiceParams, + handler: VoiceStreamHandler + ) => StreamResponse; }; export const createAgentConnectionSlice: StateCreator< @@ -434,6 +445,95 @@ export const createAgentConnectionSlice: StateCreator< } }, + // Voice streaming LLM method + voiceStreamLLM: (params: VoiceParams, handler: VoiceStreamHandler) => { + const config = get().providerConfig; + if (!config) { + throw new Error('No LLM provider configured'); + } + + const provider = getProviderImplementation(config); + if (!provider.voiceStreamLLM) { + throw new Error( + `Provider ${config.provider} does not support voice streaming` + ); + } + + // Augment params for Mastra provider to include resourceId & threadId + let voiceParams: VoiceParams = params; + if (config.provider === 'mastra') { + const resourceId = getCedarState('userId') as string | undefined; + const threadId = get().mainThreadId; + voiceParams = { + ...params, + resourceId, + threadId, + } as typeof voiceParams; + } + + // Log the stream start + const streamId = get().logStreamStart( + voiceParams as BaseParams, + config.provider + ); + + // Set current request ID to the stream ID + set({ currentRequestId: streamId }); + + const abortController = new AbortController(); + set({ currentAbortController: abortController, isStreaming: true }); + + // Wrap the handler to log stream events + const wrappedHandler: VoiceStreamHandler = (event) => { + if (event.type === 'chunk') { + get().logStreamChunk(streamId, event.content); + } else if (event.type === 'done') { + get().logStreamEnd(streamId, event.completedItems); + // Clear current request ID when stream ends + set({ currentRequestId: null }); + } else if (event.type === 'error') { + get().logAgentError(streamId, event.error); + // Clear current request ID on error + set({ currentRequestId: null }); + } else if (event.type === 'object') { + get().logStreamObject(streamId, event.object); + } else if (event.type === 'transcription') { + // Log transcription events + get().logStreamChunk( + streamId, + `[Transcription] ${event.transcription}` + ); + } else if (event.type === 'audio') { + // Log audio events + get().logStreamChunk( + streamId, + `[Audio] ${event.audioFormat || 'unknown format'}` + ); + } + handler(event); + }; + + // Call the provider's voiceStreamLLM method + const originalResponse = provider.voiceStreamLLM( + voiceParams as unknown as never, + config as never, + wrappedHandler + ); + + // Wrap the completion to update state when done + const wrappedCompletion = originalResponse.completion.finally(() => { + set({ isStreaming: false, currentAbortController: null }); + }); + + return { + abort: () => { + originalResponse.abort(); + abortController.abort(); + }, + completion: wrappedCompletion, + }; + }, + // Handle LLM response handleLLMResponse: async (itemsToProcess) => { const state = get(); diff --git a/packages/cedar-os/src/store/agentConnection/providers/ai-sdk.ts b/packages/cedar-os/src/store/agentConnection/providers/ai-sdk.ts index 
e6b9aefb..7a700444 100644
--- a/packages/cedar-os/src/store/agentConnection/providers/ai-sdk.ts
+++ b/packages/cedar-os/src/store/agentConnection/providers/ai-sdk.ts
@@ -7,6 +7,7 @@ import type {
 	StreamResponse,
 	VoiceParams,
 	VoiceLLMResponse,
+	VoiceStreamHandler,
 } from '@/store/agentConnection/AgentConnectionTypes';
 import type { StructuredResponseType } from '@/store/agentConnection/AgentConnectionTypes';
 import {
@@ -42,6 +43,11 @@ export interface AISDKProviderImplementation {
 		params: VoiceParams,
 		config: AISDKConfig
 	) => Promise<VoiceLLMResponse>;
+	voiceStreamLLM?: (
+		params: VoiceParams,
+		config: AISDKConfig,
+		handler: VoiceStreamHandler
+	) => StreamResponse;
 	handleResponse: (response: Response) => Promise<LLMResponse>;
 }
 
diff --git a/packages/cedar-os/src/store/agentConnection/providers/mastra.ts b/packages/cedar-os/src/store/agentConnection/providers/mastra.ts
index 3e849537..e2d4f900 100644
--- a/packages/cedar-os/src/store/agentConnection/providers/mastra.ts
+++ b/packages/cedar-os/src/store/agentConnection/providers/mastra.ts
@@ -3,11 +3,141 @@ import type {
 	MastraParams,
 	ProviderImplementation,
 	StructuredParams,
+	VoiceStreamHandler,
 } from '@/store/agentConnection/AgentConnectionTypes';
 import { handleEventStream } from '@/store/agentConnection/agentUtils';
 
 type MastraConfig = InferProviderConfig<'mastra'>;
 
+/**
+ * Handle voice streaming response from Mastra voice endpoint
+ */
+async function handleVoiceEventStream(
+	response: Response,
+	handler: VoiceStreamHandler
+): Promise<void> {
+	const reader = response.body?.getReader();
+	if (!reader) {
+		throw new Error('Response body is not readable');
+	}
+
+	const decoder = new TextDecoder();
+	let buffer = ''; // Buffer to accumulate incomplete chunks
+
+	try {
+		while (true) {
+			const { done, value } = await reader.read();
+			if (done) break;
+
+			const chunk = decoder.decode(value, { stream: true });
+			buffer += chunk;
+
+			// Process complete lines
+			const lines = buffer.split('\n');
+			// Keep the last potentially incomplete line in the buffer
+			buffer = lines.pop() || '';
+
+			for (const line of lines) {
+				const trimmedLine = line.trim();
+				if (!trimmedLine || !trimmedLine.startsWith('data: ')) continue;
+
+				const data = trimmedLine.slice(6); // Remove 'data: ' prefix
+
+				try {
+					const parsed = JSON.parse(data);
+
+					// Handle different event types
+					if (parsed.type === 'transcription' && parsed.transcription) {
+						handler({
+							type: 'transcription',
+							transcription: parsed.transcription,
+						});
+					} else if (parsed.type === 'audio' && parsed.audioData) {
+						handler({
+							type: 'audio',
+							audioData: parsed.audioData,
+							audioFormat: parsed.audioFormat,
+							content: parsed.content,
+						});
+					} else if (parsed.type === 'chunk' && parsed.content) {
+						handler({
+							type: 'chunk',
+							content: parsed.content,
+						});
+					} else if (parsed.type === 'object' && parsed.object) {
+						handler({
+							type: 'object',
+							object: parsed.object,
+						});
+					} else if (parsed.type === 'error') {
+						handler({
+							type: 'error',
+							error: new Error(parsed.error || 'Stream error'),
+						});
+					} else if (parsed.type === 'done') {
+						handler({
+							type: 'done',
+							completedItems: parsed.completedItems || [],
+						});
+					}
+				} catch (parseError) {
+					console.warn('Failed to parse voice stream event:', parseError);
+					console.warn('Problematic data length:', data.length);
+				}
+			}
+		}
+
+		// Process any remaining data in the buffer
+		if (buffer.trim()) {
+			const trimmedLine = buffer.trim();
+			if (trimmedLine.startsWith('data: ')) {
+				const data = trimmedLine.slice(6);
+				try {
+					const parsed = JSON.parse(data);
+					// Handle the final event (same logic as above)
+					if (parsed.type === 'transcription' && parsed.transcription) {
+						handler({
+							type: 'transcription',
+							transcription: parsed.transcription,
+						});
+					} else if (parsed.type === 'audio' && parsed.audioData) {
+						handler({
+							type: 'audio',
+							audioData: parsed.audioData,
+							audioFormat: parsed.audioFormat,
+							content: parsed.content,
+						});
+					} else if (parsed.type === 'chunk' && parsed.content) {
+						handler({
+							type: 'chunk',
+							content: parsed.content,
+						});
+					} else if (parsed.type === 'object' && parsed.object) {
+						handler({
+							type: 'object',
+							object: parsed.object,
+						});
+					} else if (parsed.type === 'error') {
+						handler({
+							type: 'error',
+							error: new Error(parsed.error || 'Stream error'),
+						});
+					} else if (parsed.type === 'done') {
+						handler({
+							type: 'done',
+							completedItems: parsed.completedItems || [],
+						});
+					}
+				} catch (parseError) {
+					console.warn('Failed to parse final voice stream event:', parseError);
+				}
+			}
+		}
+	} finally {
+		reader.releaseLock();
+	}
+}
+
 export const mastraProvider: ProviderImplementation<
 	MastraParams,
 	MastraConfig
@@ -211,6 +341,74 @@ export const mastraProvider: ProviderImplementation<
 		}
 	},
 
+	voiceStreamLLM: (params, config, handler) => {
+		const abortController = new AbortController();
+
+		const completion = (async () => {
+			try {
+				const { audioData, voiceSettings, context, ...rest } = params;
+
+				const headers: Record<string, string> = {};
+
+				// Only add Authorization header if apiKey is provided
+				if (config.apiKey) {
+					headers.Authorization = `Bearer ${config.apiKey}`;
+				}
+
+				// Use the endpoint from voiceSettings if provided, otherwise use voiceRoute from config
+				const voiceEndpoint =
+					voiceSettings.endpoint || config.voiceRoute || '/voice';
+				const fullUrl = voiceEndpoint.startsWith('http')
+					? voiceEndpoint
+					: `${config.baseURL}${voiceEndpoint}`;
+
+				// Add /stream suffix for streaming endpoint
+				const streamUrl = fullUrl.endsWith('/stream')
+					? fullUrl
+					: `${fullUrl}/stream`;
+
+				const formData = new FormData();
+				formData.append('audio', audioData, 'recording.webm');
+				formData.append('settings', JSON.stringify(voiceSettings));
+				if (context) {
+					formData.append('context', JSON.stringify(context));
+				}
+
+				for (const [key, value] of Object.entries(rest)) {
+					if (value === undefined || value === null) continue;
+					if (typeof value === 'object') {
+						formData.append(key, JSON.stringify(value));
+					} else {
+						formData.append(key, String(value));
+					}
+				}
+
+				const response = await fetch(streamUrl, {
+					method: 'POST',
+					headers,
+					body: formData,
+					signal: abortController.signal,
+				});
+
+				if (!response.ok) {
+					throw new Error(`Voice stream endpoint returned ${response.status}`);
+				}
+
+				// Handle streaming response
+				await handleVoiceEventStream(response, handler);
+			} catch (error) {
+				if (error instanceof Error && error.name !== 'AbortError') {
+					handler({ type: 'error', error });
+				}
+			}
+		})();
+
+		return {
+			abort: () => abortController.abort(),
+			completion,
+		};
+	},
+
 	handleResponse: async (response) => {
 		if (!response.ok) {
 			throw new Error(`HTTP error! status: ${response.status}`);
diff --git a/packages/cedar-os/src/store/voice/voiceSlice.ts b/packages/cedar-os/src/store/voice/voiceSlice.ts
index 2db43486..f7031c9a 100644
--- a/packages/cedar-os/src/store/voice/voiceSlice.ts
+++ b/packages/cedar-os/src/store/voice/voiceSlice.ts
@@ -26,6 +26,7 @@ export interface VoiceState {
 		useBrowserTTS?: boolean;
 		autoAddToMessages?: boolean;
 		endpoint?: string; // Voice endpoint URL
+		stream?: boolean; // Whether to use streaming voice processing
 	};
 }
 
@@ -41,6 +42,7 @@ export interface VoiceActions {
 
 	// Audio streaming
 	streamAudioToEndpoint: (audioData: Blob) => Promise<void>;
+	streamAudioToEndpointStream: (audioData: Blob) => Promise<void>;
 	handleLLMVoice: (response: VoiceLLMResponse) => Promise<void>;
 	playAudioResponse: (audioUrl: string | ArrayBuffer) => Promise<void>;
 
@@ -71,6 +73,7 @@ const initialVoiceState: VoiceState = {
 		volume: 1.0,
 		useBrowserTTS: false,
 		autoAddToMessages: true, // Default to true for automatic message integration
+		stream: false, // Default to non-streaming
 	},
 };
 
@@ -152,7 +155,14 @@ export const createVoiceSlice: StateCreator<CedarStore, [], [], VoiceSlice> = (
 
 			mediaRecorder.onstop = async () => {
 				const audioBlob = new Blob(audioChunks, { type: 'audio/webm' });
-				await get().streamAudioToEndpoint(audioBlob);
+				const { voiceSettings } = get();
+
+				// Choose between streaming and non-streaming based on settings
+				if (voiceSettings.stream) {
+					await get().streamAudioToEndpointStream(audioBlob);
+				} else {
+					await get().streamAudioToEndpoint(audioBlob);
+				}
 			};
 
 			mediaRecorder.start();
@@ -236,6 +246,126 @@ export const createVoiceSlice: StateCreator<CedarStore, [], [], VoiceSlice> = (
 		}
 	},
 
+	streamAudioToEndpointStream: async (audioData: Blob) => {
+		const { voiceSettings } = get();
+
+		try {
+			set({ isSpeaking: false });
+
+			// Set processing state to true when starting voice processing
+			get().setIsProcessing(true);
+
+			// Check if we have a provider configured
+			const providerConfig = get().providerConfig;
+			if (!providerConfig) {
+				throw new Error('No provider configured for voice');
+			}
+
+			// For Mastra/custom providers with explicit endpoints, check if endpoint is configured
+			if (
+				(providerConfig.provider === 'mastra' ||
+					providerConfig.provider === 'custom') &&
+				!voiceSettings.endpoint
+			) {
+				throw new Error('Voice endpoint not configured');
+			}
+
+			// Get the stringified additional context from the store
+			const contextString = get().compileAdditionalContext();
+
+			// Use the agent connection's voiceStreamLLM method
+			const streamResponse = get().voiceStreamLLM(
+				{
+					audioData,
+					voiceSettings,
+					context: contextString,
+					prompt: '',
+				},
+				async (event) => {
+					// Handle streaming voice events
+					switch (event.type) {
+						case 'transcription':
+							// Handle transcription events
+							if (voiceSettings.autoAddToMessages && event.transcription) {
+								const { addMessage } = get();
+								addMessage({
+									type: 'text',
+									role: 'user',
+									content: event.transcription,
+									metadata: {
+										source: 'voice',
+										timestamp: new Date().toISOString(),
+									},
+								});
+							}
+							break;
+						case 'audio':
+							// Handle audio streaming
+							if (event.audioData && event.audioFormat) {
+								const binaryString = atob(event.audioData);
+								const bytes = new Uint8Array(binaryString.length);
+								for (let i = 0; i < binaryString.length; i++) {
+									bytes[i] = binaryString.charCodeAt(i);
+								}
+								const audioBuffer = bytes.buffer;
+								await get().playAudioResponse(audioBuffer);
+							}
+
+							// Also add the text content to the chat
+							if (event.content) {
+								get().addMessage({
+									type: 'text',
+									role: 'bot',
+									content: event.content,
+									metadata: {
+										source: 'voice',
+										timestamp: new
Date().toISOString(), + }, + }); + } + break; + case 'chunk': + // Handle text content chunks + await get().handleLLMResponse([event.content]); + break; + case 'object': + // Handle structured objects + await get().handleLLMResponse( + Array.isArray(event.object) ? event.object : [event.object] + ); + break; + case 'done': + // Stream completed + get().setIsProcessing(false); + break; + case 'error': + console.error('Voice stream error:', event.error); + set({ + voiceError: + event.error instanceof Error + ? event.error.message + : 'Voice stream error', + }); + get().setIsProcessing(false); + break; + } + } + ); + + // Wait for stream to complete + await streamResponse.completion; + } catch (error) { + set({ + voiceError: + error instanceof Error + ? error.message + : 'Failed to process voice stream', + }); + // Set processing state to false on error + get().setIsProcessing(false); + } + }, + handleLLMVoice: async (response: VoiceLLMResponse) => { const { voiceSettings } = get(); diff --git a/src/app/examples/product-roadmap/layout.tsx b/src/app/examples/product-roadmap/layout.tsx index 0b43e9c2..d7b03f1b 100644 --- a/src/app/examples/product-roadmap/layout.tsx +++ b/src/app/examples/product-roadmap/layout.tsx @@ -39,7 +39,7 @@ export default function ProductRoadmapLayout({ provider: 'mastra', baseURL: 'http://localhost:4111', chatPath: '/chat', - voiceRoute: '/chat', + voiceRoute: '/voice', resumePath: '/chat/resume', }; @@ -53,7 +53,8 @@ export default function ProductRoadmapLayout({ // }; const voiceSettings = { - useBrowserTTS: true, + useBrowserTTS: false, + stream: true, }; const localStorageConfig: MessageStorageConfig = {
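
For reference, a minimal sketch of a client exercising the new /voice/stream endpoint (Node 18+, TypeScript). It assumes the example backend above is running on http://localhost:4111 (the baseURL configured in layout.tsx) and that ./sample.webm is a short local recording; both are placeholder values, and the settings payload is an example. The SSE parsing below mirrors what handleVoiceEventStream in mastra.ts expects: `data: {"type": ...}` lines carrying transcription, chunk, audio, and done events.

import { readFile } from 'node:fs/promises';

async function main() {
	// Build the multipart body that the /voice/stream handler reads via c.req.formData()
	const audio = new Blob([await readFile('./sample.webm')], {
		type: 'audio/webm',
	});
	const form = new FormData();
	form.append('audio', audio, 'recording.webm');
	form.append('settings', JSON.stringify({ temperature: 0.7 }));

	const res = await fetch('http://localhost:4111/voice/stream', {
		method: 'POST',
		body: form,
	});
	if (!res.ok || !res.body) {
		throw new Error(`voice stream failed: ${res.status}`);
	}

	// Read the SSE response and log each JSON event by type
	const reader = res.body.getReader();
	const decoder = new TextDecoder();
	let buffer = '';
	for (;;) {
		const { value, done } = await reader.read();
		if (done) break;
		buffer += decoder.decode(value, { stream: true });
		const lines = buffer.split('\n');
		buffer = lines.pop() ?? '';
		for (const line of lines) {
			if (!line.startsWith('data: ')) continue;
			const event = JSON.parse(line.slice(6));
			if (event.type === 'transcription') {
				console.log('heard:', event.transcription);
			} else if (event.type === 'chunk') {
				process.stdout.write(event.content);
			} else if (event.type === 'audio') {
				console.log(`\n[audio] ${event.audioData.length} base64 chars of ${event.audioFormat}`);
			} else if (event.type === 'done') {
				console.log('\n[done]');
			}
		}
	}
}

main().catch(console.error);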