Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
ByteStreamWriter,
ByteStreamReader,
)
from .frame_processor import FrameProcessor

__all__ = [
"ConnectionQuality",
Expand Down Expand Up @@ -184,6 +185,7 @@
"ByteStreamReader",
"ByteStreamWriter",
"AudioProcessingModule",
"FrameProcessor",
"__version__",
]

Expand Down
18 changes: 12 additions & 6 deletions livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .audio_frame import AudioFrame
from .participant import Participant
from .track import Track
from .frame_processor import FrameProcessor


@dataclass
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(
sample_rate: int = 48000,
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions] = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have another field name than noise_cancellation?
Like just processors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I went with reusing this for now is that the first official processor we'll have will also be noise cancellation, so I thought it might be confusing otherwise.

I think we can still transition away (deprecate) noise_cancellation lateron in favour of something more generic

**kwargs,
) -> None:
"""Initialize an `AudioStream` instance.
Expand All @@ -76,8 +77,8 @@ def __init__(
sample_rate (int, optional): The sample rate for the audio stream in Hz.
Defaults to 48000.
num_channels (int, optional): The number of audio channels. Defaults to 1.
noise_cancellation (Optional[NoiseCancellationOptions], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` instance
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
created by the noise cancellation module.

Example:
Expand Down Expand Up @@ -105,9 +106,12 @@ def __init__(

self._audio_filter_module = None
self._audio_filter_options = None
if noise_cancellation is not None:
if isinstance(noise_cancellation, NoiseCancellationOptions):
self._audio_filter_module = noise_cancellation.module_id
self._audio_filter_options = noise_cancellation.options
elif isinstance(noise_cancellation, FrameProcessor):
self._processor = noise_cancellation

self._task = self._loop.create_task(self._run())
self._task.add_done_callback(task_done_logger)

Expand All @@ -132,7 +136,7 @@ def from_participant(
sample_rate: int = 48000,
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions] = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
) -> AudioStream:
"""Create an `AudioStream` from a participant's audio track.

Expand Down Expand Up @@ -182,7 +186,7 @@ def from_track(
sample_rate: int = 48000,
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions] = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
) -> AudioStream:
"""Create an `AudioStream` from an existing audio track.

Expand Down Expand Up @@ -268,6 +272,8 @@ async def _run(self):
if audio_event.HasField("frame_received"):
owned_buffer_info = audio_event.frame_received.frame
frame = AudioFrame._from_owned_info(owned_buffer_info)
if self._processor is not None:
frame = self._processor._process(frame)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat.. this is clean

event = AudioFrameEvent(frame)
self._queue.put(event)
elif audio_event.HasField("eos"):
Expand Down
34 changes: 34 additions & 0 deletions livekit-rtc/livekit/rtc/frame_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Union
from .audio_frame import AudioFrame
from .video_frame import VideoFrame


T = TypeVar("T", bound=Union[AudioFrame, VideoFrame])


class FrameProcessor(Generic[T], ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (non-blocking): This might be over abstracting, but we could make this more generalizable by defining an AuthenticatedFrameProcessor interface which inherits from a more general FrameProcessor one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, we can always go the other way around later with an Unauthenticated... (not that name, but something better) that stubs out the setCredentials call.

@property
@abstractmethod
def is_enabled(self) -> bool: ...

@abstractmethod
def set_enabled(self, enable: bool): ...

@abstractmethod
def _update_stream_info(
self,
*,
room_name: str,
participant_identity: str,
publication_sid: str,
): ...

@abstractmethod
def _update_credentials(self, *, token: str, url: str): ...

@abstractmethod
def _process(self, frame: T) -> T: ...

@abstractmethod
def _close(self): ...
Comment on lines +19 to +34
Copy link
Member

@theomonnom theomonnom Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need those methods (e.g: _update_credentials) ? Seems like an implementation detail?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking this interface could be used by anybody, more like a general purpose API

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the underlying Rust implementation would need to make calls to _update_credentials when tokens have refreshed.. this is the same thread as jacob's comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking this interface could be used by anybody, more like a general purpose API

yeah, that would be good.

The methods defined right now is the minimal API we need to make authenticated processors work without tying them too deeply to the RTC package.
Apart from that it's pretty much general purpose already.

Loading