From d4cbfcfee80f6c5484d8ece9a3b43fee870e5941 Mon Sep 17 00:00:00 2001
From: Shubhrakanti Ganguly
Date: Thu, 29 May 2025 16:32:21 -0400
Subject: [PATCH 1/2] init

---
 agents/src/stt/stt.ts | 105 +++++++++++++++++++++++++++++-------
 1 file changed, 73 insertions(+), 32 deletions(-)

diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts
index ed8a5a47..3d864286 100644
--- a/agents/src/stt/stt.ts
+++ b/agents/src/stt/stt.ts
@@ -4,12 +4,16 @@
 import type { AudioFrame } from '@livekit/rtc-node';
 import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
 import { EventEmitter } from 'node:events';
-import type { ReadableStream } from 'node:stream/web';
+import type {
+  ReadableStream,
+  ReadableStreamDefaultReader,
+  WritableStreamDefaultWriter,
+} from 'node:stream/web';
 import { log } from '../log.js';
 import type { STTMetrics } from '../metrics/base.js';
 import { DeferredReadableStream } from '../stream/deferred_stream.js';
+import { IdentityTransform } from '../stream/identity_transform.js';
 import type { AudioBuffer } from '../utils.js';
-import { AsyncIterableQueue } from '../utils.js';
 
 /** Indicates start/middle/end of speech */
 export enum SpeechEventType {
@@ -140,102 +144,139 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCallbacks>) {
 export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent> {
   protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL');
-  protected input = new AsyncIterableQueue<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>();
-  protected output = new AsyncIterableQueue<SpeechEvent>();
-  protected queue = new AsyncIterableQueue<SpeechEvent>();
+  protected input = new IdentityTransform<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>();
+  protected output = new IdentityTransform<SpeechEvent>();
+
+  protected inputReader: ReadableStreamDefaultReader<
+    AudioFrame | typeof SpeechStream.FLUSH_SENTINEL
+  >;
+  protected outputWriter: WritableStreamDefaultWriter<SpeechEvent>;
+
   abstract label: string;
-  protected closed = false;
   #stt: STT;
   private deferredInputStream: DeferredReadableStream<AudioFrame>;
   private logger = log();
 
+  private inputWriter: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;
+  private outputReader: ReadableStreamDefaultReader<SpeechEvent>;
+  private metricsStream: ReadableStream<SpeechEvent>;
+  private closed = false;
+  private inputClosed = false;
+
   constructor(stt: STT) {
     this.#stt = stt;
     this.deferredInputStream = new DeferredReadableStream<AudioFrame>();
+
+    this.inputWriter = this.input.writable.getWriter();
+    this.inputReader = this.input.readable.getReader();
+    this.outputWriter = this.output.writable.getWriter();
+
+    const [outputStream, metricsStream] = this.output.readable.tee();
+    this.metricsStream = metricsStream;
+    this.outputReader = outputStream.getReader();
+
+    this.pumpDeferredStream();
     this.monitorMetrics();
-    this.mainTask();
   }
 
-  protected async mainTask() {
-    // TODO(AJS-35): Implement STT with webstreams API
+  /**
+   * Reads from the deferred input stream and forwards chunks to the input writer.
+   *
+   * Note: we can't just do this.deferredInputStream.stream.pipeTo(this.input.writable)
+   * because the inputWriter locks the this.input.writable stream. All writes must go through
+   * the inputWriter.
+   */
+  private async pumpDeferredStream() {
+    const reader = this.deferredInputStream.stream.getReader();
     try {
-      const inputStream = this.deferredInputStream.stream;
-      const reader = inputStream.getReader();
       while (true) {
         const { done, value } = await reader.read();
         if (done) break;
-        this.pushFrame(value);
+        await this.inputWriter.write(value);
       }
-    } catch (error) {
-      this.logger.error('Error in STTStream mainTask:', error);
+    } catch (e) {
+      this.logger.error(`Error pumping deferred stream: ${e}`);
+      throw e;
+    } finally {
+      reader.releaseLock();
     }
   }
 
   protected async monitorMetrics() {
     const startTime = process.hrtime.bigint();
+    const metricsReader = this.metricsStream.getReader();
+
+    while (true) {
+      const { done, value } = await metricsReader.read();
+      if (done) {
+        break;
+      }
+
+      if (value.type !== SpeechEventType.RECOGNITION_USAGE) continue;
 
-    for await (const event of this.queue) {
-      this.output.put(event);
-      if (event.type !== SpeechEventType.RECOGNITION_USAGE) continue;
       const duration = process.hrtime.bigint() - startTime;
       const metrics: STTMetrics = {
         timestamp: Date.now(),
-        requestId: event.requestId!,
+        requestId: value.requestId!,
         duration: Math.trunc(Number(duration / BigInt(1000000))),
         label: this.label,
-        audioDuration: event.recognitionUsage!.audioDuration,
+        audioDuration: value.recognitionUsage!.audioDuration,
         streamed: true,
       };
       this.#stt.emit(SpeechEventType.METRICS_COLLECTED, metrics);
     }
-    this.output.close();
   }
 
   updateInputStream(audioStream: ReadableStream<AudioFrame>) {
     this.deferredInputStream.setSource(audioStream);
   }
 
-  /** Push an audio frame to the STT */
+  /** @deprecated Use `updateInputStream` instead */
   pushFrame(frame: AudioFrame) {
-    if (this.input.closed) {
+    // TODO: remove this method in future version
+    if (this.inputClosed) {
       throw new Error('Input is closed');
     }
     if (this.closed) {
       throw new Error('Stream is closed');
    }
-    this.input.put(frame);
+    this.inputWriter.write(frame);
   }
 
   /** Flush the STT, causing it to process all pending text */
   flush() {
-    if (this.input.closed) {
+    if (this.inputClosed) {
       throw new Error('Input is closed');
     }
     if (this.closed) {
       throw new Error('Stream is closed');
     }
-    this.input.put(SpeechStream.FLUSH_SENTINEL);
+    this.inputWriter.write(SpeechStream.FLUSH_SENTINEL);
   }
 
   /** Mark the input as ended and forbid additional pushes */
   endInput() {
-    if (this.input.closed) {
+    if (this.inputClosed) {
       throw new Error('Input is closed');
     }
     if (this.closed) {
       throw new Error('Stream is closed');
     }
-    this.input.close();
+    this.inputClosed = true;
+    this.inputWriter.close();
   }
 
-  next(): Promise<IteratorResult<SpeechEvent>> {
-    return this.output.next();
+  async next(): Promise<IteratorResult<SpeechEvent>> {
+    return this.outputReader.read().then(({ done, value }) => {
+      if (done) {
+        return { done: true, value: undefined };
+      }
+      return { done: false, value };
+    });
   }
 
   /** Close both the input and output of the STT stream */
   close() {
-    this.input.close();
-    this.queue.close();
-    this.output.close();
+    this.input.writable.close();
     this.closed = true;
   }
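Patch 1 swaps the `AsyncIterableQueue`s for `IdentityTransform`s, but the transform itself lives in `../stream/identity_transform.js` and is not shown in this diff. Presumably it is a pass-through `TransformStream` along these lines (a minimal sketch, not the repo's actual file):

```ts
import { TransformStream } from 'node:stream/web';

// Pass-through transform: every chunk written to `writable` is enqueued on
// `readable` unchanged, so the writer/reader pair behaves like a typed FIFO.
export class IdentityTransform<T> extends TransformStream<T, T> {
  constructor() {
    super({ transform: (chunk, controller) => controller.enqueue(chunk) });
  }
}
```

The comment on `pumpDeferredStream` is the key constraint here: per the WHATWG streams spec, `pipeTo()` rejects with a `TypeError` when the destination is already locked, and the constructor locks `this.input.writable` by calling `getWriter()`. Hence the manual read/forward loop instead of a one-line pipe.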
From 60bc9f19b200a9606437fa13fc13f156b247a21d Mon Sep 17 00:00:00 2001
From: Shubhrakanti Ganguly
Date: Thu, 29 May 2025 16:58:52 -0400
Subject: [PATCH 2/2] finalize

---
 agents/src/stt/stream_adapter.ts | 15 +++++++++------
 agents/src/stt/stt.ts            |  5 ++---
 plugins/deepgram/src/stt.ts      | 20 +++++++++++---------
 3 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/agents/src/stt/stream_adapter.ts b/agents/src/stt/stream_adapter.ts
index 0368ff3b..279de653 100644
--- a/agents/src/stt/stream_adapter.ts
+++ b/agents/src/stt/stream_adapter.ts
@@ -53,11 +53,14 @@ export class StreamAdapterWrapper extends SpeechStream {
   async #run() {
     const forwardInput = async () => {
-      for await (const input of this.input) {
-        if (input === SpeechStream.FLUSH_SENTINEL) {
+      while (true) {
+        const { done, value } = await this.inputReader.read();
+        if (done) break;
+
+        if (value === SpeechStream.FLUSH_SENTINEL) {
           this.#vadStream.flush();
         } else {
-          this.#vadStream.pushFrame(input);
+          this.#vadStream.pushFrame(value);
         }
       }
       this.#vadStream.endInput();
     };
@@ -67,10 +70,10 @@
       for await (const ev of this.#vadStream) {
         switch (ev.type) {
           case VADEventType.START_OF_SPEECH:
-            this.output.put({ type: SpeechEventType.START_OF_SPEECH });
+            this.outputWriter.write({ type: SpeechEventType.START_OF_SPEECH });
             break;
           case VADEventType.END_OF_SPEECH:
-            this.output.put({ type: SpeechEventType.END_OF_SPEECH });
+            this.outputWriter.write({ type: SpeechEventType.END_OF_SPEECH });
 
             try {
               const event = await this.#stt.recognize(ev.frames);
@@ -78,7 +81,7 @@
                 continue;
               }
 
-              this.output.put(event);
+              this.outputWriter.write(event);
               break;
             } catch (error) {
               let logger = log();
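The rewritten `forwardInput` trades `for await (const input of this.input)` for an explicit `read()` loop because the base class now exposes a `ReadableStreamDefaultReader`, and a reader (unlike the stream it came from) is not itself async-iterable. If this pattern spreads, a small helper could restore the old shape; a sketch (hypothetical, not part of this PR):

```ts
import type { ReadableStreamDefaultReader } from 'node:stream/web';

// Hypothetical helper: adapts an already-acquired reader into an async
// generator so call sites can keep the `for await` shape instead of
// hand-rolling the read() loop at every consumer.
async function* readAll<T>(reader: ReadableStreamDefaultReader<T>): AsyncGenerator<T> {
  while (true) {
    const { done, value } = await reader.read();
    if (done) return; // source closed; end iteration
    yield value;
  }
}

// forwardInput could then become:
// for await (const input of readAll(this.inputReader)) { ... }
```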
diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts
index 3d864286..ec11f196 100644
--- a/agents/src/stt/stt.ts
+++ b/agents/src/stt/stt.ts
@@ -151,7 +151,8 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent> {
     AudioFrame | typeof SpeechStream.FLUSH_SENTINEL
   >;
   protected outputWriter: WritableStreamDefaultWriter<SpeechEvent>;
-
+  protected closed = false;
+  protected inputClosed = false;
   abstract label: string;
   #stt: STT;
   private deferredInputStream: DeferredReadableStream<AudioFrame>;
@@ -159,8 +160,6 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent> {
   private inputWriter: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;
   private outputReader: ReadableStreamDefaultReader<SpeechEvent>;
   private metricsStream: ReadableStream<SpeechEvent>;
-  private closed = false;
-  private inputClosed = false;
 
   constructor(stt: STT) {
     this.#stt = stt;
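Promoting `closed` and `inputClosed` from private to protected is what enables the Deepgram change below: the plugin's run loop polls these flags directly now that `this.input.closed` no longer exists. A condensed, illustrative subclass (not the real plugin) showing the protected surface after this patch:

```ts
// Illustrative only; assumes the same module scope as stt.ts, so that
// SpeechStream, STT, and SpeechEventType are in scope.
class EchoSpeechStream extends SpeechStream {
  label = 'echo';

  constructor(stt: STT) {
    super(stt);
    this.run();
  }

  private async run() {
    while (!this.inputClosed && !this.closed) { // both protected as of this patch
      const { done, value } = await this.inputReader.read();
      if (done) break;
      if (value === SpeechStream.FLUSH_SENTINEL) continue; // real impls flush here
      // recognize `value` (an AudioFrame) and publish results, e.g.:
      // this.outputWriter.write({ type: SpeechEventType.START_OF_SPEECH });
    }
  }
}
```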
diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts
index 560b26a7..5fe838da 100644
--- a/plugins/deepgram/src/stt.ts
+++ b/plugins/deepgram/src/stt.ts
@@ -125,7 +125,6 @@ export class SpeechStream extends stt.SpeechStream {
   constructor(stt: STT, opts: STTOptions) {
     super(stt);
     this.#opts = opts;
-    this.closed = false;
     this.#audioEnergyFilter = new AudioEnergyFilter();
 
     this.#run();
@@ -134,7 +133,7 @@
   async #run(maxRetry = 32) {
     let retries = 0;
     let ws: WebSocket;
-    while (!this.input.closed) {
+    while (!this.inputClosed) {
       const streamURL = new URL(API_BASE_URL_V1);
       const params = {
         model: this.#opts.model,
@@ -193,7 +192,7 @@
       }
     }
 
-    this.closed = true;
+    this.close();
   }
 
   updateOptions(opts: Partial<STTOptions>) {
@@ -222,7 +221,10 @@
       samples100Ms,
     );
 
-    for await (const data of this.input) {
+    while (true) {
+      const { done, value: data } = await this.inputReader.read();
+      if (done) break;
+
       let frames: AudioFrame[];
       if (data === SpeechStream.FLUSH_SENTINEL) {
         frames = stream.flush();
@@ -270,7 +272,7 @@
           // It's also possible we receive a transcript without a SpeechStarted event.
           if (this.#speaking) return;
           this.#speaking = true;
-          this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
+          this.outputWriter.write({ type: stt.SpeechEventType.START_OF_SPEECH });
           break;
         }
         // see this page:
@@ -288,16 +290,16 @@
         if (alternatives[0] && alternatives[0].text) {
           if (!this.#speaking) {
             this.#speaking = true;
-            this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
+            this.outputWriter.write({ type: stt.SpeechEventType.START_OF_SPEECH });
           }
 
           if (isFinal) {
-            this.queue.put({
+            this.outputWriter.write({
               type: stt.SpeechEventType.FINAL_TRANSCRIPT,
               alternatives: [alternatives[0], ...alternatives.slice(1)],
             });
           } else {
-            this.queue.put({
+            this.outputWriter.write({
               type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
               alternatives: [alternatives[0], ...alternatives.slice(1)],
             });
           }
         }
@@ -309,7 +311,7 @@
         // a non-empty transcript (deepgram doesn't have a SpeechEnded event)
         if (isEndpoint && this.#speaking) {
           this.#speaking = false;
-          this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH });
+          this.outputWriter.write({ type: stt.SpeechEventType.END_OF_SPEECH });
         }
 
         break;
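From the consumer's side nothing changes: `SpeechStream` still implements `AsyncIterableIterator<SpeechEvent>`, and the rewritten `next()` simply wraps `outputReader.read()`. A usage sketch, assuming a `SpeechStream` instance and a `ReadableStream<AudioFrame>` source (the import path is illustrative):

```ts
import type { AudioFrame } from '@livekit/rtc-node';
import type { ReadableStream } from 'node:stream/web';
import { SpeechEventType, type SpeechStream } from './stt.js'; // path assumed

async function logTranscripts(stream: SpeechStream, frames: ReadableStream<AudioFrame>) {
  stream.updateInputStream(frames); // swaps the source under DeferredReadableStream

  // async iteration drains outputReader via the new next()
  for await (const event of stream) {
    if (event.type === SpeechEventType.FINAL_TRANSCRIPT) {
      console.log(event.alternatives?.[0]?.text);
    }
  }
}
```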