Conversation

codeflash-ai bot commented on Dec 4, 2025

📄 26% (0.26x) speedup for VideoSourcesManager.retrieve_frames_from_sources in inference/core/interfaces/camera/utils.py

⏱️ Runtime: 212 microseconds → 168 microseconds (best of 30 runs)

📝 Explanation and details

The optimized code achieves an approximately 26% speedup through several key micro-optimizations focused on the hot path of the retrieve_frames_from_sources method:

Primary Optimizations:

  1. Inlined method call elimination: The original code called _is_source_inactive() for every source in the loop (277 calls taking 44.7% of total time). The optimized version inlines this check directly as source_ord in self._ended_sources or source_ord in self._reconnection_threads, eliminating function call overhead entirely (a sketch contrasting the original and optimized loops follows this list).

  2. Loop structure optimization: Replaced the enumerate(zip(...)) pattern with a simple range(total_sources) loop and direct indexing. This avoids creating intermediate tuples and iterator objects, improving cache locality and reducing allocation overhead.

  3. Reduced datetime operations: Cached datetime.now as a function reference outside the loop when timeout calculations are needed, preventing repeated attribute lookups in the hot path.

  4. Pre-cached attribute access: Moved self._video_sources.all_sources and self._video_sources.allow_reconnection to local variables, eliminating repeated attribute access overhead in the loop.

  5. Minor copy optimization: In join_all_reconnection_threads, replaced copy(self._threads_to_join) with set(self._threads_to_join) to avoid unnecessary copying overhead (a small illustration follows the loop sketch below).
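As a rough illustration of items 1-4, the sketch below contrasts the two loop shapes. It is a minimal reconstruction, not the actual implementation: the manager internals (_video_sources, _ended_sources, _reconnection_threads, _last_batch_yielded_time, _is_source_inactive) come from the description above, while the standalone function signatures and the omission of reconnection and early-exit handling are simplifications.

from datetime import datetime
from typing import List, Optional


def collect_frames_original(manager, batch_collection_timeout: Optional[float]) -> List:
    # One frame per active source, following the original loop shape.
    frames = []
    for source_ord, (source, allow_reconnection) in enumerate(
        zip(manager._video_sources.all_sources, manager._video_sources.allow_reconnection)
    ):
        if manager._is_source_inactive(source_ord):  # method call on every iteration
            continue
        timeout = None
        if batch_collection_timeout is not None:
            elapsed = (datetime.now() - manager._last_batch_yielded_time).total_seconds()
            timeout = max(batch_collection_timeout - elapsed, 0.0)
        frame = source.read_frame(timeout=timeout)  # allow_reconnection drives the omitted reconnection path
        if frame is not None:
            frames.append(frame)
    return frames


def collect_frames_optimized(manager, batch_collection_timeout: Optional[float]) -> List:
    frames = []
    all_sources = manager._video_sources.all_sources          # (4) attribute access hoisted out of the loop
    ended = manager._ended_sources
    reconnecting = manager._reconnection_threads
    now = datetime.now                                         # (3) cached function reference
    for source_ord in range(len(all_sources)):                 # (2) plain index loop, no enumerate/zip tuples
        if source_ord in ended or source_ord in reconnecting:  # (1) inlined inactivity check
            continue
        timeout = None
        if batch_collection_timeout is not None:
            elapsed = (now() - manager._last_batch_yielded_time).total_seconds()
            timeout = max(batch_collection_timeout - elapsed, 0.0)
        frame = all_sources[source_ord].read_frame(timeout=timeout)
        if frame is not None:
            frames.append(frame)
    return frames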
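For item 5, a tiny self-contained illustration of the copy-vs-set swap; the set of source ordinals is an assumed stand-in for self._threads_to_join:

# Snapshotting a set: copy.copy() vs the set() constructor.
from copy import copy

threads_to_join = {3, 7}              # stand-in for self._threads_to_join

old_snapshot = copy(threads_to_join)  # original: goes through copy.copy() dispatch
new_snapshot = set(threads_to_join)   # optimized: direct set construction, same contents

assert old_snapshot == new_snapshot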

Performance Impact by Test Case:

  • Large-scale scenarios show the biggest gains (25.7% to 39.9% faster) where the loop optimizations compound across many sources
  • Basic operations see consistent 17-32% improvements across various conditions
  • Early exit scenarios benefit significantly (32.7% faster) due to reduced per-iteration overhead

These optimizations are particularly valuable for video processing workloads where retrieve_frames_from_sources is called frequently in real-time scenarios, making the cumulative effect of these micro-optimizations substantial for overall system performance.
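For context, here is a hedged sketch of how such a manager might be polled in a tight loop. The constructor and method signatures mirror the generated tests below; poll_frames, max_batches, and the treatment of a None result are illustrative assumptions rather than part of the library.

# Illustrative polling loop (assumed wiring, based on the test stubs below).
from inference.core.interfaces.camera.utils import VideoSourcesManager


def poll_frames(video_sources, max_batches=100, batch_collection_timeout=0.05):
    manager = VideoSourcesManager(
        video_sources,
        should_stop=lambda: False,
        on_reconnection_error=lambda source_id, error: None,
    )
    batches = []
    for _ in range(max_batches):
        frames = manager.retrieve_frames_from_sources(
            batch_collection_timeout=batch_collection_timeout
        )
        if frames is None:  # assumed to signal an early exit (see test_should_stop_triggers_early_exit)
            break
        batches.append(frames)
    return batches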

Correctness verification report:

Test                            Status
⚙️ Existing Unit Tests          🔘 None Found
🌀 Generated Regression Tests   36 Passed
⏪ Replay Tests                 🔘 None Found
🔎 Concolic Coverage Tests      🔘 None Found
📊 Tests Coverage               95.0%
🌀 Generated Regression Tests and Runtime
from copy import copy
from datetime import datetime, timedelta
from queue import Empty, Queue
from threading import Thread
from typing import Any, Callable, List, Optional

# imports
import pytest
from inference.core.interfaces.camera.utils import VideoSourcesManager

# ---- Minimal stubs for dependencies ----


# Exception classes
class EndOfStreamError(Exception):
    pass


# VideoFrame entity
class VideoFrame:
    def __init__(self, frame_id, source_id, frame_timestamp=None):
        self.frame_id = frame_id
        self.source_id = source_id
        self.frame_timestamp = frame_timestamp or datetime.now()


# VideoSource stub
class DummyVideoSource:
    """
    A dummy video source for testing.
    - frames: list of VideoFrame or special values (e.g., 'raise_eos', None)
    - read_frame returns frames in order, raises EndOfStreamError on 'raise_eos'
    """

    def __init__(self, frames: List[Any]):
        self.frames = frames
        self.read_calls = 0

    def read_frame(self, timeout: Optional[float] = None) -> Optional[VideoFrame]:
        if self.read_calls >= len(self.frames):
            return None
        val = self.frames[self.read_calls]
        self.read_calls += 1
        if val == "raise_eos":
            raise EndOfStreamError("End of stream")
        return val


# VideoSources container
class VideoSources:
    def __init__(
        self, all_sources: List[DummyVideoSource], allow_reconnection: List[bool]
    ):
        self.all_sources = all_sources
        self.allow_reconnection = allow_reconnection


# ---- Unit tests ----


# Helper for VideoFrame lists
def make_frames(source_id, count):
    return [VideoFrame(frame_id=i, source_id=source_id) for i in range(count)]


# ----------------- BASIC TEST CASES -----------------


def test_single_source_multiple_frames():
    # One source, multiple frames (should only get one per call)
    frames = make_frames(0, 3)
    src = DummyVideoSource(frames)
    vs = VideoSources([src], [False])
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    # First call gets frame 0
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result1 = codeflash_output  # 5.22μs -> 3.98μs (31.1% faster)
    # Second call gets frame 1
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result2 = codeflash_output  # 2.26μs -> 1.91μs (18.4% faster)


def test_batch_collection_timeout_positive():
    # batch_collection_timeout > 0 should pass time value to read_frame
    called = []

    class MySource(DummyVideoSource):
        def read_frame(self, timeout=None):
            called.append(timeout)
            return super().read_frame(timeout)

    src = MySource(make_frames(0, 1))
    vs = VideoSources([src], [False])
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    manager.retrieve_frames_from_sources(
        batch_collection_timeout=1.5
    )  # 10.8μs -> 9.05μs (19.3% faster)


# ----------------- EDGE TEST CASES -----------------


def test_no_sources():
    # No sources at all
    vs = VideoSources([], [])
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result = codeflash_output  # 3.12μs -> 2.58μs (20.9% faster)


def test_should_stop_triggers_early_exit():
    # should_stop returns True, should return None
    src = DummyVideoSource(make_frames(0, 1))
    vs = VideoSources([src], [False])
    called = []

    def should_stop():
        called.append(True)
        return True

    manager = VideoSourcesManager(
        vs, should_stop=should_stop, on_reconnection_error=lambda a, b: None
    )
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result = codeflash_output  # 4.74μs -> 3.57μs (32.7% faster)


def test_multiple_sources_some_inactive():
    # Some sources inactive, only active ones yield frames
    src0 = DummyVideoSource(make_frames(0, 1))
    src1 = DummyVideoSource(make_frames(1, 1))
    vs = VideoSources([src0, src1], [False, False])
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    manager._ended_sources.add(1)
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result = codeflash_output  # 5.79μs -> 4.29μs (35.2% faster)


def test_source_returns_none_then_frame():
    # Source returns None first, then frame
    src = DummyVideoSource([None, VideoFrame(1, 0)])
    vs = VideoSources([src], [False])
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result1 = codeflash_output  # 4.81μs -> 3.99μs (20.5% faster)
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result2 = codeflash_output  # 2.42μs -> 1.87μs (29.4% faster)


def test_batch_timeout_expired():
    # batch_collection_timeout already expired (simulate by setting _last_batch_yielded_time in future)
    src = DummyVideoSource(make_frames(0, 1))
    vs = VideoSources([src], [False])
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    manager._last_batch_yielded_time = datetime.now() + timedelta(seconds=10)
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=0.1
    )
    result = codeflash_output  # 8.27μs -> 6.76μs (22.3% faster)


# ----------------- LARGE SCALE TEST CASES -----------------


def test_many_sources_all_frames():
    # 100 sources, each with one frame
    N = 100
    sources = [DummyVideoSource([VideoFrame(0, i)]) for i in range(N)]
    vs = VideoSources(sources, [False] * N)
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result = codeflash_output  # 42.8μs -> 34.0μs (25.7% faster)
    ids = set(f.source_id for f in result)


def test_many_sources_some_inactive():
    # 100 sources, half inactive
    N = 100
    sources = [DummyVideoSource([VideoFrame(0, i)]) for i in range(N)]
    vs = VideoSources(sources, [False] * N)
    manager = VideoSourcesManager(
        vs, should_stop=lambda: False, on_reconnection_error=lambda a, b: None
    )
    # Mark even sources as inactive
    for i in range(0, N, 2):
        manager._ended_sources.add(i)
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result = codeflash_output  # 32.5μs -> 23.2μs (39.9% faster)
    ids = set(f.source_id for f in result)


def test_many_sources_should_stop_midway():
    # 50 sources, should_stop returns True after 10 calls
    N = 50
    sources = [DummyVideoSource([VideoFrame(0, i)]) for i in range(N)]
    vs = VideoSources(sources, [False] * N)
    calls = []

    def should_stop():
        calls.append(1)
        return len(calls) > 10

    manager = VideoSourcesManager(
        vs, should_stop=should_stop, on_reconnection_error=lambda a, b: None
    )
    codeflash_output = manager.retrieve_frames_from_sources(
        batch_collection_timeout=None
    )
    result = codeflash_output  # 10.6μs -> 8.83μs (19.6% faster)
from copy import copy
from datetime import datetime, timedelta
from queue import Empty, Queue
from threading import Thread
from typing import Any, Callable, List, Optional

# imports
import pytest
from inference.core.interfaces.camera.utils import VideoSourcesManager


class VideoFrame:
    def __init__(self, frame_id, frame_timestamp, source_id):
        self.frame_id = frame_id
        self.frame_timestamp = frame_timestamp
        self.source_id = source_id


class VideoSources:
    def __init__(self, sources: List[Any], allow_reconnection: List[bool]):
        self.all_sources = sources
        self.allow_reconnection = allow_reconnection


# --- Basic Test Cases ---


def test_empty_sources_list():
    """No sources: should return empty list."""
    mgr = VideoSourcesManager(
        VideoSources([], []),
        should_stop=lambda: False,
        on_reconnection_error=lambda a, b: None,
    )
    codeflash_output = mgr.retrieve_frames_from_sources(batch_collection_timeout=None)
    result = codeflash_output  # 4.05μs -> 3.44μs (17.5% faster)

To edit these changes, run git checkout codeflash/optimize-VideoSourcesManager.retrieve_frames_from_sources-miqutslf and push.

codeflash-ai bot requested a review from mashraf-222 on Dec 4, 2025 03:06
codeflash-ai bot added the ⚡️ codeflash (Optimization PR opened by Codeflash AI) and 🎯 Quality: High (Optimization Quality according to Codeflash) labels on Dec 4, 2025