Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Dec 4, 2025

📄 55% (0.55x) speedup for BasePipelineWatchDog.on_status_update in inference/core/interfaces/stream/watchdog.py

⏱️ Runtime : 2.60 milliseconds 1.68 milliseconds (best of 5 runs)

📝 Explanation and details

The optimization improves performance by caching a frequently accessed enum value and reducing attribute lookups in a hot path method.

Key optimizations:

  1. Pre-cache the DEBUG severity value: During initialization, UpdateSeverity.DEBUG.value is stored in self._debug_severity_value to avoid repeated enum attribute lookups.

  2. Extract severity value once: In on_status_update, status_update.severity.value is retrieved once into a local variable instead of being accessed twice in the original comparison.

Why this speeds up the code:

  • Eliminates repeated enum lookups: The original code performed UpdateSeverity.DEBUG.value on every call (8,989 hits according to the profiler). Enum attribute access involves dictionary lookups and is relatively expensive in Python.
  • Reduces attribute chain traversals: status_update.severity.value went from being accessed twice to once per call, cutting attribute access overhead.
  • Improves CPU cache locality: Local variables are faster to access than instance attributes or enum values.

Performance impact analysis:

The line profiler shows the comparison line dropped from 91.5% to 78.7% of total execution time, with per-hit time improving from 3,754ns to 2,527ns - a 33% improvement on the hottest line. This translated to an overall 55% speedup (2.60ms → 1.68ms).

Test case performance:

The optimization is particularly effective for:

  • High-frequency calls with DEBUG updates (50-90% faster in many test cases)
  • Bulk operations processing many status updates (50-60% faster for large-scale tests)
  • Mixed severity scenarios where the comparison happens frequently

This optimization would be especially valuable if on_status_update is called frequently in video processing pipelines or real-time monitoring systems where status updates are generated at high rates.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 11 Passed
🌀 Generated Regression Tests 4564 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
⚙️ Existing Unit Tests and Runtime
🌀 Generated Regression Tests and Runtime
from collections import deque
from enum import Enum
from typing import Dict, List, Optional

# imports
import pytest
from inference.core.interfaces.stream.watchdog import BasePipelineWatchDog

# --- Minimal stubs for dependencies to make the function testable ---


# Severity enum for status updates
class UpdateSeverity(Enum):
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4


# StatusUpdate stub
class StatusUpdate:
    def __init__(self, severity, message=""):
        self.severity = severity
        self.message = message


# VideoSource stub
class VideoSource:
    def __init__(self, id):
        self.id = id


# LatencyMonitor stub
class LatencyMonitor:
    pass


# FPSMonitor stub
class FPSMonitor:
    pass


# --- Actual implementation of BasePipelineWatchDog ---

MAX_UPDATES_CONTEXT = 512


class PipelineWatchDog:
    def __init__(self):
        pass


from inference.core.interfaces.stream.watchdog import BasePipelineWatchDog

# --- Unit tests for BasePipelineWatchDog.on_status_update ---

# 1. Basic Test Cases


def test_status_update_info_is_stored():
    """Test that INFO severity updates are stored."""
    wd = BasePipelineWatchDog()
    su = StatusUpdate(UpdateSeverity.INFO, "Info message")
    wd.on_status_update(su)  # 4.50μs -> 1.86μs (142% faster)


def test_status_update_warning_is_stored():
    """Test that WARNING severity updates are stored."""
    wd = BasePipelineWatchDog()
    su = StatusUpdate(UpdateSeverity.WARNING, "Warning message")
    wd.on_status_update(su)  # 2.06μs -> 1.28μs (61.0% faster)


def test_status_update_error_is_stored():
    """Test that ERROR severity updates are stored."""
    wd = BasePipelineWatchDog()
    su = StatusUpdate(UpdateSeverity.ERROR, "Error message")
    wd.on_status_update(su)  # 2.24μs -> 1.20μs (86.2% faster)


def test_status_update_critical_is_stored():
    """Test that CRITICAL severity updates are stored."""
    wd = BasePipelineWatchDog()
    su = StatusUpdate(UpdateSeverity.CRITICAL, "Critical message")
    wd.on_status_update(su)  # 1.92μs -> 1.10μs (74.4% faster)


def test_status_update_debug_is_not_stored():
    """Test that DEBUG severity updates are not stored."""
    wd = BasePipelineWatchDog()
    su = StatusUpdate(UpdateSeverity.DEBUG, "Debug message")
    wd.on_status_update(su)  # 2.19μs -> 1.14μs (91.9% faster)


# 2. Edge Test Cases


def test_multiple_debug_updates_are_not_stored():
    """Test that multiple DEBUG updates are not stored."""
    wd = BasePipelineWatchDog()
    for i in range(10):
        wd.on_status_update(
            StatusUpdate(UpdateSeverity.DEBUG, f"Debug {i}")
        )  # 7.04μs -> 4.47μs (57.3% faster)


