From b96e00ec60863f5d86d33d130e04822b9aedc63e Mon Sep 17 00:00:00 2001 From: BuffMcBigHuge Date: Tue, 17 Feb 2026 17:39:31 -0500 Subject: [PATCH 1/9] Audio with NDI, audio buffer in frame loop, added audio track and media clock in webrtc. Signed-off-by: BuffMcBigHuge --- src/scope/core/ndi/lib.py | 6 + src/scope/core/outputs/ndi.py | 58 ++++++++- src/scope/server/frame_processor.py | 156 +++++++++++++++++++++++++ src/scope/server/media_clock.py | 61 ++++++++++ src/scope/server/pipeline_processor.py | 35 ++++++ src/scope/server/tracks.py | 78 ++++++++++++- src/scope/server/webrtc.py | 27 ++++- 7 files changed, 415 insertions(+), 6 deletions(-) create mode 100644 src/scope/server/media_clock.py diff --git a/src/scope/core/ndi/lib.py b/src/scope/core/ndi/lib.py index f30c4737e..a0df91a38 100644 --- a/src/scope/core/ndi/lib.py +++ b/src/scope/core/ndi/lib.py @@ -275,3 +275,9 @@ def setup_send_functions(lib: ctypes.CDLL) -> None: ctypes.c_void_p, ctypes.POINTER(NDIlib_video_frame_v2_t), ] + + lib.NDIlib_send_send_audio_v2.restype = None + lib.NDIlib_send_send_audio_v2.argtypes = [ + ctypes.c_void_p, + ctypes.POINTER(NDIlib_audio_frame_v2_t), + ] diff --git a/src/scope/core/outputs/ndi.py b/src/scope/core/outputs/ndi.py index 8afd0c700..22c3d3c5b 100644 --- a/src/scope/core/outputs/ndi.py +++ b/src/scope/core/outputs/ndi.py @@ -1,6 +1,6 @@ """NDI output sink implementation. -Sends processed video frames over the network via NDI. +Sends processed video frames and audio over the network via NDI. Uses the shared NDI ctypes bindings from scope.core.ndi. """ @@ -13,6 +13,7 @@ from scope.core.ndi import ( NDI_FOURCC_RGBA, + NDIlib_audio_frame_v2_t, NDIlib_send_create_t, NDIlib_video_frame_v2_t, load_library, @@ -162,6 +163,61 @@ def send_frame(self, frame: np.ndarray | torch.Tensor) -> bool: logger.error(f"Error sending NDI frame: {e}") return False + def send_audio( + self, + audio: np.ndarray | torch.Tensor, + sample_rate: int, + num_channels: int, + ) -> bool: + """Send audio samples over NDI. + + Args: + audio: Float32 audio samples. Shape (S,) for mono or (C, S) for multi-channel. + Values should be in [-1.0, 1.0] range. + sample_rate: Audio sample rate (e.g. 48000). + num_channels: Number of audio channels (e.g. 1 for mono). + + Returns: + True if send was successful. + """ + if self._send_instance is None or self._lib is None: + return False + + try: + if isinstance(audio, torch.Tensor): + if audio.is_cuda: + audio = audio.cpu() + audio = audio.numpy() + + audio = np.asarray(audio, dtype=np.float32) + + # Ensure contiguous + if not audio.flags["C_CONTIGUOUS"]: + audio = np.ascontiguousarray(audio) + + # NDI expects interleaved float32 samples + # For mono: shape (S,), for multi-channel: shape (C*S,) interleaved + num_samples = audio.shape[-1] if audio.ndim > 1 else len(audio) + + audio_frame = NDIlib_audio_frame_v2_t() + audio_frame.sample_rate = sample_rate + audio_frame.no_channels = num_channels + audio_frame.no_samples = num_samples + audio_frame.timecode = -1 # auto + audio_frame.p_data = audio.ctypes.data + audio_frame.channel_stride_in_bytes = num_samples * 4 # float32 = 4 bytes + audio_frame.p_metadata = None + audio_frame.timestamp = -1 # auto + + self._lib.NDIlib_send_send_audio_v2( + self._send_instance, ctypes.byref(audio_frame) + ) + return True + + except Exception as e: + logger.error(f"Error sending NDI audio: {e}") + return False + def resize(self, width: int, height: int): """Update output dimensions (NDI rebuilds frame struct per-send).""" self._width = width diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index ef411c3e8..63812a541 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -3,8 +3,10 @@ import threading import time import uuid +from collections import deque from typing import TYPE_CHECKING, Any +import numpy as np import torch from aiortc.mediastreams import VideoFrame @@ -18,6 +20,13 @@ from .cloud_connection import CloudConnectionManager +# Audio constants +WEBRTC_AUDIO_SAMPLE_RATE = 48000 # WebRTC standard output sample rate +AUDIO_FRAME_DURATION_MS = 20 # Standard WebRTC audio frame duration +AUDIO_SAMPLES_PER_FRAME = int( + WEBRTC_AUDIO_SAMPLE_RATE * AUDIO_FRAME_DURATION_MS / 1000 +) # 960 samples + logger = logging.getLogger(__name__) @@ -109,6 +118,15 @@ def __init__( self._playback_ready_emitted = False self._stream_start_time: float | None = None + # Audio buffer: accumulates resampled audio samples ready for WebRTC output. + # Stores interleaved float32 samples at WEBRTC_AUDIO_SAMPLE_RATE (48kHz). + # AudioProcessingTrack calls get_audio() to drain 20ms chunks. + self._audio_buffer = deque() # deque of np.ndarray chunks (mono, float32) + self._audio_buffer_lock = threading.Lock() + self._audio_buffer_samples = 0 # total samples buffered + self._audio_drain_thread: threading.Thread | None = None + self._audio_chunks_out = 0 + # Store pipeline_ids from initial_parameters if provided pipeline_ids = (initial_parameters or {}).get("pipeline_ids") if pipeline_ids is not None: @@ -206,6 +224,12 @@ def start(self): ) return + # Start audio drain thread to move audio from pipeline processor queue to buffer + self._audio_drain_thread = threading.Thread( + target=self._audio_drain_loop, daemon=True + ) + self._audio_drain_thread.start() + logger.info( f"[FRAME-PROCESSOR] Started with {len(self.pipeline_ids)} pipeline(s): {self.pipeline_ids}" ) @@ -234,6 +258,15 @@ def stop(self, error_message: str = None): # Clear pipeline processors self.pipeline_processors.clear() + # Wait for audio drain thread to finish + if self._audio_drain_thread and self._audio_drain_thread.is_alive(): + self._audio_drain_thread.join(timeout=2.0) + + # Clear audio buffer + with self._audio_buffer_lock: + self._audio_buffer.clear() + self._audio_buffer_samples = 0 + # Clean up all output sinks for sink_type, entry in list(self.output_sinks.items()): q = entry["queue"] @@ -474,6 +507,129 @@ def get(self) -> torch.Tensor | None: return frame + def _audio_drain_loop(self): + """Background thread that drains audio from the last pipeline processor's + audio_output_queue, resamples to 48kHz, and appends to the audio buffer. + """ + logger.info("[FRAME-PROCESSOR] Audio drain thread started") + + while self.running: + if not self.pipeline_processors: + time.sleep(0.01) + continue + + last_processor = self.pipeline_processors[-1] + try: + audio_tensor, sample_rate = last_processor.audio_output_queue.get( + timeout=0.1 + ) + except queue.Empty: + continue + + try: + # Convert torch tensor to numpy float32 + if isinstance(audio_tensor, torch.Tensor): + audio_np = audio_tensor.float().numpy() + else: + audio_np = np.asarray(audio_tensor, dtype=np.float32) + + # Ensure shape is [C, S] (channels, samples) + if audio_np.ndim == 1: + audio_np = audio_np[np.newaxis, :] # mono -> [1, S] + + # Mix down to mono for WebRTC (average channels) + if audio_np.shape[0] > 1: + audio_mono = audio_np.mean(axis=0) + else: + audio_mono = audio_np[0] + + # Resample to 48kHz if necessary + if sample_rate != WEBRTC_AUDIO_SAMPLE_RATE: + audio_mono = self._resample_audio( + audio_mono, sample_rate, WEBRTC_AUDIO_SAMPLE_RATE + ) + + # Append to buffer + with self._audio_buffer_lock: + self._audio_buffer.append(audio_mono) + self._audio_buffer_samples += len(audio_mono) + + # Also fan out to output sinks that support audio + if self.output_sinks: + for _sink_type, entry in self.output_sinks.items(): + sink = entry["sink"] + if hasattr(sink, "send_audio"): + try: + sink.send_audio(audio_mono, WEBRTC_AUDIO_SAMPLE_RATE, 1) + except Exception as e: + logger.debug( + f"Error sending audio to sink '{_sink_type}': {e}" + ) + + except Exception as e: + logger.error(f"[FRAME-PROCESSOR] Error processing audio chunk: {e}") + + logger.info( + f"[FRAME-PROCESSOR] Audio drain thread stopped ({self._audio_chunks_out} chunks served)" + ) + + @staticmethod + def _resample_audio(audio: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray: + """Simple linear interpolation resampling. + + For production quality, a proper resampler (e.g. libsamplerate) would be + better, but linear interpolation is sufficient for initial audio support. + """ + if src_rate == dst_rate: + return audio + duration = len(audio) / src_rate + num_output_samples = int(duration * dst_rate) + indices = np.linspace(0, len(audio) - 1, num_output_samples) + return np.interp(indices, np.arange(len(audio)), audio).astype(np.float32) + + def get_audio( + self, num_samples: int = AUDIO_SAMPLES_PER_FRAME + ) -> np.ndarray | None: + """Get the next chunk of audio samples for WebRTC output. + + Returns a mono float32 numpy array of length num_samples (default 960 = 20ms at 48kHz), + or None if no audio is available. + + Called by AudioProcessingTrack.recv(). + """ + if not self.running: + return None + + with self._audio_buffer_lock: + if self._audio_buffer_samples < num_samples: + return None + + # Collect enough samples from the buffer + collected = [] + remaining = num_samples + while remaining > 0 and self._audio_buffer: + chunk = self._audio_buffer[0] + if len(chunk) <= remaining: + collected.append(self._audio_buffer.popleft()) + self._audio_buffer_samples -= len(chunk) + remaining -= len(chunk) + else: + # Split chunk: take what we need, put the rest back + collected.append(chunk[:remaining]) + self._audio_buffer[0] = chunk[remaining:] + self._audio_buffer_samples -= remaining + remaining = 0 + + self._audio_chunks_out += 1 + + return np.concatenate(collected) if collected else None + + @property + def has_audio(self) -> bool: + """Check if any audio data is buffered.""" + with self._audio_buffer_lock: + return self._audio_buffer_samples > 0 + def _on_frame_from_cloud(self, frame: "VideoFrame") -> None: """Callback when a processed frame is received from cloud (cloud mode).""" self._frames_from_cloud += 1 diff --git a/src/scope/server/media_clock.py b/src/scope/server/media_clock.py new file mode 100644 index 000000000..237ebe3c3 --- /dev/null +++ b/src/scope/server/media_clock.py @@ -0,0 +1,61 @@ +"""Shared media clock for synchronizing audio and video streams. + +Provides a single source of truth for media timing so that audio and video +WebRTC tracks produce correlated PTS values. aiortc's RTCP Sender Reports +then map these to NTP wallclock time for receiver-side A/V sync. +""" + +import threading +import time + +# Standard WebRTC clock rates +AUDIO_CLOCK_RATE = 48000 # WebRTC audio: 48 kHz +VIDEO_CLOCK_RATE = 90000 # WebRTC video: 90 kHz + + +class MediaClock: + """Shared clock for synchronizing audio and video streams. + + Both VideoProcessingTrack and AudioProcessingTrack reference the same + MediaClock instance. The clock starts when the first media frame is ready + to play, and get_media_time() returns elapsed wall-clock seconds since then. + + PTS values derived from get_media_time() are correlated across tracks, + allowing the WebRTC receiver to synchronize audio and video playback. + """ + + def __init__(self): + self._start_time: float | None = None + self._lock = threading.Lock() + + def start(self): + """Start the clock. Call when the first media frame is ready to play. + + Safe to call multiple times; only the first call takes effect. + """ + with self._lock: + if self._start_time is None: + self._start_time = time.time() + + @property + def is_started(self) -> bool: + with self._lock: + return self._start_time is not None + + def get_media_time(self) -> float: + """Get elapsed media time in seconds since the clock started. + + Returns 0.0 if the clock hasn't been started yet. + """ + with self._lock: + if self._start_time is None: + return 0.0 + return time.time() - self._start_time + + def media_time_to_video_pts(self, media_time: float) -> int: + """Convert media time (seconds) to video PTS in 90 kHz clock units.""" + return int(media_time * VIDEO_CLOCK_RATE) + + def media_time_to_audio_pts(self, media_time: float) -> int: + """Convert media time (seconds) to audio PTS in 48 kHz sample units.""" + return int(media_time * AUDIO_CLOCK_RATE) diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index 971f99b9f..f47ff9ca7 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -67,6 +67,13 @@ def __init__( # Lock to protect input_queue assignment for thread-safe reference swapping self.input_queue_lock = threading.Lock() + # Audio output queue: stores (audio_tensor, sample_rate) tuples from pipeline output. + # Pipelines that produce audio return {"video": ..., "audio": ..., "audio_sample_rate": ...}. + # Only the last processor in a chain is read by FrameProcessor for audio output. + self.audio_output_queue: queue.Queue[tuple[torch.Tensor, int]] = queue.Queue( + maxsize=8 + ) + # Current parameters used by processing thread self.parameters = initial_parameters or {} # Queue for parameter updates from external threads @@ -206,6 +213,13 @@ def stop(self): except queue.Empty: break + # Clear audio output queue + while not self.audio_output_queue.empty(): + try: + self.audio_output_queue.get_nowait() + except queue.Empty: + break + logger.info(f"PipelineProcessor stopped for pipeline: {self.pipeline_id}") def update_parameters(self, parameters: dict[str, Any]): @@ -371,6 +385,12 @@ def process_chunk(self): self.output_queue.get_nowait() except queue.Empty: break + # Clear audio output queue + while not self.audio_output_queue.empty(): + try: + self.audio_output_queue.get_nowait() + except queue.Empty: + break requirements = None if hasattr(self.pipeline, "prepare"): @@ -509,6 +529,21 @@ def process_chunk(self): ) continue + # Extract audio from pipeline output and queue it + audio_output = output_dict.get("audio") + audio_sample_rate = output_dict.get("audio_sample_rate") + if audio_output is not None and audio_sample_rate is not None: + # Detach and move to CPU for downstream consumption + audio_output = audio_output.detach().cpu() + try: + self.audio_output_queue.put_nowait( + (audio_output, audio_sample_rate) + ) + except queue.Full: + logger.debug( + f"Audio output queue full for {self.pipeline_id}, dropping audio chunk" + ) + # Apply throttling if this pipeline is producing faster than next can consume # Only throttle if: (1) has video input, (2) has next processor if video_input is not None and self.next_processor is not None: diff --git a/src/scope/server/tracks.py b/src/scope/server/tracks.py index 3167a71e3..742262197 100644 --- a/src/scope/server/tracks.py +++ b/src/scope/server/tracks.py @@ -4,11 +4,17 @@ import threading import time +import numpy as np from aiortc import MediaStreamTrack from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE, MediaStreamError -from av import VideoFrame - -from .frame_processor import FrameProcessor +from av import AudioFrame, VideoFrame + +from .frame_processor import ( + AUDIO_SAMPLES_PER_FRAME, + WEBRTC_AUDIO_SAMPLE_RATE, + FrameProcessor, +) +from .media_clock import MediaClock from .pipeline_manager import PipelineManager logger = logging.getLogger(__name__) @@ -189,3 +195,69 @@ async def stop(self): self.frame_processor.stop() super().stop() + + +class AudioProcessingTrack(MediaStreamTrack): + """WebRTC audio track that reads from FrameProcessor's audio buffer. + + Produces 20ms audio frames (960 samples at 48kHz) synchronized with + the video track via a shared MediaClock. When no audio data is available, + silence frames are returned to keep the track alive. + """ + + kind = "audio" + + AUDIO_PTIME = AUDIO_SAMPLES_PER_FRAME / WEBRTC_AUDIO_SAMPLE_RATE # 0.02s (20ms) + + def __init__( + self, + frame_processor: FrameProcessor, + media_clock: MediaClock, + ): + super().__init__() + self.frame_processor = frame_processor + self.media_clock = media_clock + self._timestamp = 0 + self._started = False + self._last_frame_time: float | None = None + + async def recv(self) -> AudioFrame: + if self.readyState != "live": + raise MediaStreamError + + # Pace audio output at 20ms intervals + if self._last_frame_time is not None: + elapsed = time.time() - self._last_frame_time + wait = self.AUDIO_PTIME - elapsed + if wait > 0: + await asyncio.sleep(wait) + + self._last_frame_time = time.time() + + # Start the shared media clock on first audio frame + if not self._started: + self.media_clock.start() + self._started = True + + # Try to get audio data from the frame processor + audio_data = self.frame_processor.get_audio(AUDIO_SAMPLES_PER_FRAME) + + if audio_data is not None: + # Convert float32 [-1, 1] to int16 for WebRTC + audio_int16 = (np.clip(audio_data, -1.0, 1.0) * 32767.0).astype(np.int16) + else: + # Return silence when no audio is available + audio_int16 = np.zeros(AUDIO_SAMPLES_PER_FRAME, dtype=np.int16) + + # Create AudioFrame: shape must be (1, num_samples) for mono s16 layout + frame = AudioFrame.from_ndarray( + audio_int16.reshape(1, -1), format="s16", layout="mono" + ) + frame.sample_rate = WEBRTC_AUDIO_SAMPLE_RATE + + # Set PTS from shared media clock for A/V sync + media_time = self.media_clock.get_media_time() + frame.pts = self.media_clock.media_time_to_audio_pts(media_time) + frame.time_base = fractions.Fraction(1, WEBRTC_AUDIO_SAMPLE_RATE) + + return frame diff --git a/src/scope/server/webrtc.py b/src/scope/server/webrtc.py index cf10c7779..a05a65853 100644 --- a/src/scope/server/webrtc.py +++ b/src/scope/server/webrtc.py @@ -48,12 +48,14 @@ class Session: - """WebRTC Session containing peer connection and associated video track.""" + """WebRTC Session containing peer connection and associated tracks.""" def __init__( self, pc: RTCPeerConnection, video_track: MediaStreamTrack | None = None, + audio_track: "AudioProcessingTrack | None" = None, + media_clock: "MediaClock | None" = None, data_channel: RTCDataChannel | None = None, relay: MediaRelay | None = None, recording_manager: RecordingManager | None = None, @@ -64,6 +66,8 @@ def __init__( self.id = str(uuid.uuid4()) self.pc = pc self.video_track = video_track + self.audio_track = audio_track + self.media_clock = media_clock self.data_channel = data_channel self.relay = relay self.recording_manager = recording_manager @@ -226,6 +230,9 @@ async def handle_offer( # Create NotificationSender for this session to send notifications to the frontend notification_sender = NotificationSender() + # Create shared media clock for A/V synchronization + media_clock = MediaClock() + video_track = VideoProcessingTrack( pipeline_manager, initial_parameters=initial_parameters, @@ -236,6 +243,19 @@ async def handle_offer( connection_info=request.connection_info, ) session.video_track = video_track + session.media_clock = media_clock + + # Eagerly initialize the FrameProcessor so the AudioProcessingTrack + # can share it. VideoProcessingTrack.recv() normally does this lazily, + # but we need the reference now to wire audio. + video_track.initialize_output_processing() + + # Create AudioProcessingTrack sharing the same FrameProcessor and MediaClock + audio_track = AudioProcessingTrack( + frame_processor=video_track.frame_processor, + media_clock=media_clock, + ) + session.audio_track = audio_track # Create a MediaRelay to allow multiple consumers (WebRTC and recording) relay = MediaRelay() @@ -260,9 +280,12 @@ async def handle_offer( else: session.recording_manager = None - # Add the relayed track to WebRTC connection + # Add the relayed video track to WebRTC connection pc.addTrack(relayed_track) + # Add audio track to WebRTC connection + pc.addTrack(audio_track) + # Store relay for cleanup session.relay = relay From ad8f5d02f9c8cc4773588dc92110bfa23af39575 Mon Sep 17 00:00:00 2001 From: BuffMcBigHuge Date: Tue, 17 Feb 2026 17:39:44 -0500 Subject: [PATCH 2/9] Frontend audio work. Signed-off-by: BuffMcBigHuge --- frontend/src/components/VideoOutput.tsx | 49 ++++++++++++++++++++++++- frontend/src/hooks/useUnifiedWebRTC.ts | 15 ++++++-- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/frontend/src/components/VideoOutput.tsx b/frontend/src/components/VideoOutput.tsx index 312477ca8..fa9bf80a2 100644 --- a/frontend/src/components/VideoOutput.tsx +++ b/frontend/src/components/VideoOutput.tsx @@ -1,4 +1,5 @@ import { useEffect, useRef, useState, useCallback } from "react"; +import { Volume2, VolumeX } from "lucide-react"; import { Card, CardContent, CardHeader, CardTitle } from "./ui/card"; import { Spinner } from "./ui/spinner"; import { PlayOverlay } from "./ui/play-overlay"; @@ -49,15 +50,47 @@ export function VideoOutput({ const [isFadingOut, setIsFadingOut] = useState(false); const overlayTimeoutRef = useRef(null); + // Audio state: start muted to comply with browser autoplay policy. + // User can click the speaker icon to unmute once the stream is playing. + const [isMuted, setIsMuted] = useState(true); + const [hasAudioTrack, setHasAudioTrack] = useState(false); + // Use external ref if provided, otherwise use internal const containerRef = videoContainerRef || internalContainerRef; useEffect(() => { if (videoRef.current && remoteStream) { videoRef.current.srcObject = remoteStream; + + // Check if the stream contains an audio track + const audioTracks = remoteStream.getAudioTracks(); + setHasAudioTrack(audioTracks.length > 0); + + // Listen for tracks being added later (audio may arrive after video) + const handleTrackAdded = () => { + const tracks = remoteStream.getAudioTracks(); + setHasAudioTrack(tracks.length > 0); + }; + remoteStream.addEventListener("addtrack", handleTrackAdded); + + return () => { + remoteStream.removeEventListener("addtrack", handleTrackAdded); + }; } }, [remoteStream]); + // Sync muted state to the video element + useEffect(() => { + if (videoRef.current) { + videoRef.current.muted = isMuted; + } + }, [isMuted]); + + const toggleMute = useCallback((e: React.MouseEvent) => { + e.stopPropagation(); // Don't trigger play/pause or pointer lock + setIsMuted(prev => !prev); + }, []); + // Listen for video playing event to notify parent useEffect(() => { const video = videoRef.current; @@ -174,9 +207,23 @@ export function VideoOutput({ : "max-w-full max-h-full object-contain" } autoPlay - muted + muted={isMuted} playsInline /> + {/* Audio mute/unmute toggle - only shown when stream has audio */} + {hasAudioTrack && ( + + )} {/* Play/Pause Overlay */} {showOverlay && (
diff --git a/frontend/src/hooks/useUnifiedWebRTC.ts b/frontend/src/hooks/useUnifiedWebRTC.ts index 190d1d41f..18be17e09 100644 --- a/frontend/src/hooks/useUnifiedWebRTC.ts +++ b/frontend/src/hooks/useUnifiedWebRTC.ts @@ -221,11 +221,18 @@ export function useUnifiedWebRTC(options?: UseUnifiedWebRTCOptions) { } // Event handlers + // Collect all incoming tracks (video + audio) into a single MediaStream. + // The backend may send video and audio as separate streams, so we + // merge them into one MediaStream for the