| title | category | tags | difficulty | description | demonstrates | |||||
|---|---|---|---|---|---|---|---|---|---|---|
TTS Node Override |
pipeline-tts |
|
intermediate |
Shows how to override the default TTS node to do replacements on the output. |
|
This example overrides tts_node to intercept LLM output before it reaches TTS. The agent swaps casual "lol" responses with a <laugh> tag so the TTS voice reads them differently.
- Add a
.envin this directory with your LiveKit credentials:LIVEKIT_URL=your_livekit_url LIVEKIT_API_KEY=your_api_key LIVEKIT_API_SECRET=your_api_secret DEEPGRAM_API_KEY=your_deepgram_key OPENAI_API_KEY=your_openai_key RIME_API_KEY=your_rime_key - Install dependencies:
pip install "livekit-agents[silero]" python-dotenv livekit-plugins-deepgram livekit-plugins-openai livekit-plugins-rime
Load your environment variables and set up logging to see the before/after replacements. Create an AgentServer to manage sessions.
import logging
from typing import AsyncIterable
from dotenv import load_dotenv
from livekit.agents import JobContext, JobProcess, AgentServer, cli, Agent, AgentSession, ModelSettings
from livekit.plugins import deepgram, openai, silero, rime
load_dotenv()
logger = logging.getLogger("tts_node")
logger.setLevel(logging.INFO)
server = AgentServer()Preload the VAD model once per process to reduce connection latency.
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
server.setup_fnc = prewarmUse Deepgram STT, GPT-4o, and Rime TTS. Override tts_node to wrap the outgoing text stream and modify it before synthesis.
class TtsNodeOverrideAgent(Agent):
def __init__(self, vad) -> None:
super().__init__(
instructions="""
You are a helpful assistant communicating through voice.
Feel free to use "lol" in your responses when something is funny.
""",
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o"),
tts=rime.TTS(model="arcana"),
vad=vad
)
async def tts_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
"""Modify the TTS output by replacing 'lol' with '<laugh>'."""Iterate over the async text stream, replace "lol"/"LOL", log the swap, and delegate to the default TTS node with the modified stream.
async def process_text():
async for chunk in text:
original_chunk = chunk
modified_chunk = chunk.replace("lol", "<laugh>").replace("LOL", "<laugh>")
if original_chunk != modified_chunk:
logger.info(f"TTS original: '{original_chunk}'")
logger.info(f"TTS modified: '{modified_chunk}'")
yield modified_chunk
return Agent.default.tts_node(self, process_text(), model_settings)Send a short greeting so you can immediately hear the behavior when you respond with something funny.
async def on_enter(self):
await self.session.say("Hi there! Is there anything I can help you with? If you say something funny, I might respond with lol.")Start the agent in your LiveKit room; any "lol" in responses will be replaced before TTS renders audio.
@server.rtc_session()
async def entrypoint(ctx: JobContext):
ctx.log_context_fields = {"room": ctx.room.name}
session = AgentSession()
await session.start(
agent=TtsNodeOverrideAgent(vad=ctx.proc.userdata["vad"]),
room=ctx.room
)
await ctx.connect()python tts_node_modifications.py consoletts_nodelets you intercept the text stream headed to TTS.- A wrapper coroutine replaces target phrases and logs the changes.
- The modified stream is passed to the default TTS pipeline so timing/buffering are preserved.
- Rime TTS renders the adjusted text to audio.
import logging
from typing import AsyncIterable
from dotenv import load_dotenv
from livekit.agents import JobContext, JobProcess, AgentServer, cli, Agent, AgentSession, ModelSettings
from livekit.plugins import deepgram, openai, silero, rime
load_dotenv()
logger = logging.getLogger("tts_node")
logger.setLevel(logging.INFO)
class TtsNodeOverrideAgent(Agent):
def __init__(self, vad) -> None:
super().__init__(
instructions="""
You are a helpful assistant communicating through voice.
Feel free to use "lol" in your responses when something is funny.
""",
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o"),
tts=rime.TTS(model="arcana"),
vad=vad
)
async def tts_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
"""Modify the TTS output by replacing 'lol' with '<laugh>'."""
async def process_text():
async for chunk in text:
original_chunk = chunk
modified_chunk = chunk.replace("lol", "<laugh>").replace("LOL", "<laugh>")
if original_chunk != modified_chunk:
logger.info(f"TTS original: '{original_chunk}'")
logger.info(f"TTS modified: '{modified_chunk}'")
yield modified_chunk
return Agent.default.tts_node(self, process_text(), model_settings)
async def on_enter(self):
await self.session.say(f"Hi there! Is there anything I can help you with? If you say something funny, I might respond with lol.")
server = AgentServer()
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
server.setup_fnc = prewarm
@server.rtc_session()
async def entrypoint(ctx: JobContext):
ctx.log_context_fields = {"room": ctx.room.name}
session = AgentSession()
await session.start(
agent=TtsNodeOverrideAgent(vad=ctx.proc.userdata["vad"]),
room=ctx.room
)
await ctx.connect()
if __name__ == "__main__":
cli.run_app(server)