def test_none_status_update_raises():
    """Test that passing None as status_update raises AttributeError."""
    wd = BasePipelineWatchDog()
    with pytest.raises(AttributeError):
        wd.on_status_update(None)  # 1.38μs -> 1.64μs (16.0% slower)


def test_status_update_with_custom_severity_value():
    """Test with a custom severity value just above DEBUG."""

    class CustomSeverity:
        def __init__(self, value):
            self.value = value

    wd = BasePipelineWatchDog()
    # value = 1 (INFO), should be stored
    su = StatusUpdate(CustomSeverity(UpdateSeverity.INFO.value), "Custom INFO")
    wd.on_status_update(su)  # 1.57μs -> 727ns (116% faster)
    # value = 0 (DEBUG), should not be stored
    su2 = StatusUpdate(CustomSeverity(UpdateSeverity.DEBUG.value), "Custom DEBUG")
    wd.on_status_update(su2)  # 650ns -> 299ns (117% faster)


def test_status_update_severity_boundary():
    """Test boundary condition: severity exactly DEBUG (should not store), and exactly INFO (should store)."""
    wd = BasePipelineWatchDog()
    su_debug = StatusUpdate(UpdateSeverity.DEBUG, "Boundary debug")
    su_info = StatusUpdate(UpdateSeverity.INFO, "Boundary info")
    wd.on_status_update(su_debug)  # 1.70μs -> 1.13μs (50.4% faster)
    wd.on_status_update(su_info)  # 820ns -> 541ns (51.6% faster)


def test_status_update_with_missing_severity_attribute():
    """Test that a status_update missing severity attribute raises AttributeError."""
    wd = BasePipelineWatchDog()

    class IncompleteStatusUpdate:
        def __init__(self, message):
            self.message = message

    with pytest.raises(AttributeError):
        wd.on_status_update(
            IncompleteStatusUpdate("No severity")
        )  # 1.45μs -> 1.50μs (3.07% slower)


def test_status_update_severity_value_is_none():
    """Test that a status_update with severity.value == None raises TypeError."""
    wd = BasePipelineWatchDog()

    class WeirdSeverity:
        value = None

    su = StatusUpdate(WeirdSeverity(), "None value")
    with pytest.raises(TypeError):
        wd.on_status_update(su)  # 3.65μs -> 2.18μs (67.5% faster)


def test_stream_updates_maxlen_is_enforced():
    """Test that only the last MAX_UPDATES_CONTEXT updates are kept."""
    wd = BasePipelineWatchDog()
    for i in range(MAX_UPDATES_CONTEXT + 10):
        wd.on_status_update(
            StatusUpdate(UpdateSeverity.INFO, f"update {i}")
        )  # 291μs -> 189μs (54.0% faster)


def test_stream_updates_empty_initially():
    """Test that _stream_updates is empty on construction."""
    wd = BasePipelineWatchDog()


# 3. Large Scale Test Cases


def test_large_number_of_status_updates():
    """Test storing a large number of status updates (INFO/ERROR/CRITICAL)."""
    wd = BasePipelineWatchDog()
    severities = [
        UpdateSeverity.INFO,
        UpdateSeverity.WARNING,
        UpdateSeverity.ERROR,
        UpdateSeverity.CRITICAL,
    ]
    for i in range(500):
        sev = severities[i % len(severities)]
        wd.on_status_update(
            StatusUpdate(sev, f"msg {i}")
        )  # 284μs -> 179μs (58.4% faster)
    # All should be above DEBUG
    for su in wd._stream_updates:
        pass


