From 60ce71fa6b3d19742e3232cfd2678e0f144af0b0 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 2 Dec 2025 08:39:16 +0100 Subject: [PATCH 1/5] Add frame processor support for audio streams --- livekit-rtc/livekit/rtc/audio_stream.py | 16 ++++++++--- livekit-rtc/livekit/rtc/frame_processor.py | 33 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) create mode 100644 livekit-rtc/livekit/rtc/frame_processor.py diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index b33e668f..dc56c504 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -27,6 +27,7 @@ from .audio_frame import AudioFrame from .participant import Participant from .track import Track +from .frame_processor import SyncFrameProcessor @dataclass @@ -62,7 +63,9 @@ def __init__( sample_rate: int = 48000, num_channels: int = 1, frame_size_ms: int | None = None, - noise_cancellation: Optional[NoiseCancellationOptions] = None, + noise_cancellation: Optional[ + NoiseCancellationOptions | SyncFrameProcessor[AudioFrame] + ] = None, **kwargs, ) -> None: """Initialize an `AudioStream` instance. @@ -76,8 +79,8 @@ def __init__( sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000. num_channels (int, optional): The number of audio channels. Defaults to 1. - noise_cancellation (Optional[NoiseCancellationOptions], optional): - If noise cancellation is used, pass a `NoiseCancellationOptions` instance + noise_cancellation (Optional[NoiseCancellationOptions | SyncFrameProcessor[AudioFrame]], optional): + If noise cancellation is used, pass a `NoiseCancellationOptions` or `SyncFrameProcessor[AudioFrame]` instance created by the noise cancellation module. Example: @@ -105,9 +108,12 @@ def __init__( self._audio_filter_module = None self._audio_filter_options = None - if noise_cancellation is not None: + if isinstance(noise_cancellation, NoiseCancellationOptions): self._audio_filter_module = noise_cancellation.module_id self._audio_filter_options = noise_cancellation.options + elif isinstance(noise_cancellation, SyncFrameProcessor): + self._processor = noise_cancellation + self._task = self._loop.create_task(self._run()) self._task.add_done_callback(task_done_logger) @@ -268,6 +274,8 @@ async def _run(self): if audio_event.HasField("frame_received"): owned_buffer_info = audio_event.frame_received.frame frame = AudioFrame._from_owned_info(owned_buffer_info) + if self._processor is not None: + frame = self._processor._process(frame) event = AudioFrameEvent(frame) self._queue.put(event) elif audio_event.HasField("eos"): diff --git a/livekit-rtc/livekit/rtc/frame_processor.py b/livekit-rtc/livekit/rtc/frame_processor.py new file mode 100644 index 00000000..3fa3bac8 --- /dev/null +++ b/livekit-rtc/livekit/rtc/frame_processor.py @@ -0,0 +1,33 @@ +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Generic, TypeVar, Union + +if TYPE_CHECKING: + from .audio_frame import AudioFrame + from .video_frame import VideoFrame + + +T = TypeVar("T", bound=Union[AudioFrame, VideoFrame]) + + +class SyncFrameProcessor(Generic[T], ABC): + @property + @abstractmethod + def is_enabled(self) -> bool: ... + + @abstractmethod + def set_enabled(self, enable: bool): ... + + @abstractmethod + def _set_context( + self, + *, + room_name: str, + participant_identity: str, + publication_sid: str, + ): ... + + @abstractmethod + def _process(self, frame: T) -> T: ... + + @abstractmethod + def _close(self): ... From 0d378f2e9a2dd3dccd18d635c610c5d260e28758 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 2 Dec 2025 08:42:18 +0100 Subject: [PATCH 2/5] rename and export --- livekit-rtc/livekit/rtc/__init__.py | 2 ++ livekit-rtc/livekit/rtc/audio_stream.py | 12 +++++------- livekit-rtc/livekit/rtc/frame_processor.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 565b8882..f641bfd9 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -108,6 +108,7 @@ ByteStreamWriter, ByteStreamReader, ) +from .frame_processor import FrameProcessor __all__ = [ "ConnectionQuality", @@ -184,6 +185,7 @@ "ByteStreamReader", "ByteStreamWriter", "AudioProcessingModule", + "FrameProcessor", "__version__", ] diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index dc56c504..92924e5b 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -27,7 +27,7 @@ from .audio_frame import AudioFrame from .participant import Participant from .track import Track -from .frame_processor import SyncFrameProcessor +from .frame_processor import FrameProcessor @dataclass @@ -63,9 +63,7 @@ def __init__( sample_rate: int = 48000, num_channels: int = 1, frame_size_ms: int | None = None, - noise_cancellation: Optional[ - NoiseCancellationOptions | SyncFrameProcessor[AudioFrame] - ] = None, + noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, **kwargs, ) -> None: """Initialize an `AudioStream` instance. @@ -79,8 +77,8 @@ def __init__( sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000. num_channels (int, optional): The number of audio channels. Defaults to 1. - noise_cancellation (Optional[NoiseCancellationOptions | SyncFrameProcessor[AudioFrame]], optional): - If noise cancellation is used, pass a `NoiseCancellationOptions` or `SyncFrameProcessor[AudioFrame]` instance + noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional): + If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance created by the noise cancellation module. Example: @@ -111,7 +109,7 @@ def __init__( if isinstance(noise_cancellation, NoiseCancellationOptions): self._audio_filter_module = noise_cancellation.module_id self._audio_filter_options = noise_cancellation.options - elif isinstance(noise_cancellation, SyncFrameProcessor): + elif isinstance(noise_cancellation, FrameProcessor): self._processor = noise_cancellation self._task = self._loop.create_task(self._run()) diff --git a/livekit-rtc/livekit/rtc/frame_processor.py b/livekit-rtc/livekit/rtc/frame_processor.py index 3fa3bac8..24eb164f 100644 --- a/livekit-rtc/livekit/rtc/frame_processor.py +++ b/livekit-rtc/livekit/rtc/frame_processor.py @@ -9,7 +9,7 @@ T = TypeVar("T", bound=Union[AudioFrame, VideoFrame]) -class SyncFrameProcessor(Generic[T], ABC): +class FrameProcessor(Generic[T], ABC): @property @abstractmethod def is_enabled(self) -> bool: ... From d786dbdf5cf7eed5586c21e4e90aa8a5d4fe2f97 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 2 Dec 2025 08:45:06 +0100 Subject: [PATCH 3/5] fix import --- livekit-rtc/livekit/rtc/frame_processor.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/livekit-rtc/livekit/rtc/frame_processor.py b/livekit-rtc/livekit/rtc/frame_processor.py index 24eb164f..6172a76e 100644 --- a/livekit-rtc/livekit/rtc/frame_processor.py +++ b/livekit-rtc/livekit/rtc/frame_processor.py @@ -1,9 +1,7 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Generic, TypeVar, Union - -if TYPE_CHECKING: - from .audio_frame import AudioFrame - from .video_frame import VideoFrame +from typing import Generic, TypeVar, Union +from .audio_frame import AudioFrame +from .video_frame import VideoFrame T = TypeVar("T", bound=Union[AudioFrame, VideoFrame]) From cb8745c0ee8a15d7acf5eafdbc30dfab2bf328c2 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 2 Dec 2025 09:37:32 +0100 Subject: [PATCH 4/5] add method for credential updates --- livekit-rtc/livekit/rtc/frame_processor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/livekit-rtc/livekit/rtc/frame_processor.py b/livekit-rtc/livekit/rtc/frame_processor.py index 6172a76e..8704e82f 100644 --- a/livekit-rtc/livekit/rtc/frame_processor.py +++ b/livekit-rtc/livekit/rtc/frame_processor.py @@ -16,7 +16,7 @@ def is_enabled(self) -> bool: ... def set_enabled(self, enable: bool): ... @abstractmethod - def _set_context( + def _update_stream_info( self, *, room_name: str, @@ -24,6 +24,9 @@ def _set_context( publication_sid: str, ): ... + @abstractmethod + def _update_credentials(self, *, token: str, url: str): ... + @abstractmethod def _process(self, frame: T) -> T: ... From 68378882f8916131906d46f97765b49d8b15d387 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 2 Dec 2025 10:23:52 +0100 Subject: [PATCH 5/5] fix factory types --- livekit-rtc/livekit/rtc/audio_stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index 92924e5b..b2e8bbaa 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -136,7 +136,7 @@ def from_participant( sample_rate: int = 48000, num_channels: int = 1, frame_size_ms: int | None = None, - noise_cancellation: Optional[NoiseCancellationOptions] = None, + noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, ) -> AudioStream: """Create an `AudioStream` from a participant's audio track. @@ -186,7 +186,7 @@ def from_track( sample_rate: int = 48000, num_channels: int = 1, frame_size_ms: int | None = None, - noise_cancellation: Optional[NoiseCancellationOptions] = None, + noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None, ) -> AudioStream: """Create an `AudioStream` from an existing audio track.