1 change: 1 addition & 0 deletions examples-backend/product-roadmap-backend/package.json
@@ -17,6 +17,7 @@
"@mastra/libsql": "^0.14.1",
"@mastra/loggers": "^0.10.11",
"@mastra/memory": "^0.15.1",
"@mastra/voice-openai": "^0.11.6",
"zod": "^3.25.76",
"zod-to-json-schema": "^3.24.6"
},
36 changes: 35 additions & 1 deletion examples-backend/product-roadmap-backend/pnpm-lock.yaml

Some generated files are not rendered by default.

@@ -7,6 +7,7 @@ import {
import { z } from 'zod';
import { zodToJsonSchema } from 'zod-to-json-schema';
import { createSSEStream, streamJSONEvent } from '../utils/streamUtils';
import { handleVoiceStream } from './voiceStreamHandler';

export const ChatThreadSchema = z.object({
id: z.string(),
@@ -278,4 +279,12 @@ export const apiRoutes = [
}
},
}),

// -------------------- Voice API --------------------

// Voice transcription to workflow (streaming)
registerApiRoute('/voice/stream', {
method: 'POST',
handler: handleVoiceStream,
}),
];
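For reference, a browser client might call this route as in the following sketch — the host/port, blob source, and settings payload are assumptions for illustration, not part of the PR:

```ts
// Sketch: POST recorded audio to /voice/stream and read the SSE reply.
// Assumes a Mastra dev server on localhost:4111 and a webm blob from MediaRecorder.
async function sendVoiceMessage(audioBlob: Blob): Promise<void> {
	const form = new FormData();
	form.append('audio', audioBlob, 'recording.webm');
	form.append('settings', JSON.stringify({ temperature: 0.7 }));

	const res = await fetch('http://localhost:4111/voice/stream', {
		method: 'POST',
		body: form,
	});
	if (!res.ok || !res.body) throw new Error(`Voice request failed: ${res.status}`);

	// Each SSE event carries a JSON payload: transcription, tool events,
	// audio chunks, and a final done event.
	const reader = res.body.getReader();
	const decoder = new TextDecoder();
	for (;;) {
		const { done, value } = await reader.read();
		if (done) break;
		console.log(decoder.decode(value, { stream: true }));
	}
}
```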
142 changes: 142 additions & 0 deletions examples-backend/product-roadmap-backend/src/mastra/voiceStreamHandler.ts
@@ -0,0 +1,142 @@
import { Context } from 'hono';
import { Readable } from 'stream';
import { createSSEStream, streamJSONEvent } from '../utils/streamUtils';
import { chatWorkflow } from './workflows/chatWorkflow';
import { OpenAIVoice } from '@mastra/voice-openai';

export const voiceProvider = new OpenAIVoice({
speechModel: { apiKey: process.env.OPENAI_API_KEY!, name: 'tts-1' },
listeningModel: {
apiKey: process.env.OPENAI_API_KEY!,
name: 'whisper-1',
},
Comment on lines +8 to +12
Contributor: logic: missing environment variable validation will cause runtime errors if API key is not set
});

/**
* Create workflow input data from the voice streaming parameters
*/
function createWorkflowInput(
baseInput: {
prompt: string;
additionalContext?: unknown;
temperature?: number;
maxTokens?: number;
systemPrompt?: string;
resourceId?: string;
threadId?: string;
},
controller: ReadableStreamDefaultController<Uint8Array>,
isStreaming: boolean = true,
isVoice: boolean = false
) {
return {
...baseInput,
streamController: isStreaming ? controller : undefined,
isVoice,
};
}

/**
* Handle voice streaming request
* Transcribes audio, then streams the LLM response back
*/
export async function handleVoiceStream(c: Context) {
try {
const form = await c.req.formData();
const audioFile = form.get('audio') as File;
const additionalContext = form.get('context') as string | null;
const settings = form.get('settings') as string | null;

let parsedAdditionalContext: unknown = undefined;
let parsedSettings: {
temperature?: number;
maxTokens?: number;
systemPrompt?: string;
resourceId?: string;
threadId?: string;
} = {};

// Parse additional context if provided
if (additionalContext) {
try {
parsedAdditionalContext = JSON.parse(additionalContext);
} catch {
// leave undefined if not valid JSON
}
}

// Parse voice settings if provided
if (settings) {
try {
parsedSettings = JSON.parse(settings);
} catch {
// use empty object if not valid JSON
}
}

if (!audioFile) {
return c.json({ error: 'audio required' }, 400);
}

// Convert audio file to buffer and then to stream
const buf = Buffer.from(await audioFile.arrayBuffer());

// Transcribe the audio
const transcription = await voiceProvider.listen(Readable.from(buf), {
filetype: 'webm',
});
Comment on lines +85 to +87
Contributor: style: hardcoded `filetype: 'webm'` assumes input format - consider making dynamic

Suggested change:
-		const transcription = await voiceProvider.listen(Readable.from(buf), {
-			filetype: 'webm',
-		});
+		const transcription = await voiceProvider.listen(Readable.from(buf), {
+			filetype: audioFile.type.includes('webm') ? 'webm' : 'wav',
+		});
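A slightly fuller version of that idea (a hypothetical helper — the accepted format hints should be checked against the provider's option types) derives the hint from the uploaded MIME type:

```ts
// Sketch: map the browser-reported MIME type to a transcription format hint.
function filetypeFromMime(mime: string): 'webm' | 'ogg' | 'mp3' | 'wav' {
	if (mime.includes('webm')) return 'webm';
	if (mime.includes('ogg')) return 'ogg';
	if (mime.includes('mpeg') || mime.includes('mp3')) return 'mp3';
	return 'wav'; // conservative fallback
}

const transcription = await voiceProvider.listen(Readable.from(buf), {
	filetype: filetypeFromMime(audioFile.type),
});
```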


// Create SSE stream for real-time response
return createSSEStream(async (controller) => {
// Emit the transcription in the format that Cedar OS voice streaming expects
console.log('Emitting voice transcription:', transcription);
streamJSONEvent(controller, 'transcription', {
type: 'transcription',
transcription: transcription,
});

// Start the chat workflow with the transcription
const run = await chatWorkflow.createRunAsync();
const result = await run.start({
inputData: createWorkflowInput(
{
prompt: transcription,
additionalContext: parsedAdditionalContext ?? additionalContext,
temperature: parsedSettings.temperature,
maxTokens: parsedSettings.maxTokens,
systemPrompt: parsedSettings.systemPrompt,
resourceId: parsedSettings.resourceId,
threadId: parsedSettings.threadId,
},
controller,
true,
true
),
});

if (result.status !== 'success') {
console.error('Workflow failed:', result.status);
streamJSONEvent(controller, 'error', {
type: 'error',
error: `Workflow failed: ${result.status}`,
});
}

// Emit completion event
console.log('Voice stream completed successfully');
streamJSONEvent(controller, 'done', {
type: 'done',
completedItems: [],
});

// The workflow handles streaming the response through the controller
// No need to manually close here as the workflow will handle completion
});
} catch (error) {
console.error('Voice stream error:', error);
return c.json(
{ error: error instanceof Error ? error.message : 'Internal error' },
500
);
}
}
34 changes: 34 additions & 0 deletions examples-backend/product-roadmap-backend/src/mastra/voiceUtils.ts
@@ -0,0 +1,34 @@
import { OpenAIVoice } from '@mastra/voice-openai';
import { streamAudioFromText } from '../utils/streamUtils';

export const voiceProvider = new OpenAIVoice({
speechModel: { apiKey: process.env.OPENAI_API_KEY!, name: 'tts-1' },
listeningModel: {
apiKey: process.env.OPENAI_API_KEY!,
name: 'whisper-1',
},
});

export function createSpeakFunction() {
return (t: string, options?: Record<string, unknown>) =>
voiceProvider.speak(
t,
options as { speaker?: string; speed?: number }
) as unknown as Promise<ReadableStream>;
}

export async function handleVoiceOutput(
streamController: ReadableStreamDefaultController<Uint8Array>,
pendingText: string,
options: { voice?: string; speed?: number; eventType?: string } = {}
) {
if (!pendingText) return;

const speakFn = createSpeakFunction();
await streamAudioFromText(streamController, speakFn, pendingText, {
voice: 'alloy',
speed: 1.0,
eventType: 'audio',
...options,
});
}
examples-backend/product-roadmap-backend/src/mastra/workflows/chatWorkflow.ts
@@ -8,6 +8,7 @@ import { z } from 'zod';
import { productRoadmapAgent } from '../agents/productRoadmapAgent';
import { handleTextStreamV2, streamJSONEvent } from '../../utils/streamUtils';
import { RuntimeContext } from '@mastra/core/runtime-context';
import { handleVoiceOutput } from '../voiceUtils';

// ---------------------------------------------
// Mastra nested streaming – emit placeholder events
@@ -162,6 +163,8 @@ export const ChatInputSchema = z.object({
resourceId: z.string().optional(),
threadId: z.string().optional(),
streamController: z.any().optional(),
// Voice support
isVoice: z.boolean().optional(),
// For structured output
output: z.any().optional(),
});
@@ -212,6 +215,7 @@ const buildAgentContext = createStep({
resourceId,
threadId,
additionalContext,
isVoice,
} = inputData;

const message = prompt;
@@ -225,6 +229,7 @@
streamController,
resourceId,
threadId,
isVoice,
};

return result;
@@ -260,6 +265,7 @@ const callAgent = createStep({
resourceId,
threadId,
additionalContext,
isVoice,
} = inputData;

const runtimeContext = new RuntimeContext();
@@ -280,16 +286,35 @@
);

let finalText = '';
let pendingText = '';

for await (const chunk of streamResult.fullStream) {
if (chunk.type === 'text-delta') {
finalText += chunk.payload.text;
await handleTextStreamV2(chunk.payload.text, streamController);

if (isVoice && streamController) {
// Accumulate text for voice synthesis
pendingText += chunk.payload.text;
} else {
// Regular text streaming
await handleTextStreamV2(chunk.payload.text, streamController);
}
} else if (chunk.type === 'tool-result' || chunk.type === 'tool-call') {
// Handle any pending text before tool events for voice
if (isVoice && streamController && pendingText) {
await handleVoiceOutput(streamController, pendingText);
pendingText = '';
}

streamJSONEvent(streamController, chunk.type, chunk);
}
}

// Handle any remaining pending text for voice
if (isVoice && streamController && pendingText) {
await handleVoiceOutput(streamController, pendingText);
}

return { content: finalText };
},
});
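Note that with this buffering, a voice response that never triggers a tool event is only synthesized after the full stream completes. A possible refinement (a sketch, not part of this PR) is to flush pendingText at sentence boundaries so audio playback can begin earlier:

```ts
// Sketch: inside the text-delta branch, speak each completed sentence as it arrives.
if (isVoice && streamController) {
	pendingText += chunk.payload.text;
	// Hypothetical heuristic: split on the first sentence terminator.
	const match = pendingText.match(/^(.*?[.!?])\s+(.*)$/s);
	if (match) {
		await handleVoiceOutput(streamController, match[1]);
		pendingText = match[2];
	}
}
```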