From b6230d62d80b37b6d414b0a59f7856e9f5a934f4 Mon Sep 17 00:00:00 2001
From: Daniel Johnston
Date: Thu, 5 Jun 2025 11:03:55 -0700
Subject: [PATCH 1/2] fix(agents): Introduced a final transcription buffer to manage collected text more effectively, ensuring accurate agent speech commits are added to chatCtx

---
 agents/src/pipeline/pipeline_agent.ts | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts
index f83f4e05..08385dc2 100644
--- a/agents/src/pipeline/pipeline_agent.ts
+++ b/agents/src/pipeline/pipeline_agent.ts
@@ -6,6 +6,7 @@ import type {
   NoiseCancellationOptions,
   RemoteParticipant,
   Room,
+  TranscriptionSegment,
 } from '@livekit/rtc-node';
 import {
   AudioSource,
@@ -275,6 +276,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
   #lastSpeechTime?: number;
   #transcriptionId?: string;
   #agentTranscribedText = '';
+  #agentFinalTranscriptionBuffer: TranscriptionSegment[] = [];
 
   constructor(
     /** Voice Activity Detection instance. */
@@ -749,7 +751,9 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
     }
     commitUserQuestionIfNeeded();
 
-    let collectedText = this.#agentTranscribedText;
+    let collectedText = this.#agentFinalTranscriptionBuffer
+      .map((segment) => segment.text)
+      .join(' ');
     const isUsingTools = handle.source instanceof LLMStream && !!handle.source.functionCalls.length;
     const interrupted = handle.interrupted;
 
@@ -763,6 +767,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
       const msg = ChatMessage.create({ text: collectedText, role: ChatRole.ASSISTANT });
       this.chatCtx.messages.push(msg);
+      this.#agentFinalTranscriptionBuffer = [];
 
       handle.markSpeechCommitted();
 
       if (interrupted) {
@@ -925,8 +930,20 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
   ): SynthesisHandle {
     const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions);
     // TODO: where possible we would want to use deltas instead of full text segments, esp for LLM streams over the streamText API
+    let lastTranscriptionSegmentId: string | undefined;
     synchronizer.on('textUpdated', async (text) => {
       this.#agentTranscribedText = text.text;
+      if (lastTranscriptionSegmentId !== text.id) {
+        this.#agentFinalTranscriptionBuffer.push(text);
+        lastTranscriptionSegmentId = text.id;
+      } else {
+        const transcriptionSegment =
+          this.#agentFinalTranscriptionBuffer[this.#agentFinalTranscriptionBuffer.length - 1];
+        if (transcriptionSegment) {
+          transcriptionSegment.text = text.text;
+        }
+      }
+
       await this.#publishTranscription(
         this.#room!.localParticipant!.identity!,
         this.#agentPublication?.sid ?? '',

From fab41892715e6c987269caae2c8565acefcfacfd Mon Sep 17 00:00:00 2001
From: Daniel Johnston
Date: Fri, 6 Jun 2025 11:59:18 -0700
Subject: [PATCH 2/2] fix(pipeline): add delay before processing new sentence for proper chatCtx

---
 agents/src/pipeline/pipeline_agent.ts | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts
index 08385dc2..abe686a4 100644
--- a/agents/src/pipeline/pipeline_agent.ts
+++ b/agents/src/pipeline/pipeline_agent.ts
@@ -750,7 +750,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
       if (handle.interrupted) break;
     }
     commitUserQuestionIfNeeded();
-
+    // wait for the new sentence delay
+    await new Promise((resolve) => setTimeout(resolve, defaultTextSyncOptions.newSentenceDelay));
     let collectedText = this.#agentFinalTranscriptionBuffer
       .map((segment) => segment.text)
       .join(' ');
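
The core of patch 1 is a push-or-update buffer keyed on the transcription segment id, joined into one string when the agent's speech is committed to chatCtx; patch 2 then waits one newSentenceDelay so the synchronizer can flush the final segment before the join. Below is a minimal standalone sketch of that buffering behaviour under assumed names: the Segment shape and the FinalTranscriptionBuffer class are illustrative only, not part of the agents or @livekit/rtc-node API.

// Sketch only: mirrors the push-or-update-by-id logic from the patch,
// with illustrative names (Segment, FinalTranscriptionBuffer).
interface Segment {
  id: string;
  text: string;
}

class FinalTranscriptionBuffer {
  #segments: Segment[] = [];
  #lastId?: string;

  // Called for every text update: a new id starts a new buffered segment,
  // the same id overwrites the text of the last buffered segment.
  onTextUpdated(segment: Segment): void {
    if (this.#lastId !== segment.id) {
      this.#segments.push({ ...segment });
      this.#lastId = segment.id;
    } else {
      const last = this.#segments[this.#segments.length - 1];
      if (last) last.text = segment.text;
    }
  }

  // Equivalent of building collectedText before committing to chatCtx.
  collect(): string {
    return this.#segments.map((s) => s.text).join(' ');
  }

  // Equivalent of clearing #agentFinalTranscriptionBuffer after the commit.
  reset(): void {
    this.#segments = [];
    this.#lastId = undefined;
  }
}

// Example: two updates for segment "a" (partial, then final), then segment "b".
const buffer = new FinalTranscriptionBuffer();
buffer.onTextUpdated({ id: 'a', text: 'Hello' });
buffer.onTextUpdated({ id: 'a', text: 'Hello there.' });
buffer.onTextUpdated({ id: 'b', text: 'How can I help?' });
console.log(buffer.collect()); // "Hello there. How can I help?"
buffer.reset();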