Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions examples/local_audio/full_duplex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import os
import asyncio
import logging
from dotenv import load_dotenv, find_dotenv

from livekit import api, rtc


async def main() -> None:
    """Run a full-duplex local audio session against a LiveKit room.

    Connects to the ``local-audio`` room, publishes the microphone opened
    through ``rtc.MediaDevices`` and plays a mix of every subscribed remote
    audio track until the task is interrupted.

    Raises:
        RuntimeError: If ``LIVEKIT_URL``, ``LIVEKIT_API_KEY`` or
            ``LIVEKIT_API_SECRET`` is missing from the environment.
    """
    logging.basicConfig(level=logging.INFO)

    # Load environment variables from a .env file if present
    load_dotenv(find_dotenv())

    url = os.getenv("LIVEKIT_URL")
    api_key = os.getenv("LIVEKIT_API_KEY")
    api_secret = os.getenv("LIVEKIT_API_SECRET")
    if not url or not api_key or not api_secret:
        # The message must name the variables that are actually checked above.
        raise RuntimeError(
            "LIVEKIT_URL and LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in env"
        )

    room = rtc.Room()
    devices = rtc.MediaDevices()

    # Open microphone with AEC; output will auto-wire reverse stream for AEC
    mic = devices.open_input(enable_aec=True)
    player = devices.open_output()

    # Mixer for all remote audio streams
    mixer = rtc.AudioMixer(sample_rate=48000, num_channels=1)

    # publication sid -> (participant sid, stream currently feeding the mixer)
    active_streams: dict[str, tuple[str, rtc.AudioStream]] = {}

    # Keep strong references to fire-and-forget tasks: the event loop only holds
    # weak references, so an unreferenced task may be collected before it ends.
    background_tasks: set[asyncio.Task] = set()

    def _on_task_done(task: asyncio.Task) -> None:
        """Drop the finished task and surface any exception it raised."""
        background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logging.error("background task failed", exc_info=error)

    def _spawn(coro) -> None:
        """Schedule *coro* as a background task that is tracked until done."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(_on_task_done)

    async def _close_stream(stream: rtc.AudioStream) -> None:
        """Detach *stream* from the mixer and close it (best effort)."""
        try:
            mixer.remove_stream(stream)
        except Exception:
            logging.debug("could not remove stream from mixer", exc_info=True)
        try:
            await stream.aclose()
        except Exception:
            logging.debug("could not close audio stream", exc_info=True)

    def _release(pub_sid: str) -> bool:
        """Forget the stream of *pub_sid* and schedule its teardown.

        The bookkeeping entry is removed synchronously so that overlapping
        events (e.g. unsubscribe followed by unpublish) cannot schedule the
        same stream for removal twice. Returns True when a stream was found.
        """
        entry = active_streams.pop(pub_sid, None)
        if entry is None:
            return False
        _spawn(_close_stream(entry[1]))
        return True

    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind != rtc.TrackKind.KIND_AUDIO:
            return
        # Release any stream still registered for this publication so a
        # re-subscription does not leak the previous one.
        _release(publication.sid)
        # NOTE(review): a reviewer suggested abstracting further so that an
        # AudioTrack can be added to the mixer directly (possibly via
        # add_track / remove_track on AudioMixer) instead of building an
        # AudioStream here.
        stream = rtc.AudioStream(track, sample_rate=48000, num_channels=1)
        active_streams[publication.sid] = (participant.sid, stream)
        mixer.add_stream(stream)
        logging.info("subscribed to audio from %s", participant.identity)

    room.on("track_subscribed", on_track_subscribed)

    def on_track_unsubscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if _release(publication.sid):
            logging.info("unsubscribed from audio of %s", participant.identity)

    room.on("track_unsubscribed", on_track_unsubscribed)

    def on_track_unpublished(
        publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant
    ):
        if _release(publication.sid):
            logging.info("track unpublished: %s from %s", publication.sid, participant.identity)

    room.on("track_unpublished", on_track_unpublished)

    def on_participant_disconnected(participant: rtc.RemoteParticipant):
        # Snapshot the matching sids first; _release mutates active_streams.
        owned = [
            pub_sid
            for pub_sid, (owner_sid, _) in active_streams.items()
            if owner_sid == participant.sid
        ]
        for pub_sid in owned:
            _release(pub_sid)
        logging.info("participant disconnected: %s", participant.identity)

    room.on("participant_disconnected", on_participant_disconnected)

    token = (
        api.AccessToken(api_key, api_secret)
        .with_identity("local-audio")
        .with_name("Local Audio")
        .with_grants(
            api.VideoGrants(
                room_join=True,
                room="local-audio",
            )
        )
        .to_jwt()
    )

    try:
        await room.connect(url, token)
        logging.info("connected to room %s", room.name)

        # Publish microphone
        track = rtc.LocalAudioTrack.create_audio_track("mic", mic.source)
        pub_opts = rtc.TrackPublishOptions()
        pub_opts.source = rtc.TrackSource.SOURCE_MICROPHONE
        await room.local_participant.publish_track(track, pub_opts)
        logging.info("published local microphone")

        # Start playing mixed remote audio
        _spawn(player.play(mixer))

        # Run until Ctrl+C
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        # Under asyncio.run, Ctrl+C normally reaches this coroutine as task
        # cancellation rather than KeyboardInterrupt; the cleanup that matters
        # is therefore in the finally clause below.
        pass
    finally:
        # Close every resource independently so that one failure does not
        # prevent the remaining ones from being released.
        for label, close in (
            ("microphone", mic.aclose),
            ("mixer", mixer.aclose),
            ("player", player.aclose),
            ("room", room.disconnect),
        ):
            try:
                await close()
            except Exception:
                logging.warning("failed to close %s during shutdown", label, exc_info=True)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run re-raises KeyboardInterrupt after the main task has been
        # cancelled and cleaned up; exit quietly instead of printing a traceback.
        pass
66 changes: 66 additions & 0 deletions examples/local_audio/publish_mic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import asyncio
import logging
from dotenv import load_dotenv, find_dotenv

from livekit import api, rtc


async def main() -> None:
    """Publish the local microphone to a LiveKit room.

    Connects to the ``local-audio`` room and publishes the microphone opened
    through ``rtc.MediaDevices`` (with AEC enabled) until the task is
    interrupted.

    Raises:
        RuntimeError: If ``LIVEKIT_URL``, ``LIVEKIT_API_KEY`` or
            ``LIVEKIT_API_SECRET`` is missing from the environment.
    """
    logging.basicConfig(level=logging.INFO)

    # Load environment variables from a .env file if present
    load_dotenv(find_dotenv())

    url = os.getenv("LIVEKIT_URL")
    api_key = os.getenv("LIVEKIT_API_KEY")
    api_secret = os.getenv("LIVEKIT_API_SECRET")
    if not url or not api_key or not api_secret:
        raise RuntimeError(
            "LIVEKIT_URL and LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in env"
        )

    room = rtc.Room()

    # Create media devices helper and open default microphone with AEC enabled
    devices = rtc.MediaDevices()
    mic = devices.open_input(enable_aec=True)

    token = (
        api.AccessToken(api_key, api_secret)
        .with_identity("local-audio")
        .with_name("Local Audio")
        .with_grants(
            api.VideoGrants(
                room_join=True,
                room="local-audio",
            )
        )
        .to_jwt()
    )

    try:
        await room.connect(url, token)
        logging.info("connected to room %s", room.name)

        track = rtc.LocalAudioTrack.create_audio_track("mic", mic.source)
        pub_opts = rtc.TrackPublishOptions()
        pub_opts.source = rtc.TrackSource.SOURCE_MICROPHONE
        await room.local_participant.publish_track(track, pub_opts)
        logging.info("published local microphone")

        # Run until Ctrl+C
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        # Under asyncio.run, Ctrl+C normally reaches this coroutine as task
        # cancellation rather than KeyboardInterrupt; the cleanup that matters
        # is therefore in the finally clause below.
        pass
    finally:
        # Close each resource independently so a microphone failure cannot
        # leave the room connection open.
        for label, close in (
            ("microphone", mic.aclose),
            ("room", room.disconnect),
        ):
            try:
                await close()
            except Exception:
                logging.warning("failed to close %s during shutdown", label, exc_info=True)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run re-raises KeyboardInterrupt after the main task has been
        # cancelled and cleaned up; exit quietly instead of printing a traceback.
        pass
11 changes: 11 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@
from .audio_resampler import AudioResampler, AudioResamplerQuality
from .audio_mixer import AudioMixer
from .apm import AudioProcessingModule

# MediaDevices is optional: record whether its import succeeded so the public
# export list can be adjusted accordingly.
try:
    from .media_devices import MediaDevices as MediaDevices
except Exception:  # pragma: no cover - optional dependency (sounddevice)
    _HAS_MEDIA_DEVICES = False
else:
    _HAS_MEDIA_DEVICES = True
from .utils import combine_audio_frames
from .rpc import RpcError, RpcInvocationData
from .synchronizer import AVSynchronizer
Expand Down Expand Up @@ -179,3 +186,7 @@
"AudioProcessingModule",
"__version__",
]

# Export MediaDevices only when the optional import above succeeded.
if _HAS_MEDIA_DEVICES:
    __all__ += ["MediaDevices"]
6 changes: 5 additions & 1 deletion livekit-rtc/livekit/rtc/audio_mixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
await self._queue.put(None)

async def _get_contribution(
self, stream: AsyncIterator[AudioFrame], buf: np.ndarray
self, stream: AsyncIterator[AudioFrame | AudioFrameEvent], buf: np.ndarray

Check failure on line 172 in livekit-rtc/livekit/rtc/audio_mixer.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

livekit-rtc/livekit/rtc/audio_mixer.py:172:50: F821 Undefined name `AudioFrameEvent`
) -> _Contribution:
had_data = buf.shape[0] > 0
exhausted = False
Expand All @@ -184,6 +184,10 @@
except StopAsyncIteration:
exhausted = True
break
# AudioStream may yield either AudioFrame or AudioFrameEvent; unwrap if needed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct typing should be:

self, stream: AsyncIterator[AudioFrame | AudioFrameEvent], buf: np.ndarray

if hasattr(frame, "frame"):
frame = frame.frame # type: ignore[assignment]

new_data = np.frombuffer(frame.data.tobytes(), dtype=np.int16).reshape(
-1, self._num_channels
)
Expand Down
Loading
Loading