def test_large_number_of_mixed_severities():
    """Test a large number of mixed severity updates, only non-DEBUG are stored."""
    wd = BasePipelineWatchDog()
    for i in range(800):
        sev = UpdateSeverity.DEBUG if i % 5 == 0 else UpdateSeverity.INFO
        wd.on_status_update(
            StatusUpdate(sev, f"msg {i}")
        )  # 446μs -> 287μs (55.2% faster)
    # Only those not divisible by 5 should be stored
    expected_count = 800 - (800 // 5)
    # But limited by MAX_UPDATES_CONTEXT
    expected_count = min(expected_count, MAX_UPDATES_CONTEXT)


def test_stream_updates_performance_under_max_load():
    """Test performance and correctness near the maxlen boundary."""
    wd = BasePipelineWatchDog()
    # Fill up to maxlen with ERROR updates
    for i in range(MAX_UPDATES_CONTEXT):
        wd.on_status_update(
            StatusUpdate(UpdateSeverity.ERROR, f"err {i}")
        )  # 296μs -> 184μs (60.9% faster)
    # Add more updates to force rotation
    for i in range(20):
        wd.on_status_update(
            StatusUpdate(UpdateSeverity.CRITICAL, f"crit {i}")
        )  # 11.4μs -> 7.36μs (55.1% faster)


def test_stream_updates_with_varied_severity_and_messages():
    """Test that all types of messages above DEBUG are stored and retrievable."""
    wd = BasePipelineWatchDog()
    messages = []
    for i in range(100):
        sev = [
            UpdateSeverity.INFO,
            UpdateSeverity.WARNING,
            UpdateSeverity.ERROR,
            UpdateSeverity.CRITICAL,
        ][i % 4]
        msg = f"message {i}"
        messages.append(msg)
        wd.on_status_update(StatusUpdate(sev, msg))  # 59.3μs -> 38.0μs (56.2% faster)
    stored_messages = [su.message for su in wd._stream_updates]


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
from collections import deque
from enum import Enum
from typing import Dict, List, Optional

# imports
import pytest
from inference.core.interfaces.stream.watchdog import BasePipelineWatchDog

# --- Minimal stubs for dependencies ---


# UpdateSeverity Enum stub
class UpdateSeverity(Enum):
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4


# StatusUpdate stub
class StatusUpdate:
    def __init__(self, message: str, severity: UpdateSeverity):
        self.message = message
        self.severity = severity


# VideoSource stub
class VideoSource:
    def __init__(self, source_id: int):
        self.source_id = source_id


# LatencyMonitor stub
class LatencyMonitor:
    pass


# FPSMonitor stub
class FPSMonitor:
    pass


# --- The function to test (as provided, with minimal changes to use stubs above) ---

MAX_UPDATES_CONTEXT = 512


class PipelineWatchDog:
    def __init__(self):
        pass


from inference.core.interfaces.stream.watchdog import BasePipelineWatchDog

# --- Unit Tests ---


@pytest.fixture
def watchdog():
    """Fixture to create a fresh BasePipelineWatchDog for each test."""
    return BasePipelineWatchDog()


# 1. Basic Test Cases


def test_info_update_is_appended(watchdog):
    """Test that an INFO update is appended to the stream updates."""
    update = StatusUpdate("Info message", UpdateSeverity.INFO)
    watchdog.on_status_update(update)  # 2.79μs -> 1.93μs (44.5% faster)


def test_warning_update_is_appended(watchdog):
    """Test that a WARNING update is appended to the stream updates."""
    update = StatusUpdate("Warning message", UpdateSeverity.WARNING)
    watchdog.on_status_update(update)  # 1.92μs -> 1.33μs (44.4% faster)


def test_error_update_is_appended(watchdog):
    """Test that an ERROR update is appended to the stream updates."""
    update = StatusUpdate("Error message", UpdateSeverity.ERROR)
    watchdog.on_status_update(update)  # 1.88μs -> 1.34μs (40.6% faster)


def test_critical_update_is_appended(watchdog):
    """Test that a CRITICAL update is appended to the stream updates."""
    update = StatusUpdate("Critical message", UpdateSeverity.CRITICAL)
    watchdog.on_status_update(update)  # 1.79μs -> 1.28μs (39.7% faster)


def test_debug_update_is_ignored(watchdog):
    """Test that a DEBUG update is ignored and not appended."""
    update = StatusUpdate("Debug message", UpdateSeverity.DEBUG)
    watchdog.on_status_update(update)  # 1.79μs -> 1.27μs (41.0% faster)


# 2. Edge Test Cases


def test_multiple_debug_updates_are_ignored(watchdog):
    """Test that multiple DEBUG updates are all ignored."""
    for i in range(5):
        watchdog.on_status_update(
            StatusUpdate(f"Debug {i}", UpdateSeverity.DEBUG)
        )  # 4.13μs -> 3.03μs (36.3% faster)


def test_mixed_severity_updates(watchdog):
    """Test that only updates with severity above DEBUG are appended."""
    updates = [
        StatusUpdate("Debug", UpdateSeverity.DEBUG),
        StatusUpdate("Info", UpdateSeverity.INFO),
        StatusUpdate("Warning", UpdateSeverity.WARNING),
        StatusUpdate("Error", UpdateSeverity.ERROR),
        StatusUpdate("Critical", UpdateSeverity.CRITICAL),
        StatusUpdate("Debug2", UpdateSeverity.DEBUG),
    ]
    for update in updates:
        watchdog.on_status_update(update)  # 4.87μs -> 3.29μs (48.2% faster)
    messages = [u.message for u in watchdog._stream_updates]


def test_update_severity_exactly_debug(watchdog):
    """Test that severity exactly DEBUG is ignored."""
    update = StatusUpdate("Exactly debug", UpdateSeverity.DEBUG)
    watchdog.on_status_update(update)  # 1.69μs -> 1.08μs (56.3% faster)


def test_update_severity_just_above_debug(watchdog):
    """Test that severity just above DEBUG (INFO) is appended."""
    update = StatusUpdate("Just above debug", UpdateSeverity.INFO)
    watchdog.on_status_update(update)  # 1.60μs -> 1.16μs (38.8% faster)


def test_none_message_update(watchdog):
    """Test that an update with None as message is accepted."""
    update = StatusUpdate(None, UpdateSeverity.INFO)
    watchdog.on_status_update(update)  # 1.88μs -> 1.25μs (50.4% faster)


def test_empty_message_update(watchdog):
    """Test that an update with empty string message is accepted."""
    update = StatusUpdate("", UpdateSeverity.INFO)
    watchdog.on_status_update(update)  # 1.54μs -> 1.14μs (35.7% faster)


def test_update_with_large_message(watchdog):
    """Test that an update with a very large message is accepted and appended."""
    large_message = "A" * 10000  # 10,000 characters
    update = StatusUpdate(large_message, UpdateSeverity.INFO)
    watchdog.on_status_update(update)  # 1.55μs -> 1.25μs (24.3% faster)


# 3. Large Scale Test Cases


def test_maxlen_context_is_respected(watchdog):
    """Test that only the last MAX_UPDATES_CONTEXT updates are kept."""
    # Add MAX_UPDATES_CONTEXT + 10 updates
    for i in range(MAX_UPDATES_CONTEXT + 10):
        update = StatusUpdate(f"Update {i}", UpdateSeverity.INFO)
        watchdog.on_status_update(update)  # 293μs -> 190μs (54.3% faster)


def test_large_number_of_mixed_updates(watchdog):
    """Test with a large number of mixed severity updates."""
    # Add 1000 updates, alternating severity
    for i in range(1000):
        severity = UpdateSeverity.INFO if i % 2 == 0 else UpdateSeverity.DEBUG
        update = StatusUpdate(f"Update {i}", severity)
        watchdog.on_status_update(update)  # 567μs -> 372μs (52.6% faster)
    # Only INFO updates should be present, and only the last MAX_UPDATES_CONTEXT of those
    expected_count = min(MAX_UPDATES_CONTEXT, 1000 // 2)
    # All messages should be from even indices, and only last MAX_UPDATES_CONTEXT
    messages = [u.message for u in watchdog._stream_updates]
    expected_messages = [f"Update {i}" for i in range(0, 1000, 2)][
        -MAX_UPDATES_CONTEXT:
    ]


def test_stream_updates_is_deque_and_maxlen(watchdog):
    """Test that _stream_updates is a deque with correct maxlen."""


def test_performance_under_large_input(watchdog):
    """Performance test: Ensure appending 512 updates is efficient and correct."""
    import time

    updates = [
        StatusUpdate(f"Perf {i}", UpdateSeverity.INFO)
        for i in range(MAX_UPDATES_CONTEXT)
    ]
    start = time.time()
    for update in updates:
        watchdog.on_status_update(update)  # 291μs -> 188μs (54.2% faster)
    duration = time.time() - start


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-BasePipelineWatchDog.on_status_update-miqrpekj and push.

Codeflash Static Badge

The optimization improves performance by **caching a frequently accessed enum value** and **reducing attribute lookups** in a hot path method.

**Key optimizations:**

1. **Pre-cache the DEBUG severity value**: During initialization, `UpdateSeverity.DEBUG.value` is stored in `self._debug_severity_value` to avoid repeated enum attribute lookups.

2. **Extract severity value once**: In `on_status_update`, `status_update.severity.value` is retrieved once into a local variable instead of being accessed twice in the original comparison.

**Why this speeds up the code:**

- **Eliminates repeated enum lookups**: The original code performed `UpdateSeverity.DEBUG.value` on every call (8,989 hits according to the profiler). Enum attribute access involves dictionary lookups and is relatively expensive in Python.
- **Reduces attribute chain traversals**: `status_update.severity.value` went from being accessed twice to once per call, cutting attribute access overhead.
- **Improves CPU cache locality**: Local variables are faster to access than instance attributes or enum values.

**Performance impact analysis:**

The line profiler shows the comparison line dropped from 91.5% to 78.7% of total execution time, with per-hit time improving from 3,754ns to 2,527ns - a 33% improvement on the hottest line. This translated to an overall 55% speedup (2.60ms → 1.68ms).

**Test case performance:**

The optimization is particularly effective for:
- **High-frequency calls** with DEBUG updates (50-90% faster in many test cases)
- **Bulk operations** processing many status updates (50-60% faster for large-scale tests)
- **Mixed severity scenarios** where the comparison happens frequently

This optimization would be especially valuable if `on_status_update` is called frequently in video processing pipelines or real-time monitoring systems where status updates are generated at high rates.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 December 4, 2025 01:39
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Dec 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant