From f871bcafec1aeef401a253e3982ac79d10bebd94 Mon Sep 17 00:00:00 2001 From: junhyr Date: Mon, 23 Feb 2026 18:26:20 +0900 Subject: [PATCH] feat: optimize webrtc frame pipeline latency and correctness Signed-off-by: junhyr --- src/scope/server/cloud_track.py | 4 +- src/scope/server/frame_processor.py | 80 +++++++++++++++++++++----- src/scope/server/pipeline_processor.py | 18 +++++- src/scope/server/tracks.py | 5 +- 4 files changed, 89 insertions(+), 18 deletions(-) diff --git a/src/scope/server/cloud_track.py b/src/scope/server/cloud_track.py index eb4ca03c4..9b0a6de99 100644 --- a/src/scope/server/cloud_track.py +++ b/src/scope/server/cloud_track.py @@ -110,6 +110,7 @@ async def _start(self) -> None: connection_id=self.connection_id, connection_info=self.connection_info, ) + self.frame_processor.set_event_loop(asyncio.get_running_loop()) self.frame_processor.start() # Start input processing if we have a source track @@ -194,8 +195,7 @@ async def recv(self) -> VideoFrame: self._last_frame = frame return frame - # No frame yet, wait a bit - await asyncio.sleep(0.01) + await self.frame_processor.wait_for_output() def update_parameters(self, params: dict) -> None: """Update pipeline parameters on cloud.""" diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index ef411c3e8..58f5d4701 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -1,3 +1,4 @@ +import asyncio import logging import queue import threading @@ -76,8 +77,10 @@ def __init__( self.paused = False - # Pinned memory buffer cache for faster GPU transfers (local mode only) - self._pinned_buffer_cache = {} + # Pinned memory double-buffer cache for faster GPU transfers (local mode only) + # Maps shape → (buffer_a, buffer_b) to avoid DA race conditions + self._pinned_buffer_cache: dict[tuple, tuple[torch.Tensor, torch.Tensor]] = {} + self._pinned_buffer_index: dict[tuple, int] = {} self._pinned_buffer_lock = threading.Lock() # Cloud mode: send frames to cloud instead of local processing @@ -97,6 +100,10 @@ def __init__( # Input mode: video waits for frames, text generates immediately self._video_mode = (initial_parameters or {}).get("input_mode") == "video" + # Event-based output signaling (replaces 10ms polling sleep) + self._output_event: asyncio.Event | None = None + self._event_loop: asyncio.AbstractEventLoop | None = None + # Pipeline chaining support self.pipeline_processors: list[PipelineProcessor] = [] self.pipeline_ids: list[str] = [] @@ -330,17 +337,23 @@ def stop(self, error_message: str = None): ) def _get_or_create_pinned_buffer(self, shape): - """Get or create a reusable pinned memory buffer for the given shape. + """Get the next available pinned memory buffer for the given shape. - This avoids repeated pinned memory allocations, which are expensive. - Pinned memory enables faster DMA transfers to GPU. + Uses double-buffering to prevent data corruption: while one buffer's + DMA transfer is in flight (via cuda(non_blocking=True)), the other + buffer is available for the next frame's copy_() call. """ with self._pinned_buffer_lock: if shape not in self._pinned_buffer_cache: - self._pinned_buffer_cache[shape] = torch.empty( - shape, dtype=torch.uint8, pin_memory=True - ) - return self._pinned_buffer_cache[shape] + buffer_a = torch.empty(shape, dtype=torch.uint8, pin_memory=True) + buffer_b = torch.empty(shape, dtype=torch.uint8, pin_memory=True) + self._pinned_buffer_cache[shape] = (buffer_a, buffer_b) + self._pinned_buffer_index[shape] = 0 + + idx = self._pinned_buffer_index[shape] + buffer = self._pinned_buffer_cache[shape][idx] + self._pinned_buffer_index[shape] = 1 - idx + return buffer def put(self, frame: VideoFrame) -> bool: if not self.running: @@ -377,10 +390,9 @@ def put(self, frame: VideoFrame) -> bool: if torch.cuda.is_available(): shape = frame_array.shape + # Double-buffered: alternates between two pinned buffers so the + # prior DMA (non_blocking=True) can finish while we write into the other. pinned_buffer = self._get_or_create_pinned_buffer(shape) - # Note: We reuse pinned buffers for performance. This assumes the copy_() - # operation completes before the next frame arrives. - # In practice, copy_() is very fast (~microseconds) and frames arrive at 60 FPS max pinned_buffer.copy_(torch.as_tensor(frame_array, dtype=torch.uint8)) frame_tensor = pinned_buffer.cuda(non_blocking=True) else: @@ -424,9 +436,12 @@ def get(self) -> torch.Tensor | None: try: frame = last_processor.output_queue.get_nowait() # Frame is stored as [1, H, W, C], convert to [H, W, C] for output - # Move to CPU here for WebRTC streaming (frames stay on GPU between pipeline processors) + # Frames are already on CPU (moved by the last PipelineProcessor). frame = frame.squeeze(0) if frame.is_cuda: + logger.warning( + "Frame from last processor is still on GPU, moving to CPU" + ) frame = frame.cpu() except queue.Empty: return None @@ -490,6 +505,7 @@ def _on_frame_from_cloud(self, frame: "VideoFrame") -> None: self._cloud_output_queue.put_nowait(frame_np) except queue.Empty: pass + self._signal_output_ready() except Exception as e: logger.error(f"[FRAME-PROCESSOR] Error processing frame from cloud: {e}") @@ -949,6 +965,11 @@ def _setup_pipeline_chain_sync(self): curr_processor = self.pipeline_processors[i] prev_processor.set_next_processor(curr_processor) + if self.pipeline_processors: + self.pipeline_processors[-1].set_output_ready_callback( + self._signal_output_ready + ) + # Start all processors for processor in self.pipeline_processors: processor.start() @@ -957,6 +978,39 @@ def _setup_pipeline_chain_sync(self): f"Created pipeline chain with {len(self.pipeline_processors)} processors" ) + def set_event_loop(self, loop: asyncio.AbstractEventLoop): + """Register the asyncio event loop for thread-safe event signaling. + + Must be called from the asyncio thread before recv() polling begins. + """ + self._event_loop = loop + self._output_event = asyncio.Event() + + def _signal_output_ready(self): + """Signal that a new output frame is available. + + Thread-safe: can be called from any thread. + """ + if self._event_loop is not None and self._output_event is not None: + try: + self._event_loop.call_soon_threadsafe(self._output_event.set) + except RuntimeError: + pass + + async def wait_for_output(self, timeout: float = 0.1) -> None: + """Wait until a new output frame is available or timeout expires. + + Returns immediately if a frame is already available. + """ + if self._output_event is None: + await asyncio.sleep(0.01) + return + try: + await asyncio.wait_for(self._output_event.wait(), timeout=timeout) + except TimeoutError: + pass + self._output_event.clear() + def __enter__(self): self.start() return self diff --git a/src/scope/server/pipeline_processor.py b/src/scope/server/pipeline_processor.py index b11638996..2283409ab 100644 --- a/src/scope/server/pipeline_processor.py +++ b/src/scope/server/pipeline_processor.py @@ -108,11 +108,18 @@ def __init__( # Set when reset_cache flushes queues, cleared after successful pipeline call self._pending_cache_init = False + # Callback to signal that output frames are available (set by FrameProcessor) + self._output_ready_callback: callable | None = None + # Throttler for controlling processing rate in chained pipelines # Throttling is applied when this pipeline produces frames faster than # the next pipeline in the chain can consume them self.throttler = PipelineThrottler() + def set_output_ready_callback(self, callback: callable): + """Set callback to invoke when frames are placed in the output queue.""" + self._output_ready_callback = callback + def _resize_output_queue(self, target_size: int): """Resize the output queue to the target size, transferring existing frames. @@ -492,7 +499,6 @@ def process_chunk(self): self.throttler.record_output_batch(num_frames, processing_time) # Normalize to [0, 255] and convert to uint8 - # Keep frames on GPU - frame_processor handles CPU transfer for streaming output = ( (output * 255.0) .clamp(0, 255) @@ -501,6 +507,11 @@ def process_chunk(self): .detach() ) + # Move to CPU in the worker thread for the last processor in the chain. + # Intermediate processors keep frames on GPU for the next pipeline. + if self.next_processor is None: + output = output.cpu() + # Resize output queue to meet target max size target_output_queue_max_size = num_frames * OUTPUT_QUEUE_MAX_SIZE_FACTOR self._resize_output_queue(target_output_queue_max_size) @@ -509,18 +520,23 @@ def process_chunk(self): # For intermediate pipelines, output goes to next pipeline's input # For last pipeline, output goes to frame_processor's output_queue # Output frames are [H, W, C], convert to [1, H, W, C] for consistency + frames_queued = False for frame in output: frame = frame.unsqueeze(0) # Track when a frame is ready (production rate) self._track_output_frame() try: self.output_queue.put_nowait(frame) + frames_queued = True except queue.Full: logger.debug( f"Output queue full for {self.pipeline_id}, dropping processed frame" ) continue + if frames_queued and self._output_ready_callback is not None: + self._output_ready_callback() + # Apply throttling if this pipeline is producing faster than next can consume # Only throttle if: (1) has video input, (2) has next processor if video_input is not None and self.next_processor is not None: diff --git a/src/scope/server/tracks.py b/src/scope/server/tracks.py index 3167a71e3..818eb6aa3 100644 --- a/src/scope/server/tracks.py +++ b/src/scope/server/tracks.py @@ -114,6 +114,7 @@ def initialize_output_processing(self): connection_id=self.connection_id, connection_info=self.connection_info, ) + self.frame_processor.set_event_loop(asyncio.get_running_loop()) self.frame_processor.start() def initialize_input_processing(self, track: MediaStreamTrack): @@ -158,8 +159,8 @@ async def recv(self) -> VideoFrame: self._last_frame = frame return frame - # No frame available, wait a bit before trying again - await asyncio.sleep(0.01) + # Wait for a frame to be produced + await self.frame_processor.wait_for_output() except Exception as e: logger.error(f"Error getting processed frame: {e}")