Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions livekit-agents/livekit/agents/voice/room_io/_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
room: rtc.Room,
*,
track_source: rtc.TrackSource.ValueType | list[rtc.TrackSource.ValueType],
processor: rtc.FrameProcessor[T] | None = None,
) -> None:
self._room = room
self._accepted_sources = (
Expand All @@ -48,6 +49,9 @@ def __init__(

self._room.on("track_subscribed", self._on_track_available)
self._room.on("track_unpublished", self._on_track_unavailable)
self._room.on("token_refreshed", self._on_token_refreshed)

self._processor = processor

async def __anext__(self) -> T:
return await self._data_ch.__anext__()
Expand Down Expand Up @@ -122,6 +126,8 @@ async def aclose(self) -> None:

self._room.off("track_subscribed", self._on_track_available)
self._data_ch.close()
if self._processor:
self._processor._close()

@log_exceptions(logger=logger)
async def _forward_task(
Expand Down Expand Up @@ -174,6 +180,16 @@ def _on_track_available(
self._close_stream()
self._stream = self._create_stream(track)
self._publication = publication
if self._processor:
self._processor._update_stream_info(
room_name=self._room.name,
participant_identity=participant.identity,
publication_sid=publication.sid,
)
if self._room._token is not None and self._room._server_url is not None:
self._processor._update_credentials(
token=self._room._token, url=self._room._server_url
)
self._forward_atask = asyncio.create_task(
self._forward_task(self._forward_atask, self._stream, publication, participant)
)
Expand All @@ -198,6 +214,14 @@ def _on_track_unavailable(
if self._on_track_available(publication.track, publication, participant):
return

def _on_token_refreshed(self) -> None:
    """Forward the room's current credentials to the frame processor.

    Handler for the room's ``token_refreshed`` event. Does nothing when no
    processor is configured, or when the room has no token or no server URL.
    """
    processor = self._processor
    if processor is None:
        return

    room = self._room
    token = room._token
    if token is None:
        return

    server_url = room._server_url
    if server_url is None:
        return

    processor._update_credentials(token=token, url=server_url)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think the livekit-rtc package should be responsible for directly updating the credentials?



class _ParticipantAudioInputStream(_ParticipantInputStream[rtc.AudioFrame], AudioInput):
def __init__(
Expand All @@ -206,11 +230,20 @@ def __init__(
*,
sample_rate: int,
num_channels: int,
noise_cancellation: rtc.NoiseCancellationOptions | None,
noise_cancellation: rtc.NoiseCancellationOptions
| rtc.FrameProcessor[rtc.AudioFrame]
| None,
pre_connect_audio_handler: PreConnectAudioHandler | None,
) -> None:
audio_processor: rtc.FrameProcessor[rtc.AudioFrame] | None = None
if isinstance(noise_cancellation, rtc.FrameProcessor):
audio_processor = noise_cancellation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Python SDK already handles `noise_cancellation` when it is a `FrameProcessor` (see https://github.com/livekit/python-sdks/pull/533/files#diff-7e2485e8b1480ff233d810a2d222fe14cb1432d0837ce28a6e6182ae8b2422b6R112-R113). Isn't it being processed twice here?

Copy link
Contributor Author

@lukasIO lukasIO Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, you're right, I meant to delete the processor.process call within the input handler and forgot.
I'll update it right away, thanks for catching that!

The original intention:
The processor is forwarded directly to the audio stream.
The audio stream however does not have any participant and/or room context.
In order to hook into room+participant+publication info, the processor is additionally passed to the input handler, which allows the stream metadata to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in f9e324c

Does that look good to you now?


_ParticipantInputStream.__init__(
self, room=room, track_source=rtc.TrackSource.SOURCE_MICROPHONE
self,
room=room,
track_source=rtc.TrackSource.SOURCE_MICROPHONE,
processor=audio_processor,
)
AudioInput.__init__(self, label="RoomIO")
self._sample_rate = sample_rate
Expand Down Expand Up @@ -251,7 +284,7 @@ async def _forward_task(
try:
duration: float = 0
frames = await self._pre_connect_audio_handler.wait_for_data(publication.track.sid)
for frame in self._resample_frames(frames):
for frame in self._resample_frames(self._apply_audio_processor(frames)):
if self._attached:
await self._data_ch.send(frame)
duration += frame.duration
Expand Down Expand Up @@ -305,6 +338,13 @@ def _resample_frames(self, frames: Iterable[rtc.AudioFrame]) -> Iterable[rtc.Aud
if resampler:
yield from resampler.flush()

def _apply_audio_processor(self, frames: Iterable[rtc.AudioFrame]) -> Iterable[rtc.AudioFrame]:
    """Lazily yield ``frames``, running each through the frame processor if one is set.

    The processor is looked up once per frame; when ``self._processor`` is
    ``None`` the frame is yielded unchanged.
    """
    for audio_frame in frames:
        processor = self._processor
        yield audio_frame if processor is None else processor._process(audio_frame)


class _ParticipantVideoInputStream(_ParticipantInputStream[rtc.VideoFrame], VideoInput):
def __init__(self, room: rtc.Room) -> None:
Expand Down