From e00dcc853284a8f77aaaea14e9fb296c72690999 Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 8 Sep 2025 13:19:22 -0700 Subject: [PATCH 01/23] init media devices --- examples/local-audio/full_duplex.py | 118 ++++++ examples/local-audio/publish_mic.py | 51 +++ livekit-rtc/livekit/rtc/__init__.py | 5 + livekit-rtc/livekit/rtc/media_devices.py | 448 +++++++++++++++++++++++ 4 files changed, 622 insertions(+) create mode 100644 examples/local-audio/full_duplex.py create mode 100644 examples/local-audio/publish_mic.py create mode 100644 livekit-rtc/livekit/rtc/media_devices.py diff --git a/examples/local-audio/full_duplex.py b/examples/local-audio/full_duplex.py new file mode 100644 index 00000000..44364258 --- /dev/null +++ b/examples/local-audio/full_duplex.py @@ -0,0 +1,118 @@ +import os +import asyncio +import logging + +from livekit import rtc + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + + url = os.getenv("LIVEKIT_URL") + token = os.getenv("LIVEKIT_TOKEN") + if not url or not token: + raise RuntimeError("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env") + + room = rtc.Room() + devices = rtc.MediaDevices() + + # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream + mic = devices.open_microphone(enable_aec=True) + player = devices.open_output_player(apm_for_reverse=mic.apm) + + # Mixer for all remote audio streams + mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1) + + # Track stream bookkeeping for cleanup + streams_by_pub: dict[str, rtc.AudioStream] = {} + streams_by_participant: dict[str, set[rtc.AudioStream]] = {} + + async def _remove_stream(stream: rtc.AudioStream, participant_sid: str | None = None, pub_sid: str | None = None) -> None: + try: + mixer.remove_stream(stream) + except Exception: + pass + try: + await stream.aclose() + except Exception: + pass + if participant_sid and participant_sid in streams_by_participant: + streams_by_participant.get(participant_sid, set()).discard(stream) + if not streams_by_participant.get(participant_sid): + streams_by_participant.pop(participant_sid, None) + if pub_sid is not None: + streams_by_pub.pop(pub_sid, None) + + async def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + if track.kind == rtc.TrackKind.KIND_AUDIO: + stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) + streams_by_pub[publication.sid] = stream + streams_by_participant.setdefault(participant.sid, set()).add(stream) + mixer.add_stream(stream) + logging.info("subscribed to audio from %s", participant.identity) + + room.on("track_subscribed", on_track_subscribed) + + def on_track_unsubscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + stream = streams_by_pub.get(publication.sid) + if stream is not None: + asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid)) + logging.info("unsubscribed from audio of %s", participant.identity) + + room.on("track_unsubscribed", on_track_unsubscribed) + + def on_track_unpublished(publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + stream = streams_by_pub.get(publication.sid) + if stream is not None: + asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid)) + logging.info("track unpublished: %s from %s", publication.sid, participant.identity) + + room.on("track_unpublished", on_track_unpublished) + + def on_participant_disconnected(participant: rtc.RemoteParticipant): + streams = list(streams_by_participant.pop(participant.sid, set())) + for stream in streams: + # Best-effort discover publication sid + pub_sid = None + for k, v in list(streams_by_pub.items()): + if v is stream: + pub_sid = k + break + asyncio.create_task(_remove_stream(stream, participant.sid, pub_sid)) + logging.info("participant disconnected: %s", participant.identity) + + room.on("participant_disconnected", on_participant_disconnected) + + try: + await room.connect(url, token) + logging.info("connected to room %s", room.name) + + # Publish microphone + track = rtc.LocalAudioTrack.create_audio_track("mic", mic.source) + pub_opts = rtc.TrackPublishOptions() + pub_opts.source = rtc.TrackSource.SOURCE_MICROPHONE + await room.local_participant.publish_track(track, pub_opts) + logging.info("published local microphone") + + # Start playing mixed remote audio + play_task = asyncio.create_task(player.play(mixer)) + + # Run until Ctrl+C + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + pass + finally: + await mic.aclose() + await mixer.aclose() + await player.aclose() + try: + await room.disconnect() + except Exception: + pass + + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/examples/local-audio/publish_mic.py b/examples/local-audio/publish_mic.py new file mode 100644 index 00000000..a49fc415 --- /dev/null +++ b/examples/local-audio/publish_mic.py @@ -0,0 +1,51 @@ +import os +import sys +import asyncio +import logging + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "livekit-rtc"))) + +from livekit import rtc +from livekit.rtc import MediaDevices + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + + url = os.getenv("LIVEKIT_URL") + token = os.getenv("LIVEKIT_TOKEN") + if not url or not token: + raise RuntimeError("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env") + + room = rtc.Room() + + # Create media devices helper and open default microphone with AEC enabled + devices = MediaDevices() + mic = devices.open_microphone(enable_aec=True) + + try: + await room.connect(url, token) + logging.info("connected to room %s", room.name) + + track = rtc.LocalAudioTrack.create_audio_track("mic", mic.source) + pub_opts = rtc.TrackPublishOptions() + pub_opts.source = rtc.TrackSource.SOURCE_MICROPHONE + await room.local_participant.publish_track(track, pub_opts) + logging.info("published local microphone") + + # Run until Ctrl+C + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + pass + finally: + await mic.aclose() + try: + await room.disconnect() + except Exception: + pass + + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index bc051494..89e74426 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -90,6 +90,11 @@ from .audio_resampler import AudioResampler, AudioResamplerQuality from .audio_mixer import AudioMixer from .apm import AudioProcessingModule +try: + from .media_devices import MediaDevices + _HAS_MEDIA_DEVICES = True +except Exception: # pragma: no cover - optional dependency (sounddevice) + _HAS_MEDIA_DEVICES = False from .utils import combine_audio_frames from .rpc import RpcError, RpcInvocationData from .synchronizer import AVSynchronizer diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py new file mode 100644 index 00000000..c365a1a8 --- /dev/null +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -0,0 +1,448 @@ +# Copyright 2025 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from typing import Any, AsyncIterator, Optional + +import numpy as np +import sounddevice as sd +import threading + +from . import AudioSource +from .audio_frame import AudioFrame +from .apm import AudioProcessingModule + +""" +Media device helpers built on top of the `sounddevice` library. + +This module provides a small, Pythonic helper around native audio I/O for +LiveKit RTC usage: + +- Capture the default microphone and feed frames into `rtc.AudioSource`. +- Optionally enable audio processing via `rtc.AudioProcessingModule` (AEC, + noise suppression, high-pass filter, AGC). Frames are processed in 10 ms + chunks as required by APM. +- Play arbitrary audio frames to the default speaker. When AEC is enabled on + the microphone, the `OutputPlayer` can feed the APM reverse stream so echo + cancellation has access to render (speaker) audio. + +Notes on AEC wiring: +- AEC requires feeding both capture (mic) and reverse (speaker) paths into + the same APM instance. This module does not automatically capture output from + other players. To enable AEC, the output player feeds APM's reverse stream + and we set stream delays derived from PortAudio timing. +""" + + +DEFAULT_SAMPLE_RATE = 48000 +DEFAULT_CHANNELS = 1 +FRAME_SAMPLES = 480 # 10 ms at 48 kHz +BLOCKSIZE = 4800 # 100 ms I/O buffer size for sounddevice + + +def _ensure_loop(loop: Optional[asyncio.AbstractEventLoop]) -> asyncio.AbstractEventLoop: + return loop or asyncio.get_event_loop() + + +class _APMDelayEstimator: + """Thread-safe store for last known output (render) delay in seconds. + + The sounddevice callbacks are invoked on PortAudio's threads. This helper allows + sharing the latest output delay measurement with the input callback so we can set + APM's combined stream delay (render + capture), following the approach in + stream_example.py. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._output_delay_sec: float = 0.0 + + def set_output_delay(self, delay_sec: float) -> None: + with self._lock: + self._output_delay_sec = float(delay_sec) + + def get_output_delay(self) -> float: + with self._lock: + return self._output_delay_sec + + +@dataclass +class MicrophoneCapture: + """Holds resources for an active microphone capture. + + Attributes: + source: `rtc.AudioSource` that receives captured frames. This can be + published as a `LocalAudioTrack`. + input_stream: Underlying `sounddevice.InputStream`. + task: Async task that drains a queue and calls `source.capture_frame`. + apm: Optional `rtc.AudioProcessingModule` used to process 10 ms frames + (AEC, NS, HPF, AGC). When performing echo cancellation, pass this + instance to `open_output_player` so reverse frames are provided. + delay_estimator: Internal helper used to combine capture and render delays. + """ + source: AudioSource + input_stream: sd.InputStream + task: asyncio.Task + apm: Optional[AudioProcessingModule] + delay_estimator: Optional[_APMDelayEstimator] + + async def aclose(self) -> None: + """Stop capture and close underlying resources.""" + if self.task and not self.task.done(): + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + pass + try: + self.input_stream.stop() + self.input_stream.close() + except Exception: + pass + + +class OutputPlayer: + """Simple audio output helper using `sounddevice.OutputStream`. + + When `apm_for_reverse` is provided, this player will feed the same PCM it + renders (in 10 ms frames) into the APM reverse path so that echo + cancellation can correlate mic input with speaker output. + """ + def __init__( + self, + *, + sample_rate: int = DEFAULT_SAMPLE_RATE, + num_channels: int = DEFAULT_CHANNELS, + blocksize: int = BLOCKSIZE, + apm_for_reverse: Optional[AudioProcessingModule] = None, + output_device: Optional[int] = None, + delay_estimator: Optional[_APMDelayEstimator] = None, + ) -> None: + self._sample_rate = sample_rate + self._num_channels = num_channels + self._blocksize = blocksize + self._apm = apm_for_reverse + self._buffer = bytearray() + self._buffer_lock = asyncio.Lock() + self._play_task: Optional[asyncio.Task] = None + self._running = False + self._delay_estimator = delay_estimator + + def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None: + # Pull PCM int16 from buffer; zero if not enough + bytes_needed = frame_count * 2 + # Important: Do not take asyncio locks in realtime callbacks. We keep the + # critical section minimal and tolerate occasional underruns. + available = len(self._buffer) + if available >= bytes_needed: + chunk = self._buffer[:bytes_needed] + outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frame_count) + del self._buffer[:bytes_needed] + elif available > 0: + outdata[: available // 2, 0] = np.frombuffer( + self._buffer[:available], dtype=np.int16, count=available // 2 + ) + outdata[available // 2 :, 0] = 0 + del self._buffer[:available] + else: + outdata.fill(0) + + # Measure render (output) delay: time until DAC from current callback time + try: + output_delay_sec = float(time_info.outputBufferDacTime - time_info.currentTime) + if self._delay_estimator is not None: + self._delay_estimator.set_output_delay(output_delay_sec) + except Exception: + pass + + if self._apm is not None: + # Feed reverse stream in 10 ms frames for AEC + num_chunks = frame_count // FRAME_SAMPLES + for i in range(num_chunks): + start = i * FRAME_SAMPLES + end = start + FRAME_SAMPLES + if end > frame_count: + break + render_chunk = outdata[start:end, 0] + render_frame = AudioFrame( + render_chunk.tobytes(), FRAME_SAMPLES, 1, FRAME_SAMPLES + ) + try: + self._apm.process_reverse_stream(render_frame) + except Exception: + # Ignore reverse stream errors in callback + pass + + self._stream = sd.OutputStream( + callback=_callback, + dtype="int16", + channels=num_channels, + device=output_device, + samplerate=sample_rate, + blocksize=blocksize, + ) + + async def play(self, stream: AsyncIterator[AudioFrame]) -> None: + """Render an async iterator of `AudioFrame` to the output device. + + The raw PCM data is appended to an internal buffer consumed by the + realtime callback. If an APM was supplied, reverse frames are fed for AEC. + """ + self._running = True + self._stream.start() + try: + async for frame in stream: + if not self._running: + break + # Append raw PCM bytes for callback consumption + self._buffer.extend(frame.data.tobytes()) + finally: + self._running = False + try: + self._stream.stop() + self._stream.close() + except Exception: + pass + + async def aclose(self) -> None: + """Stop playback and close the output stream.""" + self._running = False + try: + self._stream.stop() + self._stream.close() + except Exception: + pass + + +class MediaDevices: + """High-level interface to native audio devices. + + This class is inspired by the browser `MediaDevices` concept but uses Python + conventions and the `sounddevice` library. It provides: + + - Device enumeration helpers. + - Microphone capture into `rtc.AudioSource` with optional APM processing. + - Output player that can feed APM reverse stream for AEC. + + Design notes: + - APM operates on 10 ms frames; this module slices input/output audio into + `FRAME_SAMPLES` for processing calls. + - For AEC to be effective, render audio that could leak back into the mic + should be played through `OutputPlayer` with the same `apm` instance. + - Timing alignment: this helper does not attempt to set device latency on + APM; for most setups the default behavior is acceptable. + """ + + def __init__( + self, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + input_sample_rate: int = DEFAULT_SAMPLE_RATE, + output_sample_rate: int = DEFAULT_SAMPLE_RATE, + num_channels: int = DEFAULT_CHANNELS, + blocksize: int = BLOCKSIZE, + ) -> None: + self._loop = _ensure_loop(loop) + self._in_sr = input_sample_rate + self._out_sr = output_sample_rate + self._channels = num_channels + self._blocksize = blocksize + self._delay_estimator: Optional[_APMDelayEstimator] = None + + # Device enumeration + def list_input_devices(self) -> list[dict[str, Any]]: + """List available input devices. + + Returns a list of dictionaries with the `sounddevice` metadata and an + added `index` key corresponding to the device index. + """ + devices = sd.query_devices() + result: list[dict[str, Any]] = [] + for idx, dev in enumerate(devices): + if dev.get("max_input_channels", 0) > 0: + result.append({"index": idx, **dev}) + return result + + def list_output_devices(self) -> list[dict[str, Any]]: + """List available output devices with indices.""" + devices = sd.query_devices() + result: list[dict[str, Any]] = [] + for idx, dev in enumerate(devices): + if dev.get("max_output_channels", 0) > 0: + result.append({"index": idx, **dev}) + return result + + def default_input_device(self) -> Optional[int]: + """Return the default input device index (or None).""" + dev = sd.default.device + return dev[0] if isinstance(dev, (list, tuple)) else None + + def default_output_device(self) -> Optional[int]: + """Return the default output device index (or None).""" + dev = sd.default.device + return dev[1] if isinstance(dev, (list, tuple)) else None + + # Capture / Playback + def open_microphone( + self, + *, + enable_aec: bool = True, + noise_suppression: bool = True, + high_pass_filter: bool = True, + auto_gain_control: bool = True, + input_device: Optional[int] = None, + queue_capacity: int = 200, + input_channel_index: Optional[int] = None, + ) -> MicrophoneCapture: + """Open the default (or chosen) microphone and start capture. + + Frames are sliced into 10 ms chunks. If any processing option is enabled, + an `AudioProcessingModule` is created and applied to each frame before it + is queued for `AudioSource.capture_frame`. + + To enable AEC end-to-end, pass the returned `apm` to + `open_output_player(apm_for_reverse=...)` and route remote audio through + that player so reverse frames are provided to APM. + + Args: + enable_aec: Enable acoustic echo cancellation. + noise_suppression: Enable noise suppression. + high_pass_filter: Enable high-pass filtering. + auto_gain_control: Enable automatic gain control. + input_device: Optional input device index (default system device if None). + queue_capacity: Max queued frames between callback and async pump. + input_channel_index: Optional zero-based device channel to capture. If provided, + only that channel is opened (via sounddevice mapping) and used as mono input. + + Returns: + MicrophoneCapture: Holder with `source`, `apm`, and `aclose()`. + """ + loop = self._loop + source = AudioSource(self._in_sr, self._channels, loop=loop) + apm: Optional[AudioProcessingModule] = None + if enable_aec or noise_suppression or high_pass_filter or auto_gain_control: + apm = AudioProcessingModule( + echo_cancellation=enable_aec, + noise_suppression=noise_suppression, + high_pass_filter=high_pass_filter, + auto_gain_control=auto_gain_control, + ) + delay_estimator: Optional[_APMDelayEstimator] = _APMDelayEstimator() if apm is not None else None + # Store the shared estimator on the device helper so the output player can reuse it + self._delay_estimator = delay_estimator + + # Queue from callback to async task + q: asyncio.Queue[AudioFrame] = asyncio.Queue(maxsize=queue_capacity) + + def _input_callback(indata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None: + # Slice into 10 ms frames, optionally APM, enqueue for async capture + # Compute input (capture) delay using PortAudio timing; combine with last + # measured output delay to provide APM stream delay in milliseconds. + if apm is not None: + try: + input_delay_sec = float(time_info.currentTime - time_info.inputBufferAdcTime) + output_delay_sec = float(delay_estimator.get_output_delay()) if delay_estimator else 0.0 + total_delay_ms = int(max((input_delay_sec + output_delay_sec) * 1000.0, 0.0)) + try: + apm.set_stream_delay_ms(total_delay_ms) + except Exception: + pass + except Exception: + pass + num_frames = frame_count // FRAME_SAMPLES + for i in range(num_frames): + start = i * FRAME_SAMPLES + end = start + FRAME_SAMPLES + if end > frame_count: + break + chunk = indata[start:end, 0] + frame = AudioFrame( + data=chunk.tobytes(), + samples_per_channel=FRAME_SAMPLES, + sample_rate=self._in_sr, + num_channels=self._channels, + ) + if apm is not None: + try: + apm.process_stream(frame) + except Exception: + # Continue even if APM processing fails + pass + try: + # Non-blocking: drop if full + if not q.full(): + loop.call_soon_threadsafe(q.put_nowait, frame) + except Exception: + pass + + # If a specific device channel is requested, map to that channel only. + # sounddevice's channel mapping is 1-based (PortAudio convention). + mapping = None + channels_arg = self._channels + if input_channel_index is not None: + channels_arg = 1 + mapping = [int(input_channel_index) + 1] + + input_stream = sd.InputStream( + callback=_input_callback, + dtype="int16", + channels=channels_arg, + device=input_device, + samplerate=self._in_sr, + blocksize=self._blocksize, + mapping=mapping, + ) + input_stream.start() + + async def _pump() -> None: + # Drain queue into AudioSource + while True: + try: + frame = await q.get() + except asyncio.CancelledError: + break + try: + await source.capture_frame(frame) + except Exception: + # Ignore capture errors to keep the pump alive + pass + + task = asyncio.create_task(_pump()) + return MicrophoneCapture(source=source, input_stream=input_stream, task=task, apm=apm, delay_estimator=delay_estimator) + + def open_output_player( + self, + *, + apm_for_reverse: Optional[AudioProcessingModule] = None, + output_device: Optional[int] = None, + ) -> OutputPlayer: + """Create an `OutputPlayer` for rendering and (optionally) AEC reverse. + + Args: + apm_for_reverse: Pass the APM used by the microphone to enable AEC. + output_device: Optional output device index (default system device if None). + """ + return OutputPlayer( + sample_rate=self._out_sr, + num_channels=self._channels, + blocksize=self._blocksize, + apm_for_reverse=apm_for_reverse, + output_device=output_device, + delay_estimator=self._delay_estimator, + ) + + From 8f13bbdd4922b693dd84a7c8a2784c7a3e80fa46 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 9 Sep 2025 11:38:56 -0700 Subject: [PATCH 02/23] add MediaDevices to rtc/__init__.py --- livekit-rtc/livekit/rtc/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 89e74426..1004f666 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -184,3 +184,7 @@ "AudioProcessingModule", "__version__", ] + +# add MediaDevices if available +if _HAS_MEDIA_DEVICES: + __all__.append("MediaDevices") From cd9d87330b6e74afe8e776d2a3dbd83b5fa5ac74 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 9 Sep 2025 12:24:43 -0700 Subject: [PATCH 03/23] clean up examples --- examples/local-audio/full_duplex.py | 2 +- examples/local-audio/publish_mic.py | 4 ++++ livekit-rtc/rust-sdks | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/local-audio/full_duplex.py b/examples/local-audio/full_duplex.py index 44364258..2b32bd90 100644 --- a/examples/local-audio/full_duplex.py +++ b/examples/local-audio/full_duplex.py @@ -95,7 +95,7 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): logging.info("published local microphone") # Start playing mixed remote audio - play_task = asyncio.create_task(player.play(mixer)) + asyncio.create_task(player.play(mixer)) # Run until Ctrl+C while True: diff --git a/examples/local-audio/publish_mic.py b/examples/local-audio/publish_mic.py index a49fc415..03c5c131 100644 --- a/examples/local-audio/publish_mic.py +++ b/examples/local-audio/publish_mic.py @@ -2,6 +2,7 @@ import sys import asyncio import logging +from dotenv import load_dotenv, find_dotenv sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "livekit-rtc"))) @@ -11,6 +12,9 @@ async def main() -> None: logging.basicConfig(level=logging.INFO) + # Load environment variables from a .env file if present + load_dotenv(find_dotenv()) + url = os.getenv("LIVEKIT_URL") token = os.getenv("LIVEKIT_TOKEN") if not url or not token: diff --git a/livekit-rtc/rust-sdks b/livekit-rtc/rust-sdks index 5ded9c72..68ea1426 160000 --- a/livekit-rtc/rust-sdks +++ b/livekit-rtc/rust-sdks @@ -1 +1 @@ -Subproject commit 5ded9c724dd22339f4ef1bd32383002d594c700a +Subproject commit 68ea1426f825841a5c2870e9cde2ef917227d7d8 From b58dd7de68a261e70daceb8cac463be0a9aa0f8d Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 10 Sep 2025 14:11:36 -0700 Subject: [PATCH 04/23] fix syntax to create inputstream --- livekit-rtc/livekit/rtc/media_devices.py | 40 ++++++++++++++++++------ 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index c365a1a8..12f939d8 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -16,6 +16,8 @@ import asyncio from dataclasses import dataclass +import inspect +import logging from typing import Any, AsyncIterator, Optional import numpy as np @@ -397,15 +399,35 @@ def _input_callback(indata: np.ndarray, frame_count: int, time_info: Any, status channels_arg = 1 mapping = [int(input_channel_index) + 1] - input_stream = sd.InputStream( - callback=_input_callback, - dtype="int16", - channels=channels_arg, - device=input_device, - samplerate=self._in_sr, - blocksize=self._blocksize, - mapping=mapping, - ) + # Build kwargs and conditionally include 'mapping' based on sounddevice version + stream_kwargs: dict[str, Any] = { + "callback": _input_callback, + "dtype": "int16", + "channels": channels_arg, + "device": input_device, + "samplerate": self._in_sr, + "blocksize": self._blocksize, + } + try: + init_params = inspect.signature(sd.InputStream.__init__).parameters + if "mapping" in init_params and mapping is not None: + stream_kwargs["mapping"] = mapping + elif mapping is not None: + logging.getLogger(__name__).warning( + "sounddevice.InputStream does not support 'mapping' in this version; " + "ignoring input_channel_index=%s", + input_channel_index, + ) + except Exception: + # If inspection fails for any reason, fall back without mapping + if mapping is not None: + logging.getLogger(__name__).warning( + "Unable to inspect sounddevice.InputStream.__init__; " + "ignoring input_channel_index=%s", + input_channel_index, + ) + + input_stream = sd.InputStream(**stream_kwargs) input_stream.start() async def _pump() -> None: From 825e9d52799a180cd4d1f2d839937e3186caa159 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 12:19:54 -0700 Subject: [PATCH 05/23] fix audio output thru mixer --- examples/local-audio/full_duplex.py | 7 ++++++- examples/local-audio/publish_mic.py | 5 +---- livekit-rtc/livekit/rtc/audio_mixer.py | 4 ++++ livekit-rtc/livekit/rtc/media_devices.py | 2 +- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/examples/local-audio/full_duplex.py b/examples/local-audio/full_duplex.py index 2b32bd90..4b926e98 100644 --- a/examples/local-audio/full_duplex.py +++ b/examples/local-audio/full_duplex.py @@ -1,6 +1,7 @@ import os import asyncio import logging +from dotenv import load_dotenv, find_dotenv from livekit import rtc @@ -8,12 +9,16 @@ async def main() -> None: logging.basicConfig(level=logging.INFO) + # Load environment variables from a .env file if present + load_dotenv(find_dotenv()) + url = os.getenv("LIVEKIT_URL") token = os.getenv("LIVEKIT_TOKEN") if not url or not token: raise RuntimeError("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env") room = rtc.Room() + devices = rtc.MediaDevices() # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream @@ -43,7 +48,7 @@ async def _remove_stream(stream: rtc.AudioStream, participant_sid: str | None = if pub_sid is not None: streams_by_pub.pop(pub_sid, None) - async def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): if track.kind == rtc.TrackKind.KIND_AUDIO: stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) streams_by_pub[publication.sid] = stream diff --git a/examples/local-audio/publish_mic.py b/examples/local-audio/publish_mic.py index 03c5c131..1e3aa665 100644 --- a/examples/local-audio/publish_mic.py +++ b/examples/local-audio/publish_mic.py @@ -4,10 +4,7 @@ import logging from dotenv import load_dotenv, find_dotenv -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "livekit-rtc"))) - from livekit import rtc -from livekit.rtc import MediaDevices async def main() -> None: logging.basicConfig(level=logging.INFO) @@ -23,7 +20,7 @@ async def main() -> None: room = rtc.Room() # Create media devices helper and open default microphone with AEC enabled - devices = MediaDevices() + devices = rtc.MediaDevices() mic = devices.open_microphone(enable_aec=True) try: diff --git a/livekit-rtc/livekit/rtc/audio_mixer.py b/livekit-rtc/livekit/rtc/audio_mixer.py index e2f28c6b..aa3437bb 100644 --- a/livekit-rtc/livekit/rtc/audio_mixer.py +++ b/livekit-rtc/livekit/rtc/audio_mixer.py @@ -184,6 +184,10 @@ async def _get_contribution( except StopAsyncIteration: exhausted = True break + # AudioStream may yield either AudioFrame or AudioFrameEvent; unwrap if needed + if hasattr(frame, "frame"): + frame = frame.frame # type: ignore[assignment] + new_data = np.frombuffer(frame.data.tobytes(), dtype=np.int16).reshape( -1, self._num_channels ) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 12f939d8..446e09c2 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -181,7 +181,7 @@ def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any break render_chunk = outdata[start:end, 0] render_frame = AudioFrame( - render_chunk.tobytes(), FRAME_SAMPLES, 1, FRAME_SAMPLES + render_chunk.tobytes(), self._sample_rate, 1, FRAME_SAMPLES ) try: self._apm.process_reverse_stream(render_frame) From 74582ec686a35d491cf66f6caf9cfa0bf6c87460 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 12:21:22 -0700 Subject: [PATCH 06/23] remove unused import --- examples/local-audio/publish_mic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/local-audio/publish_mic.py b/examples/local-audio/publish_mic.py index 1e3aa665..b1585d79 100644 --- a/examples/local-audio/publish_mic.py +++ b/examples/local-audio/publish_mic.py @@ -1,5 +1,4 @@ import os -import sys import asyncio import logging from dotenv import load_dotenv, find_dotenv From 9b2f4663127002d66d68e1bf008e4ed60ec429d6 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 12:25:34 -0700 Subject: [PATCH 07/23] fix linter error --- livekit-rtc/livekit/rtc/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 1004f666..193b6f75 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -91,7 +91,7 @@ from .audio_mixer import AudioMixer from .apm import AudioProcessingModule try: - from .media_devices import MediaDevices + from .media_devices import MediaDevices as MediaDevices _HAS_MEDIA_DEVICES = True except Exception: # pragma: no cover - optional dependency (sounddevice) _HAS_MEDIA_DEVICES = False From efb5473200c32753b3d412332b22ab5ff7e2b4d1 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 13:31:45 -0700 Subject: [PATCH 08/23] ruff format --- examples/local-audio/full_duplex.py | 49 ++++++++++++++++++------ examples/local-audio/publish_mic.py | 3 +- livekit-rtc/livekit/rtc/__init__.py | 2 + livekit-rtc/livekit/rtc/audio_mixer.py | 3 -- livekit-rtc/livekit/rtc/media_devices.py | 24 +++++++++--- 5 files changed, 58 insertions(+), 23 deletions(-) diff --git a/examples/local-audio/full_duplex.py b/examples/local-audio/full_duplex.py index 4b926e98..1ea7550c 100644 --- a/examples/local-audio/full_duplex.py +++ b/examples/local-audio/full_duplex.py @@ -11,14 +11,14 @@ async def main() -> None: # Load environment variables from a .env file if present load_dotenv(find_dotenv()) - + url = os.getenv("LIVEKIT_URL") token = os.getenv("LIVEKIT_TOKEN") if not url or not token: raise RuntimeError("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env") room = rtc.Room() - + devices = rtc.MediaDevices() # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream @@ -32,7 +32,9 @@ async def main() -> None: streams_by_pub: dict[str, rtc.AudioStream] = {} streams_by_participant: dict[str, set[rtc.AudioStream]] = {} - async def _remove_stream(stream: rtc.AudioStream, participant_sid: str | None = None, pub_sid: str | None = None) -> None: + async def _remove_stream( + stream: rtc.AudioStream, participant_sid: str | None = None, pub_sid: str | None = None + ) -> None: try: mixer.remove_stream(stream) except Exception: @@ -48,17 +50,40 @@ async def _remove_stream(stream: rtc.AudioStream, participant_sid: str | None = if pub_sid is not None: streams_by_pub.pop(pub_sid, None) - def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + class _FrameOnlyStream: + def __init__(self, inner: rtc.AudioStream) -> None: + self._inner = inner + + def __aiter__(self): + return self + + async def __anext__(self) -> rtc.AudioFrame: + event = await self._inner.__anext__() + return event.frame + + async def aclose(self) -> None: + await self._inner.aclose() + + def on_track_subscribed( + track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant, + ): if track.kind == rtc.TrackKind.KIND_AUDIO: - stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) - streams_by_pub[publication.sid] = stream - streams_by_participant.setdefault(participant.sid, set()).add(stream) - mixer.add_stream(stream) + event_stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) + frame_stream = _FrameOnlyStream(event_stream) + streams_by_pub[publication.sid] = frame_stream + streams_by_participant.setdefault(participant.sid, set()).add(frame_stream) + mixer.add_stream(frame_stream) logging.info("subscribed to audio from %s", participant.identity) room.on("track_subscribed", on_track_subscribed) - def on_track_unsubscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + def on_track_unsubscribed( + track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant, + ): stream = streams_by_pub.get(publication.sid) if stream is not None: asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid)) @@ -66,7 +91,9 @@ def on_track_unsubscribed(track: rtc.Track, publication: rtc.RemoteTrackPublicat room.on("track_unsubscribed", on_track_unsubscribed) - def on_track_unpublished(publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): + def on_track_unpublished( + publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant + ): stream = streams_by_pub.get(publication.sid) if stream is not None: asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid)) @@ -119,5 +146,3 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): if __name__ == "__main__": asyncio.run(main()) - - diff --git a/examples/local-audio/publish_mic.py b/examples/local-audio/publish_mic.py index b1585d79..4a6853f5 100644 --- a/examples/local-audio/publish_mic.py +++ b/examples/local-audio/publish_mic.py @@ -5,6 +5,7 @@ from livekit import rtc + async def main() -> None: logging.basicConfig(level=logging.INFO) @@ -47,5 +48,3 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) - - diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 193b6f75..565b8882 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -90,8 +90,10 @@ from .audio_resampler import AudioResampler, AudioResamplerQuality from .audio_mixer import AudioMixer from .apm import AudioProcessingModule + try: from .media_devices import MediaDevices as MediaDevices + _HAS_MEDIA_DEVICES = True except Exception: # pragma: no cover - optional dependency (sounddevice) _HAS_MEDIA_DEVICES = False diff --git a/livekit-rtc/livekit/rtc/audio_mixer.py b/livekit-rtc/livekit/rtc/audio_mixer.py index aa3437bb..31078d6d 100644 --- a/livekit-rtc/livekit/rtc/audio_mixer.py +++ b/livekit-rtc/livekit/rtc/audio_mixer.py @@ -184,9 +184,6 @@ async def _get_contribution( except StopAsyncIteration: exhausted = True break - # AudioStream may yield either AudioFrame or AudioFrameEvent; unwrap if needed - if hasattr(frame, "frame"): - frame = frame.frame # type: ignore[assignment] new_data = np.frombuffer(frame.data.tobytes(), dtype=np.int16).reshape( -1, self._num_channels diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 446e09c2..35bdab12 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -96,6 +96,7 @@ class MicrophoneCapture: instance to `open_output_player` so reverse frames are provided. delay_estimator: Internal helper used to combine capture and render delays. """ + source: AudioSource input_stream: sd.InputStream task: asyncio.Task @@ -124,6 +125,7 @@ class OutputPlayer: renders (in 10 ms frames) into the APM reverse path so that echo cancellation can correlate mic input with speaker output. """ + def __init__( self, *, @@ -343,21 +345,27 @@ def open_microphone( high_pass_filter=high_pass_filter, auto_gain_control=auto_gain_control, ) - delay_estimator: Optional[_APMDelayEstimator] = _APMDelayEstimator() if apm is not None else None + delay_estimator: Optional[_APMDelayEstimator] = ( + _APMDelayEstimator() if apm is not None else None + ) # Store the shared estimator on the device helper so the output player can reuse it self._delay_estimator = delay_estimator # Queue from callback to async task q: asyncio.Queue[AudioFrame] = asyncio.Queue(maxsize=queue_capacity) - def _input_callback(indata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None: + def _input_callback( + indata: np.ndarray, frame_count: int, time_info: Any, status: Any + ) -> None: # Slice into 10 ms frames, optionally APM, enqueue for async capture # Compute input (capture) delay using PortAudio timing; combine with last # measured output delay to provide APM stream delay in milliseconds. if apm is not None: try: input_delay_sec = float(time_info.currentTime - time_info.inputBufferAdcTime) - output_delay_sec = float(delay_estimator.get_output_delay()) if delay_estimator else 0.0 + output_delay_sec = ( + float(delay_estimator.get_output_delay()) if delay_estimator else 0.0 + ) total_delay_ms = int(max((input_delay_sec + output_delay_sec) * 1000.0, 0.0)) try: apm.set_stream_delay_ms(total_delay_ms) @@ -444,7 +452,13 @@ async def _pump() -> None: pass task = asyncio.create_task(_pump()) - return MicrophoneCapture(source=source, input_stream=input_stream, task=task, apm=apm, delay_estimator=delay_estimator) + return MicrophoneCapture( + source=source, + input_stream=input_stream, + task=task, + apm=apm, + delay_estimator=delay_estimator, + ) def open_output_player( self, @@ -466,5 +480,3 @@ def open_output_player( output_device=output_device, delay_estimator=self._delay_estimator, ) - - From 7f1d59e26d6355b23fe24c8ef77fd9db3261470c Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 15:44:45 -0700 Subject: [PATCH 09/23] allow AudioMixer to unwrap AudioFrameEvent --- examples/local-audio/full_duplex.py | 23 ++++------------------- livekit-rtc/livekit/rtc/audio_mixer.py | 3 +++ 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/examples/local-audio/full_duplex.py b/examples/local-audio/full_duplex.py index 1ea7550c..80147db4 100644 --- a/examples/local-audio/full_duplex.py +++ b/examples/local-audio/full_duplex.py @@ -50,31 +50,16 @@ async def _remove_stream( if pub_sid is not None: streams_by_pub.pop(pub_sid, None) - class _FrameOnlyStream: - def __init__(self, inner: rtc.AudioStream) -> None: - self._inner = inner - - def __aiter__(self): - return self - - async def __anext__(self) -> rtc.AudioFrame: - event = await self._inner.__anext__() - return event.frame - - async def aclose(self) -> None: - await self._inner.aclose() - def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ): if track.kind == rtc.TrackKind.KIND_AUDIO: - event_stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) - frame_stream = _FrameOnlyStream(event_stream) - streams_by_pub[publication.sid] = frame_stream - streams_by_participant.setdefault(participant.sid, set()).add(frame_stream) - mixer.add_stream(frame_stream) + stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) + streams_by_pub[publication.sid] = stream + streams_by_participant.setdefault(participant.sid, set()).add(stream) + mixer.add_stream(stream) logging.info("subscribed to audio from %s", participant.identity) room.on("track_subscribed", on_track_subscribed) diff --git a/livekit-rtc/livekit/rtc/audio_mixer.py b/livekit-rtc/livekit/rtc/audio_mixer.py index 31078d6d..aa3437bb 100644 --- a/livekit-rtc/livekit/rtc/audio_mixer.py +++ b/livekit-rtc/livekit/rtc/audio_mixer.py @@ -184,6 +184,9 @@ async def _get_contribution( except StopAsyncIteration: exhausted = True break + # AudioStream may yield either AudioFrame or AudioFrameEvent; unwrap if needed + if hasattr(frame, "frame"): + frame = frame.frame # type: ignore[assignment] new_data = np.frombuffer(frame.data.tobytes(), dtype=np.int16).reshape( -1, self._num_channels From c8f8c0c4cb6a5e8b5bc55e63e0c6abb10c161071 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 15:45:57 -0700 Subject: [PATCH 10/23] rename dir to match convention --- examples/{local-audio => local_audio}/full_duplex.py | 0 examples/{local-audio => local_audio}/publish_mic.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename examples/{local-audio => local_audio}/full_duplex.py (100%) rename examples/{local-audio => local_audio}/publish_mic.py (100%) diff --git a/examples/local-audio/full_duplex.py b/examples/local_audio/full_duplex.py similarity index 100% rename from examples/local-audio/full_duplex.py rename to examples/local_audio/full_duplex.py diff --git a/examples/local-audio/publish_mic.py b/examples/local_audio/publish_mic.py similarity index 100% rename from examples/local-audio/publish_mic.py rename to examples/local_audio/publish_mic.py From 30ee1838b53bcf2247da4ef6ff4a1b177018219c Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 11 Sep 2025 16:05:56 -0700 Subject: [PATCH 11/23] rename methods to be more clear --- examples/local_audio/full_duplex.py | 4 ++-- examples/local_audio/publish_mic.py | 2 +- livekit-rtc/livekit/rtc/media_devices.py | 30 ++++++++++++------------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index 80147db4..f65d8eb8 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -22,8 +22,8 @@ async def main() -> None: devices = rtc.MediaDevices() # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream - mic = devices.open_microphone(enable_aec=True) - player = devices.open_output_player(apm_for_reverse=mic.apm) + mic = devices.open_input(enable_aec=True) + player = devices.open_output(apm_for_reverse=mic.apm) # Mixer for all remote audio streams mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1) diff --git a/examples/local_audio/publish_mic.py b/examples/local_audio/publish_mic.py index 4a6853f5..a556173a 100644 --- a/examples/local_audio/publish_mic.py +++ b/examples/local_audio/publish_mic.py @@ -21,7 +21,7 @@ async def main() -> None: # Create media devices helper and open default microphone with AEC enabled devices = rtc.MediaDevices() - mic = devices.open_microphone(enable_aec=True) + mic = devices.open_input(enable_aec=True) try: await room.connect(url, token) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 35bdab12..c497fc70 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -34,12 +34,12 @@ This module provides a small, Pythonic helper around native audio I/O for LiveKit RTC usage: -- Capture the default microphone and feed frames into `rtc.AudioSource`. +- Capture the default audio input device and feed frames into `rtc.AudioSource`. - Optionally enable audio processing via `rtc.AudioProcessingModule` (AEC, noise suppression, high-pass filter, AGC). Frames are processed in 10 ms chunks as required by APM. - Play arbitrary audio frames to the default speaker. When AEC is enabled on - the microphone, the `OutputPlayer` can feed the APM reverse stream so echo + the input, the `OutputPlayer` can feed the APM reverse stream so echo cancellation has access to render (speaker) audio. Notes on AEC wiring: @@ -83,8 +83,8 @@ def get_output_delay(self) -> float: @dataclass -class MicrophoneCapture: - """Holds resources for an active microphone capture. +class InputCapture: + """Holds resources for an active audio input capture. Attributes: source: `rtc.AudioSource` that receives captured frames. This can be @@ -239,8 +239,8 @@ class MediaDevices: conventions and the `sounddevice` library. It provides: - Device enumeration helpers. - - Microphone capture into `rtc.AudioSource` with optional APM processing. - - Output player that can feed APM reverse stream for AEC. + - Audio input capture into `rtc.AudioSource` with optional APM processing. + - Audio output player that can feed APM reverse stream for AEC. Design notes: - APM operates on 10 ms frames; this module slices input/output audio into @@ -301,7 +301,7 @@ def default_output_device(self) -> Optional[int]: return dev[1] if isinstance(dev, (list, tuple)) else None # Capture / Playback - def open_microphone( + def open_input( self, *, enable_aec: bool = True, @@ -309,17 +309,17 @@ def open_microphone( high_pass_filter: bool = True, auto_gain_control: bool = True, input_device: Optional[int] = None, - queue_capacity: int = 200, + queue_capacity: int = 50, input_channel_index: Optional[int] = None, - ) -> MicrophoneCapture: - """Open the default (or chosen) microphone and start capture. + ) -> InputCapture: + """Open the default (or chosen) audio input device and start capture. Frames are sliced into 10 ms chunks. If any processing option is enabled, an `AudioProcessingModule` is created and applied to each frame before it is queued for `AudioSource.capture_frame`. To enable AEC end-to-end, pass the returned `apm` to - `open_output_player(apm_for_reverse=...)` and route remote audio through + `open_output(apm_for_reverse=...)` and route remote audio through that player so reverse frames are provided to APM. Args: @@ -333,7 +333,7 @@ def open_microphone( only that channel is opened (via sounddevice mapping) and used as mono input. Returns: - MicrophoneCapture: Holder with `source`, `apm`, and `aclose()`. + InputCapture: Holder with `source`, `apm`, and `aclose()`. """ loop = self._loop source = AudioSource(self._in_sr, self._channels, loop=loop) @@ -452,7 +452,7 @@ async def _pump() -> None: pass task = asyncio.create_task(_pump()) - return MicrophoneCapture( + return InputCapture( source=source, input_stream=input_stream, task=task, @@ -460,7 +460,7 @@ async def _pump() -> None: delay_estimator=delay_estimator, ) - def open_output_player( + def open_output( self, *, apm_for_reverse: Optional[AudioProcessingModule] = None, @@ -469,7 +469,7 @@ def open_output_player( """Create an `OutputPlayer` for rendering and (optionally) AEC reverse. Args: - apm_for_reverse: Pass the APM used by the microphone to enable AEC. + apm_for_reverse: Pass the APM used by the audio input device to enable AEC. output_device: Optional output device index (default system device if None). """ return OutputPlayer( From 89fb1ba2ab6950eac784b2fa39db462479495533 Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 24 Sep 2025 15:24:46 -0700 Subject: [PATCH 12/23] update example --- examples/local_audio/full_duplex.py | 20 +++++++++++++++++--- examples/local_audio/publish_mic.py | 24 +++++++++++++++++++----- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index f65d8eb8..f6e188b1 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -3,7 +3,7 @@ import logging from dotenv import load_dotenv, find_dotenv -from livekit import rtc +from livekit import api, rtc async def main() -> None: @@ -13,8 +13,9 @@ async def main() -> None: load_dotenv(find_dotenv()) url = os.getenv("LIVEKIT_URL") - token = os.getenv("LIVEKIT_TOKEN") - if not url or not token: + api_key = os.getenv("LIVEKIT_API_KEY") + api_secret = os.getenv("LIVEKIT_API_SECRET") + if not url or not api_key or not api_secret: raise RuntimeError("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env") room = rtc.Room() @@ -99,6 +100,19 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): logging.info("participant disconnected: %s", participant.identity) room.on("participant_disconnected", on_participant_disconnected) + + token = ( + api.AccessToken(api_key, api_secret) + .with_identity("local-audio") + .with_name("Local Audio") + .with_grants( + api.VideoGrants( + room_join=True, + room="local-audio", + ) + ) + .to_jwt() + ) try: await room.connect(url, token) diff --git a/examples/local_audio/publish_mic.py b/examples/local_audio/publish_mic.py index a556173a..40fa7ed0 100644 --- a/examples/local_audio/publish_mic.py +++ b/examples/local_audio/publish_mic.py @@ -3,7 +3,7 @@ import logging from dotenv import load_dotenv, find_dotenv -from livekit import rtc +from livekit import api, rtc async def main() -> None: @@ -13,16 +13,30 @@ async def main() -> None: load_dotenv(find_dotenv()) url = os.getenv("LIVEKIT_URL") - token = os.getenv("LIVEKIT_TOKEN") - if not url or not token: - raise RuntimeError("LIVEKIT_URL and LIVEKIT_TOKEN must be set in env") + api_key = os.getenv("LIVEKIT_API_KEY") + api_secret = os.getenv("LIVEKIT_API_SECRET") + if not url or not api_key or not api_secret: + raise RuntimeError("LIVEKIT_URL and LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in env") room = rtc.Room() # Create media devices helper and open default microphone with AEC enabled devices = rtc.MediaDevices() mic = devices.open_input(enable_aec=True) - + + token = ( + api.AccessToken(api_key, api_secret) + .with_identity("local-audio") + .with_name("Local Audio") + .with_grants( + api.VideoGrants( + room_join=True, + room="local-audio", + ) + ) + .to_jwt() + ) + try: await room.connect(url, token) logging.info("connected to room %s", room.name) From c48e1ebf4909d25409bccdbbd2b3bf342168899e Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 24 Sep 2025 15:43:59 -0700 Subject: [PATCH 13/23] update comments --- livekit-rtc/livekit/rtc/media_devices.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index c497fc70..dc88e779 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -65,8 +65,7 @@ class _APMDelayEstimator: The sounddevice callbacks are invoked on PortAudio's threads. This helper allows sharing the latest output delay measurement with the input callback so we can set - APM's combined stream delay (render + capture), following the approach in - stream_example.py. + APM's combined stream delay (render + capture). """ def __init__(self) -> None: @@ -235,9 +234,6 @@ async def aclose(self) -> None: class MediaDevices: """High-level interface to native audio devices. - This class is inspired by the browser `MediaDevices` concept but uses Python - conventions and the `sounddevice` library. It provides: - - Device enumeration helpers. - Audio input capture into `rtc.AudioSource` with optional APM processing. - Audio output player that can feed APM reverse stream for AEC. From 72f546f1e9ff81e661113d09f01a21dd825f1ef0 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 25 Sep 2025 15:26:13 -0700 Subject: [PATCH 14/23] ruff format --- examples/local_audio/full_duplex.py | 2 +- examples/local_audio/publish_mic.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index f6e188b1..718bd2f8 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -100,7 +100,7 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): logging.info("participant disconnected: %s", participant.identity) room.on("participant_disconnected", on_participant_disconnected) - + token = ( api.AccessToken(api_key, api_secret) .with_identity("local-audio") diff --git a/examples/local_audio/publish_mic.py b/examples/local_audio/publish_mic.py index 40fa7ed0..7a3a59f3 100644 --- a/examples/local_audio/publish_mic.py +++ b/examples/local_audio/publish_mic.py @@ -16,14 +16,16 @@ async def main() -> None: api_key = os.getenv("LIVEKIT_API_KEY") api_secret = os.getenv("LIVEKIT_API_SECRET") if not url or not api_key or not api_secret: - raise RuntimeError("LIVEKIT_URL and LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in env") + raise RuntimeError( + "LIVEKIT_URL and LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in env" + ) room = rtc.Room() # Create media devices helper and open default microphone with AEC enabled devices = rtc.MediaDevices() mic = devices.open_input(enable_aec=True) - + token = ( api.AccessToken(api_key, api_secret) .with_identity("local-audio") @@ -36,7 +38,7 @@ async def main() -> None: ) .to_jwt() ) - + try: await room.connect(url, token) logging.info("connected to room %s", room.name) From ef56542124f8ff118749ac3422edd17a19cd70d6 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 2 Oct 2025 15:31:52 -0700 Subject: [PATCH 15/23] clean up input stream creation --- livekit-rtc/livekit/rtc/audio_mixer.py | 2 +- livekit-rtc/livekit/rtc/media_devices.py | 38 ++++++------------------ 2 files changed, 10 insertions(+), 30 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_mixer.py b/livekit-rtc/livekit/rtc/audio_mixer.py index aa3437bb..f0da446f 100644 --- a/livekit-rtc/livekit/rtc/audio_mixer.py +++ b/livekit-rtc/livekit/rtc/audio_mixer.py @@ -169,7 +169,7 @@ async def _mixer(self) -> None: await self._queue.put(None) async def _get_contribution( - self, stream: AsyncIterator[AudioFrame], buf: np.ndarray + self, stream: AsyncIterator[AudioFrame | AudioFrameEvent], buf: np.ndarray ) -> _Contribution: had_data = buf.shape[0] > 0 exhausted = False diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index dc88e779..be95e642 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -403,35 +403,15 @@ def _input_callback( channels_arg = 1 mapping = [int(input_channel_index) + 1] - # Build kwargs and conditionally include 'mapping' based on sounddevice version - stream_kwargs: dict[str, Any] = { - "callback": _input_callback, - "dtype": "int16", - "channels": channels_arg, - "device": input_device, - "samplerate": self._in_sr, - "blocksize": self._blocksize, - } - try: - init_params = inspect.signature(sd.InputStream.__init__).parameters - if "mapping" in init_params and mapping is not None: - stream_kwargs["mapping"] = mapping - elif mapping is not None: - logging.getLogger(__name__).warning( - "sounddevice.InputStream does not support 'mapping' in this version; " - "ignoring input_channel_index=%s", - input_channel_index, - ) - except Exception: - # If inspection fails for any reason, fall back without mapping - if mapping is not None: - logging.getLogger(__name__).warning( - "Unable to inspect sounddevice.InputStream.__init__; " - "ignoring input_channel_index=%s", - input_channel_index, - ) - - input_stream = sd.InputStream(**stream_kwargs) + input_stream = sd.InputStream( + callback=_input_callback, + dtype="int16", + channels=channels_arg, + device=input_device, + samplerate=self._in_sr, + blocksize=self._blocksize, + mapping=mapping, + ) input_stream.start() async def _pump() -> None: From 236fad1a1c440457cb0831bfcb8543d629040205 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 3 Oct 2025 15:08:01 -0700 Subject: [PATCH 16/23] add missing dep --- livekit-rtc/livekit/rtc/audio_mixer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/livekit-rtc/livekit/rtc/audio_mixer.py b/livekit-rtc/livekit/rtc/audio_mixer.py index f0da446f..8ffc4865 100644 --- a/livekit-rtc/livekit/rtc/audio_mixer.py +++ b/livekit-rtc/livekit/rtc/audio_mixer.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from typing import AsyncIterator, Optional from .audio_frame import AudioFrame +from .audio_stream import AudioFrameEvent from .log import logger _Stream = AsyncIterator[AudioFrame] From 7cc6efb4f65fcb9c477b30e96509ede1097b7d67 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 3 Oct 2025 15:18:05 -0700 Subject: [PATCH 17/23] remove mapping --- livekit-rtc/livekit/rtc/media_devices.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index be95e642..4e625638 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -395,22 +395,15 @@ def _input_callback( except Exception: pass - # If a specific device channel is requested, map to that channel only. - # sounddevice's channel mapping is 1-based (PortAudio convention). - mapping = None - channels_arg = self._channels - if input_channel_index is not None: - channels_arg = 1 - mapping = [int(input_channel_index) + 1] - + # Note: input_channel_index is currently not used as sounddevice mapping + # parameter is not supported in all versions. input_stream = sd.InputStream( callback=_input_callback, dtype="int16", - channels=channels_arg, + channels=self._channels, device=input_device, samplerate=self._in_sr, blocksize=self._blocksize, - mapping=mapping, ) input_stream.start() From 1ba7f9f7a00727912cd3a9d7d788c0d6595f583a Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 3 Oct 2025 16:37:46 -0700 Subject: [PATCH 18/23] make apm internal --- examples/local_audio/full_duplex.py | 2 +- livekit-rtc/livekit/rtc/media_devices.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index 718bd2f8..53716cec 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -24,7 +24,7 @@ async def main() -> None: # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream mic = devices.open_input(enable_aec=True) - player = devices.open_output(apm_for_reverse=mic.apm) + player = devices.open_output() # Mixer for all remote audio streams mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 4e625638..1bb57555 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -262,6 +262,7 @@ def __init__( self._channels = num_channels self._blocksize = blocksize self._delay_estimator: Optional[_APMDelayEstimator] = None + self._apm: Optional[AudioProcessingModule] = None # Device enumeration def list_input_devices(self) -> list[dict[str, Any]]: @@ -314,9 +315,9 @@ def open_input( an `AudioProcessingModule` is created and applied to each frame before it is queued for `AudioSource.capture_frame`. - To enable AEC end-to-end, pass the returned `apm` to - `open_output(apm_for_reverse=...)` and route remote audio through - that player so reverse frames are provided to APM. + To enable AEC end-to-end, call `open_output()` after opening the input + device. The output player will automatically use the input's APM for + reverse stream processing, enabling echo cancellation. Args: enable_aec: Enable acoustic echo cancellation. @@ -344,8 +345,9 @@ def open_input( delay_estimator: Optional[_APMDelayEstimator] = ( _APMDelayEstimator() if apm is not None else None ) - # Store the shared estimator on the device helper so the output player can reuse it + # Store the shared estimator and APM on the device helper so the output player can reuse them self._delay_estimator = delay_estimator + self._apm = apm # Queue from callback to async task q: asyncio.Queue[AudioFrame] = asyncio.Queue(maxsize=queue_capacity) @@ -432,20 +434,21 @@ async def _pump() -> None: def open_output( self, *, - apm_for_reverse: Optional[AudioProcessingModule] = None, output_device: Optional[int] = None, ) -> OutputPlayer: """Create an `OutputPlayer` for rendering and (optionally) AEC reverse. + If an input device was opened with AEC enabled, the output player will + automatically feed the APM's reverse stream for echo cancellation. + Args: - apm_for_reverse: Pass the APM used by the audio input device to enable AEC. output_device: Optional output device index (default system device if None). """ return OutputPlayer( sample_rate=self._out_sr, num_channels=self._channels, blocksize=self._blocksize, - apm_for_reverse=apm_for_reverse, + apm_for_reverse=self._apm, output_device=output_device, delay_estimator=self._delay_estimator, ) From 7e0df4fc778b8c075b5cf8585aeab636b15f5005 Mon Sep 17 00:00:00 2001 From: David Chen Date: Sun, 5 Oct 2025 14:44:30 -0700 Subject: [PATCH 19/23] add db meter --- examples/local_audio/db_meter.py | 253 ++++++++++++++++++++++++++++ examples/local_audio/full_duplex.py | 66 +++++++- examples/local_audio/publish_mic.py | 37 ++++ 3 files changed, 351 insertions(+), 5 deletions(-) create mode 100644 examples/local_audio/db_meter.py diff --git a/examples/local_audio/db_meter.py b/examples/local_audio/db_meter.py new file mode 100644 index 00000000..b62fed29 --- /dev/null +++ b/examples/local_audio/db_meter.py @@ -0,0 +1,253 @@ +""" +Audio dB meter utilities for LiveKit Python SDK examples. + +This module provides functions to calculate and display audio levels in decibels (dB) +from raw audio samples, useful for monitoring microphone input and room audio levels. +""" + +import math +import time +from typing import List + +# dB meter configuration constants +DB_METER_UPDATE_INTERVAL_MS = 50 # Update every 50ms +MIC_METER_WIDTH = 25 # Width of the mic dB meter bar +ROOM_METER_WIDTH = 25 # Width of the room dB meter bar + + +def calculate_db_level(samples: List[int]) -> float: + """ + Calculate decibel level from audio samples. + + Args: + samples: List of 16-bit audio samples + + Returns: + dB level as float. Returns -60.0 for silence/empty samples. + """ + if not samples: + return -60.0 # Very quiet + + # Calculate RMS (Root Mean Square) + sum_squares = sum( + (sample / 32767.0) ** 2 # Normalize to -1.0 to 1.0 range + for sample in samples + ) + + rms = math.sqrt(sum_squares / len(samples)) + + # Convert to dB (20 * log10(rms)) + if rms > 0.0: + return 20.0 * math.log10(rms) + else: + return -60.0 # Very quiet + + +def get_meter_color(db_level: float, position_ratio: float) -> str: + """ + Get ANSI color code based on dB level and position in meter. + + Args: + db_level: Current dB level + position_ratio: Position in meter (0.0 to 1.0) + + Returns: + ANSI color code string + """ + # Determine color based on both dB level and position in the meter + if db_level > -6.0 and position_ratio > 0.85: + return "\x1b[91m" # Bright red - clipping/very loud + elif db_level > -12.0 and position_ratio > 0.7: + return "\x1b[31m" # Red - loud + elif db_level > -18.0 and position_ratio > 0.5: + return "\x1b[93m" # Bright yellow - medium-loud + elif db_level > -30.0 and position_ratio > 0.3: + return "\x1b[33m" # Yellow - medium + elif position_ratio > 0.1: + return "\x1b[92m" # Bright green - low-medium + else: + return "\x1b[32m" # Green - low + + +def format_single_meter(db_level: float, meter_width: int, meter_label: str) -> str: + """ + Format a single dB meter with colors. + + Args: + db_level: dB level to display + meter_width: Width of the meter bar in characters + meter_label: Label text for the meter + + Returns: + Formatted meter string with ANSI colors + """ + # ANSI color codes + COLOR_RESET = "\x1b[0m" + COLOR_DIM = "\x1b[2m" + + db_clamped = max(-60.0, min(0.0, db_level)) + normalized = (db_clamped + 60.0) / 60.0 # Normalize to 0.0-1.0 + filled_width = int(normalized * meter_width) + + meter = meter_label + + # Add the dB value with appropriate color + if db_level > -6.0: + db_color = "\x1b[91m" # Bright red + elif db_level > -12.0: + db_color = "\x1b[31m" # Red + elif db_level > -24.0: + db_color = "\x1b[33m" # Yellow + else: + db_color = "\x1b[32m" # Green + + meter += f"{db_color}{db_level:>7.1f}{COLOR_RESET} " + + # Add the visual meter with colors + meter += "[" + for i in range(meter_width): + position_ratio = i / meter_width + + if i < filled_width: + color = get_meter_color(db_level, position_ratio) + meter += f"{color}█{COLOR_RESET}" # Full block for active levels + else: + meter += f"{COLOR_DIM}░{COLOR_RESET}" # Light shade for empty + + meter += "]" + return meter + + +def format_dual_meters(mic_db: float, room_db: float) -> str: + """ + Format both dB meters on the same line. + + Args: + mic_db: Microphone dB level + room_db: Room audio dB level + + Returns: + Formatted dual meter string + """ + mic_meter = format_single_meter(mic_db, MIC_METER_WIDTH, "Mic: ") + room_meter = format_single_meter(room_db, ROOM_METER_WIDTH, " Room: ") + + return f"{mic_meter}{room_meter}" + + +def display_dual_db_meters(mic_db_receiver, room_db_receiver) -> None: + """ + Display dual dB meters continuously until interrupted. + + Args: + mic_db_receiver: Queue or receiver for microphone dB levels + room_db_receiver: Queue or receiver for room dB levels + """ + try: + last_update = time.time() + current_mic_db = -60.0 + current_room_db = -60.0 + first_display = True + + print() # Start on a new line + print("\x1b[92mAudio Levels Monitor\x1b[0m") + print("\x1b[2m────────────────────────────────────────────────────────────────────────────────\x1b[0m") + + while True: + # Check for new data (non-blocking) + try: + while True: # Drain all available data + mic_db = mic_db_receiver.get_nowait() + current_mic_db = mic_db + except: + pass # No more data available + + try: + while True: # Drain all available data + room_db = room_db_receiver.get_nowait() + current_room_db = room_db + except: + pass # No more data available + + # Update display at regular intervals + current_time = time.time() + if current_time - last_update >= DB_METER_UPDATE_INTERVAL_MS / 1000.0: + # Clear current line and display meters in place + print(f"\r\x1b[K{format_dual_meters(current_mic_db, current_room_db)}", end="", flush=True) + last_update = current_time + + # Small sleep to prevent busy waiting + time.sleep(0.01) + + except KeyboardInterrupt: + print() # Move to next line after Ctrl+C + + +def display_single_db_meter(db_receiver, label: str = "Mic Level: ") -> None: + """ + Display a single dB meter continuously until interrupted. + + Args: + db_receiver: Queue or receiver for dB levels + label: Label for the meter display + """ + try: + last_update = time.time() + current_db = -60.0 + first_display = True + + if first_display: + print() # Start on a new line + print(f"\x1b[92m{label}\x1b[0m") + print("\x1b[2m────────────────────────────────────────\x1b[0m") + first_display = False + + while True: + # Check for new data (non-blocking) + try: + while True: # Drain all available data + db_level = db_receiver.get_nowait() + current_db = db_level + except: + pass # No more data available + + # Update display at regular intervals + current_time = time.time() + if current_time - last_update >= DB_METER_UPDATE_INTERVAL_MS / 1000.0: + # Clear current line and display meter in place + meter = format_single_meter(current_db, 40, label) + print(f"\r\x1b[K{meter}", end="", flush=True) + last_update = current_time + + # Small sleep to prevent busy waiting + time.sleep(0.01) + + except KeyboardInterrupt: + print() # Move to next line after Ctrl+C + + +# Example usage and testing functions +def demo_db_meter() -> None: + """Demo function to test dB meter functionality.""" + import random + + # Simulate some test data + class MockReceiver: + def __init__(self): + self.data = [] + + def get_nowait(self): + if not self.data: + # Generate random dB value between -60 and 0 + self.data.append(random.uniform(-60, 0)) + return self.data.pop(0) + + mic_receiver = MockReceiver() + room_receiver = MockReceiver() + + print("Starting dB meter demo (Ctrl+C to stop)...") + display_dual_db_meters(mic_receiver, room_receiver) + + +if __name__ == "__main__": + demo_db_meter() diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index 53716cec..e2085367 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -1,9 +1,12 @@ import os import asyncio import logging +import threading +import queue from dotenv import load_dotenv, find_dotenv from livekit import api, rtc +from db_meter import calculate_db_level, display_dual_db_meters async def main() -> None: @@ -22,17 +25,22 @@ async def main() -> None: devices = rtc.MediaDevices() - # Open microphone with AEC and prepare a player for remote audio feeding AEC reverse stream - mic = devices.open_input(enable_aec=True) + # Open microphone & speaker + mic = devices.open_input() player = devices.open_output() # Mixer for all remote audio streams mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1) + # dB level monitoring + mic_db_queue = queue.Queue() + room_db_queue = queue.Queue() + # Track stream bookkeeping for cleanup streams_by_pub: dict[str, rtc.AudioStream] = {} streams_by_participant: dict[str, set[rtc.AudioStream]] = {} - + + # remove stream from mixer and close it async def _remove_stream( stream: rtc.AudioStream, participant_sid: str | None = None, pub_sid: str | None = None ) -> None: @@ -125,8 +133,56 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): await room.local_participant.publish_track(track, pub_opts) logging.info("published local microphone") - # Start playing mixed remote audio - asyncio.create_task(player.play(mixer)) + # Start dB meter display in a separate thread + meter_thread = threading.Thread( + target=display_dual_db_meters, + args=(mic_db_queue, room_db_queue), + daemon=True + ) + meter_thread.start() + + # Create a monitoring wrapper for the mixer that calculates dB levels + # while passing frames through to the player + async def monitored_mixer(): + try: + async for frame in mixer: + # Calculate dB level for room audio + samples = list(frame.data) + db_level = calculate_db_level(samples) + try: + room_db_queue.put_nowait(db_level) + except queue.Full: + pass # Drop if queue is full + # Yield the frame for playback + yield frame + except Exception: + pass + + # Start playing mixed remote audio with monitoring + asyncio.create_task(player.play(monitored_mixer())) + + # Monitor microphone dB levels + async def monitor_mic_db(): + mic_stream = rtc.AudioStream( + track, sample_rate=48000, num_channels=1 + ) + try: + async for frame_event in mic_stream: + frame = frame_event.frame + # Convert frame data to list of samples + samples = list(frame.data) + db_level = calculate_db_level(samples) + # Update queue with latest value (non-blocking) + try: + mic_db_queue.put_nowait(db_level) + except queue.Full: + pass # Drop if queue is full + except Exception: + pass + finally: + await mic_stream.aclose() + + asyncio.create_task(monitor_mic_db()) # Run until Ctrl+C while True: diff --git a/examples/local_audio/publish_mic.py b/examples/local_audio/publish_mic.py index 7a3a59f3..ef9c9108 100644 --- a/examples/local_audio/publish_mic.py +++ b/examples/local_audio/publish_mic.py @@ -1,9 +1,12 @@ import os import asyncio import logging +import threading +import queue from dotenv import load_dotenv, find_dotenv from livekit import api, rtc +from db_meter import calculate_db_level, display_single_db_meter async def main() -> None: @@ -26,6 +29,9 @@ async def main() -> None: devices = rtc.MediaDevices() mic = devices.open_input(enable_aec=True) + # dB level monitoring + mic_db_queue = queue.Queue() + token = ( api.AccessToken(api_key, api_secret) .with_identity("local-audio") @@ -49,6 +55,37 @@ async def main() -> None: await room.local_participant.publish_track(track, pub_opts) logging.info("published local microphone") + # Start dB meter display in a separate thread + meter_thread = threading.Thread( + target=display_single_db_meter, + args=(mic_db_queue, "Mic: "), + daemon=True + ) + meter_thread.start() + + # Monitor microphone dB levels + async def monitor_mic_db(): + mic_stream = rtc.AudioStream( + track, sample_rate=48000, num_channels=1 + ) + try: + async for frame_event in mic_stream: + frame = frame_event.frame + # Convert frame data to list of samples + samples = list(frame.data) + db_level = calculate_db_level(samples) + # Update queue with latest value (non-blocking) + try: + mic_db_queue.put_nowait(db_level) + except queue.Full: + pass # Drop if queue is full + except Exception: + pass + finally: + await mic_stream.aclose() + + asyncio.create_task(monitor_mic_db()) + # Run until Ctrl+C while True: await asyncio.sleep(1) From 8458783086003eee50d96fb0ddcc61914f848165 Mon Sep 17 00:00:00 2001 From: David Chen Date: Sun, 5 Oct 2025 15:20:00 -0700 Subject: [PATCH 20/23] fix lint issues --- examples/local_audio/db_meter.py | 8 ++++---- livekit-rtc/livekit/rtc/media_devices.py | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/local_audio/db_meter.py b/examples/local_audio/db_meter.py index b62fed29..5a02834c 100644 --- a/examples/local_audio/db_meter.py +++ b/examples/local_audio/db_meter.py @@ -6,6 +6,7 @@ """ import math +import queue import time from typing import List @@ -147,7 +148,6 @@ def display_dual_db_meters(mic_db_receiver, room_db_receiver) -> None: last_update = time.time() current_mic_db = -60.0 current_room_db = -60.0 - first_display = True print() # Start on a new line print("\x1b[92mAudio Levels Monitor\x1b[0m") @@ -159,14 +159,14 @@ def display_dual_db_meters(mic_db_receiver, room_db_receiver) -> None: while True: # Drain all available data mic_db = mic_db_receiver.get_nowait() current_mic_db = mic_db - except: + except queue.Empty: pass # No more data available try: while True: # Drain all available data room_db = room_db_receiver.get_nowait() current_room_db = room_db - except: + except queue.Empty: pass # No more data available # Update display at regular intervals @@ -208,7 +208,7 @@ def display_single_db_meter(db_receiver, label: str = "Mic Level: ") -> None: while True: # Drain all available data db_level = db_receiver.get_nowait() current_db = db_level - except: + except queue.Empty: pass # No more data available # Update display at regular intervals diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 1bb57555..0274b876 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -16,8 +16,6 @@ import asyncio from dataclasses import dataclass -import inspect -import logging from typing import Any, AsyncIterator, Optional import numpy as np From ca27e5f225ce233ddb05b2c55731447509854500 Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 6 Oct 2025 21:19:43 -0700 Subject: [PATCH 21/23] display room name --- examples/local_audio/db_meter.py | 5 +++-- examples/local_audio/full_duplex.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/local_audio/db_meter.py b/examples/local_audio/db_meter.py index 5a02834c..49d7d798 100644 --- a/examples/local_audio/db_meter.py +++ b/examples/local_audio/db_meter.py @@ -136,13 +136,14 @@ def format_dual_meters(mic_db: float, room_db: float) -> str: return f"{mic_meter}{room_meter}" -def display_dual_db_meters(mic_db_receiver, room_db_receiver) -> None: +def display_dual_db_meters(mic_db_receiver, room_db_receiver, room_name: str = "Audio Levels Monitor") -> None: """ Display dual dB meters continuously until interrupted. Args: mic_db_receiver: Queue or receiver for microphone dB levels room_db_receiver: Queue or receiver for room dB levels + room_name: Name of the room to display as the title """ try: last_update = time.time() @@ -150,7 +151,7 @@ def display_dual_db_meters(mic_db_receiver, room_db_receiver) -> None: current_room_db = -60.0 print() # Start on a new line - print("\x1b[92mAudio Levels Monitor\x1b[0m") + print(f"\x1b[92mRoom [{room_name}]\x1b[0m") print("\x1b[2m────────────────────────────────────────────────────────────────────────────────\x1b[0m") while True: diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index e2085367..5f437036 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -136,7 +136,7 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): # Start dB meter display in a separate thread meter_thread = threading.Thread( target=display_dual_db_meters, - args=(mic_db_queue, room_db_queue), + args=(mic_db_queue, room_db_queue, room.name), daemon=True ) meter_thread.start() From 846538f65b7194feeb3095dabdf408c97d6518d2 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 7 Oct 2025 15:40:11 -0700 Subject: [PATCH 22/23] move audio mixer inside of MediaDevices for ease of playback --- examples/local_audio/full_duplex.py | 93 ++------------ livekit-rtc/livekit/rtc/media_devices.py | 147 ++++++++++++++++++++--- 2 files changed, 140 insertions(+), 100 deletions(-) diff --git a/examples/local_audio/full_duplex.py b/examples/local_audio/full_duplex.py index 5f437036..fdffe209 100644 --- a/examples/local_audio/full_duplex.py +++ b/examples/local_audio/full_duplex.py @@ -6,7 +6,7 @@ from dotenv import load_dotenv, find_dotenv from livekit import api, rtc -from db_meter import calculate_db_level, display_dual_db_meters +from db_meter import calculate_db_level, display_single_db_meter async def main() -> None: @@ -29,35 +29,8 @@ async def main() -> None: mic = devices.open_input() player = devices.open_output() - # Mixer for all remote audio streams - mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1) - - # dB level monitoring + # dB level monitoring (mic only) mic_db_queue = queue.Queue() - room_db_queue = queue.Queue() - - # Track stream bookkeeping for cleanup - streams_by_pub: dict[str, rtc.AudioStream] = {} - streams_by_participant: dict[str, set[rtc.AudioStream]] = {} - - # remove stream from mixer and close it - async def _remove_stream( - stream: rtc.AudioStream, participant_sid: str | None = None, pub_sid: str | None = None - ) -> None: - try: - mixer.remove_stream(stream) - except Exception: - pass - try: - await stream.aclose() - except Exception: - pass - if participant_sid and participant_sid in streams_by_participant: - streams_by_participant.get(participant_sid, set()).discard(stream) - if not streams_by_participant.get(participant_sid): - streams_by_participant.pop(participant_sid, None) - if pub_sid is not None: - streams_by_pub.pop(pub_sid, None) def on_track_subscribed( track: rtc.Track, @@ -65,10 +38,7 @@ def on_track_subscribed( participant: rtc.RemoteParticipant, ): if track.kind == rtc.TrackKind.KIND_AUDIO: - stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1) - streams_by_pub[publication.sid] = stream - streams_by_participant.setdefault(participant.sid, set()).add(stream) - mixer.add_stream(stream) + player.add_track(track) logging.info("subscribed to audio from %s", participant.identity) room.on("track_subscribed", on_track_subscribed) @@ -78,37 +48,11 @@ def on_track_unsubscribed( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ): - stream = streams_by_pub.get(publication.sid) - if stream is not None: - asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid)) - logging.info("unsubscribed from audio of %s", participant.identity) + asyncio.create_task(player.remove_track(track)) + logging.info("unsubscribed from audio of %s", participant.identity) room.on("track_unsubscribed", on_track_unsubscribed) - def on_track_unpublished( - publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant - ): - stream = streams_by_pub.get(publication.sid) - if stream is not None: - asyncio.create_task(_remove_stream(stream, participant.sid, publication.sid)) - logging.info("track unpublished: %s from %s", publication.sid, participant.identity) - - room.on("track_unpublished", on_track_unpublished) - - def on_participant_disconnected(participant: rtc.RemoteParticipant): - streams = list(streams_by_participant.pop(participant.sid, set())) - for stream in streams: - # Best-effort discover publication sid - pub_sid = None - for k, v in list(streams_by_pub.items()): - if v is stream: - pub_sid = k - break - asyncio.create_task(_remove_stream(stream, participant.sid, pub_sid)) - logging.info("participant disconnected: %s", participant.identity) - - room.on("participant_disconnected", on_participant_disconnected) - token = ( api.AccessToken(api_key, api_secret) .with_identity("local-audio") @@ -135,31 +79,15 @@ def on_participant_disconnected(participant: rtc.RemoteParticipant): # Start dB meter display in a separate thread meter_thread = threading.Thread( - target=display_dual_db_meters, - args=(mic_db_queue, room_db_queue, room.name), + target=display_single_db_meter, + args=(mic_db_queue,), + kwargs={"label": "Mic Level: "}, daemon=True ) meter_thread.start() - # Create a monitoring wrapper for the mixer that calculates dB levels - # while passing frames through to the player - async def monitored_mixer(): - try: - async for frame in mixer: - # Calculate dB level for room audio - samples = list(frame.data) - db_level = calculate_db_level(samples) - try: - room_db_queue.put_nowait(db_level) - except queue.Full: - pass # Drop if queue is full - # Yield the frame for playback - yield frame - except Exception: - pass - - # Start playing mixed remote audio with monitoring - asyncio.create_task(player.play(monitored_mixer())) + # Start playing mixed remote audio (tracks added via event handlers) + await player.start() # Monitor microphone dB levels async def monitor_mic_db(): @@ -191,7 +119,6 @@ async def monitor_mic_db(): pass finally: await mic.aclose() - await mixer.aclose() await player.aclose() try: await room.disconnect() diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 0274b876..1795177a 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -25,6 +25,9 @@ from . import AudioSource from .audio_frame import AudioFrame from .apm import AudioProcessingModule +from .audio_mixer import AudioMixer +from .audio_stream import AudioStream +from .track import Track """ Media device helpers built on top of the `sounddevice` library. @@ -121,6 +124,10 @@ class OutputPlayer: When `apm_for_reverse` is provided, this player will feed the same PCM it renders (in 10 ms frames) into the APM reverse path so that echo cancellation can correlate mic input with speaker output. + + The OutputPlayer includes an internal `AudioMixer` for convenient multi-track + playback. Use `add_track()` and `remove_track()` to dynamically manage tracks, + then call `start()` to begin playback. """ def __init__( @@ -142,6 +149,10 @@ def __init__( self._play_task: Optional[asyncio.Task] = None self._running = False self._delay_estimator = delay_estimator + + # Internal mixer for add_track/remove_track API + self._mixer: Optional[AudioMixer] = None + self._track_streams: dict[str, AudioStream] = {} # track.sid -> AudioStream def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None: # Pull PCM int16 from buffer; zero if not enough @@ -197,31 +208,133 @@ def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any blocksize=blocksize, ) - async def play(self, stream: AsyncIterator[AudioFrame]) -> None: - """Render an async iterator of `AudioFrame` to the output device. + def add_track(self, track: Track) -> None: + """Add an audio track to the internal mixer for playback. + + This creates an `AudioStream` from the track and adds it to the internal + mixer. The mixer is created lazily on first track addition. Call `start()` + to begin playback of all added tracks. - The raw PCM data is appended to an internal buffer consumed by the - realtime callback. If an APM was supplied, reverse frames are fed for AEC. + Args: + track: The audio track to add (typically from a remote participant). + + Raises: + ValueError: If the track is not an audio track or has already been added. """ - self._running = True - self._stream.start() - try: - async for frame in stream: - if not self._running: - break - # Append raw PCM bytes for callback consumption - self._buffer.extend(frame.data.tobytes()) - finally: - self._running = False + if track.sid in self._track_streams: + raise ValueError(f"Track {track.sid} already added to player") + + # Create mixer on first track addition + if self._mixer is None: + self._mixer = AudioMixer( + sample_rate=self._sample_rate, + num_channels=self._num_channels + ) + + # Create audio stream for this track + stream = AudioStream( + track, + sample_rate=self._sample_rate, + num_channels=self._num_channels + ) + + self._track_streams[track.sid] = stream + self._mixer.add_stream(stream) + + async def remove_track(self, track: Track) -> None: + """Remove an audio track from the internal mixer. + + This removes the track's stream from the mixer and closes it. + + Args: + track: The audio track to remove. + """ + stream = self._track_streams.pop(track.sid, None) + if stream is None: + return + + if self._mixer is not None: try: - self._stream.stop() - self._stream.close() + self._mixer.remove_stream(stream) except Exception: pass + + try: + await stream.aclose() + except Exception: + pass + + async def start(self) -> None: + """Start playback of all tracks in the internal mixer. + + This begins a background task that consumes frames from the internal mixer + and sends them to the output device. Tracks can be added or removed + dynamically using `add_track()` and `remove_track()`. + + Raises: + RuntimeError: If playback is already started or no mixer is available. + """ + if self._play_task is not None and not self._play_task.done(): + raise RuntimeError("Playback already started") + + if self._mixer is None: + self._mixer = AudioMixer( + sample_rate=self._sample_rate, + num_channels=self._num_channels + ) + + async def _playback_loop(): + """Internal playback loop that consumes frames from the mixer.""" + self._running = True + self._stream.start() + try: + async for frame in self._mixer: + if not self._running: + break + # Append raw PCM bytes for callback consumption + self._buffer.extend(frame.data.tobytes()) + finally: + self._running = False + try: + self._stream.stop() + self._stream.close() + except Exception: + pass + + self._play_task = asyncio.create_task(_playback_loop()) async def aclose(self) -> None: - """Stop playback and close the output stream.""" + """Stop playback and close the output stream. + + This also cleans up all added tracks and the internal mixer. + """ self._running = False + + # Cancel playback task if running + if self._play_task is not None and not self._play_task.done(): + self._play_task.cancel() + try: + await self._play_task + except asyncio.CancelledError: + pass + + # Clean up all track streams + for stream in list(self._track_streams.values()): + try: + await stream.aclose() + except Exception: + pass + self._track_streams.clear() + + # Close mixer + if self._mixer is not None: + try: + await self._mixer.aclose() + except Exception: + pass + self._mixer = None + + # Close output stream try: self._stream.stop() self._stream.close() From 58483ac9a33c6ec0098ad9cab053a8b98fde3143 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 7 Oct 2025 15:51:17 -0700 Subject: [PATCH 23/23] remove unused import --- livekit-rtc/livekit/rtc/media_devices.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/media_devices.py b/livekit-rtc/livekit/rtc/media_devices.py index 1795177a..819dbe6a 100644 --- a/livekit-rtc/livekit/rtc/media_devices.py +++ b/livekit-rtc/livekit/rtc/media_devices.py @@ -16,7 +16,7 @@ import asyncio from dataclasses import dataclass -from typing import Any, AsyncIterator, Optional +from typing import Any, Optional import numpy as np import sounddevice as sd