@codeflash-ai codeflash-ai bot commented Dec 4, 2025

📄 85% (0.85x) speedup for KalmanFilterXYWH.predict in ultralytics/trackers/utils/kalman_filter.py

⏱️ Runtime : 16.0 milliseconds → 8.60 milliseconds (best of 194 runs)
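The headline percentage can be reproduced from the two runtimes. The sketch below assumes the convention (original - optimized) / optimized, which matches the per-test figures in the regression tests; the function name `speedup_percent` is hypothetical. With the rounded runtimes shown above it gives roughly 86%, in line with the reported 85% once rounding of the displayed times is taken into account.

```python
def speedup_percent(original_time: float, optimized_time: float) -> float:
    """Percent speedup, assuming the convention (original - optimized) / optimized."""
    if optimized_time <= 0:
        raise ValueError("optimized_time must be positive")
    return (original_time - optimized_time) / optimized_time * 100.0


if __name__ == "__main__":
    # Overall runtime reported above, in milliseconds.
    print(f"overall: {speedup_percent(16.0, 8.60):.1f}%")
    # One per-test timing from the regression tests, in microseconds.
    print(f"repeat call: {speedup_percent(15.6, 7.32):.1f}%")
```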

📝 Explanation and details

The optimized code achieves an 85% speedup by eliminating inefficient NumPy operations and Python overhead. The key optimizations are:

What was optimized:

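  1. Eliminated np.r_ concatenation: The original code built the noise vector with np.r_[std_pos, std_vel]. np.r_ is an indexing helper whose argument handling runs in Python, so its fixed overhead dominates for an 8-element array. The optimized version computes the values up front and calls np.concatenate() directly.

  2. Reduced repeated work: The original code evaluated each product, such as self._std_weight_position * mean[2], twice (eight multiplications and eight index lookups in total). The optimized code caches w = mean[2] and h = mean[3] and computes each of the four distinct products once.

  3. Replaced np.square() with direct multiplication: Changed np.square(np.r_[...]) to stds * stds, which yields the same element-wise squares with less call overhead.

  4. Used the @ operator instead of np.linalg.multi_dot: multi_dot spends time choosing a multiplication order, which only pays off for large or unevenly shaped matrices. For three 8x8 matrices that bookkeeping is pure overhead, so A @ B @ C is faster than np.linalg.multi_dot((A, B, C)).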
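The four changes above can be sketched side by side. This is a reconstruction from the description in this comment, not the actual diff: the module-level constants and the function names `predict_original` and `predict_optimized` are assumptions made for illustration, with the weights and motion matrix taken from the stub in the generated tests.

```python
import numpy as np

# Assumed values, mirroring the parent-class stub in the generated tests.
STD_WEIGHT_POSITION = 1.0 / 20
STD_WEIGHT_VELOCITY = 1.0 / 160

# 8x8 constant-velocity motion matrix: position += velocity.
MOTION_MAT = np.eye(8)
for i in range(4):
    MOTION_MAT[i, i + 4] = 1.0


def predict_original(mean, covariance):
    """Prediction step as described for the original code."""
    std_pos = [STD_WEIGHT_POSITION * mean[2], STD_WEIGHT_POSITION * mean[3]] * 2
    std_vel = [STD_WEIGHT_VELOCITY * mean[2], STD_WEIGHT_VELOCITY * mean[3]] * 2
    motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
    new_mean = np.dot(mean, MOTION_MAT.T)
    new_cov = np.linalg.multi_dot((MOTION_MAT, covariance, MOTION_MAT.T)) + motion_cov
    return new_mean, new_cov


def predict_optimized(mean, covariance):
    """Prediction step with the four optimizations applied (a sketch)."""
    w, h = mean[2], mean[3]  # cache the two values every product depends on
    pos_w, pos_h = STD_WEIGHT_POSITION * w, STD_WEIGHT_POSITION * h
    vel_w, vel_h = STD_WEIGHT_VELOCITY * w, STD_WEIGHT_VELOCITY * h
    stds = np.concatenate(([pos_w, pos_h, pos_w, pos_h], [vel_w, vel_h, vel_w, vel_h]))
    motion_cov = np.diag(stds * stds)  # element-wise square without np.square
    new_mean = mean @ MOTION_MAT.T
    new_cov = MOTION_MAT @ covariance @ MOTION_MAT.T + motion_cov
    return new_mean, new_cov


if __name__ == "__main__":
    mean = np.array([10, 20, 5, 8, 1, -2, 0.5, -1], dtype=float)
    m1, c1 = predict_original(mean, np.eye(8))
    m2, c2 = predict_optimized(mean, np.eye(8))
    print(np.allclose(m1, m2), np.allclose(c1, c2))
```

Both functions return new arrays rather than mutating their inputs, so they can be compared directly on the same `mean` and `covariance`.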

Why it's faster:
The line profiler shows that the original np.diag(np.square(np.r_[std_pos, std_vel])) accounted for 59.7% of the function's runtime, while the equivalent operations in the optimized version account for 28.9% of its (shorter) runtime. The difference comes mainly from np.r_, which parses its arguments and builds intermediate objects in Python on every call, whereas np.concatenate does far less Python-level work.
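A rough way to check this on a given machine is a micro-benchmark of the two ways of building the noise matrix. The sketch below is not the profiler run quoted above; the input values and the helper names `noise_with_r`, `noise_with_concatenate`, and `time_us` are hypothetical, and absolute timings will vary with hardware and NumPy version.

```python
import timeit

import numpy as np

# Example standard deviations for a 5x8 box (illustrative values only).
std_pos = [0.25, 0.4, 0.25, 0.4]
std_vel = [0.03125, 0.05, 0.03125, 0.05]


def noise_with_r():
    """Noise matrix built the way the original line is described."""
    return np.diag(np.square(np.r_[std_pos, std_vel]))


def noise_with_concatenate():
    """Noise matrix built with np.concatenate and direct multiplication."""
    stds = np.concatenate((std_pos, std_vel))
    return np.diag(stds * stds)


def time_us(fn, number=2000, repeat=5):
    """Best-of-`repeat` time per call, in microseconds."""
    best = min(timeit.repeat(fn, number=number, repeat=repeat))
    return best / number * 1e6


if __name__ == "__main__":
    print(f"np.r_ version:          {time_us(noise_with_r):.2f} us per call")
    print(f"np.concatenate version: {time_us(noise_with_concatenate):.2f} us per call")
```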

Impact on workloads:
The optimizations are most effective for:

  • Frequent tracking scenarios: Most single-call test cases show 52-99% speedups. The NaN/inf case gains only 10.5%, while second calls within a test reach 107-113%
  • Batch processing: The large-scale tests (100 to 500 tracks per test) show consistent improvements of 85-99%
  • Real-time applications: The function appears to sit in object tracking pipelines, where predict is called for every track on every frame, so the roughly 5 to 12 μs saved per call adds up

The optimizations maintain identical numerical results while being particularly beneficial for high-frequency prediction calls typical in video tracking systems.
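One way to probe the equivalence claim for the covariance update is to compare the two matrix-product forms on random inputs. np.linalg.multi_dot may choose a different multiplication order than left-to-right A @ B @ C, so bitwise equality is not guaranteed in general; the sketch below therefore measures the largest absolute difference instead. The helper names `make_motion_mat` and `max_abs_difference` are hypothetical.

```python
import numpy as np


def make_motion_mat(ndim=4):
    """Constant-velocity motion matrix of size (2 * ndim, 2 * ndim)."""
    mat = np.eye(2 * ndim)
    idx = np.arange(ndim)
    mat[idx, idx + ndim] = 1.0  # position += velocity
    return mat


def max_abs_difference(num_trials=200, seed=0):
    """Largest element-wise gap between multi_dot and chained @ products."""
    rng = np.random.default_rng(seed)
    motion = make_motion_mat()
    worst = 0.0
    for _ in range(num_trials):
        a = rng.normal(size=(8, 8))
        cov = a @ a.T  # random symmetric positive semi-definite covariance
        ref = np.linalg.multi_dot((motion, cov, motion.T))
        alt = motion @ cov @ motion.T
        worst = max(worst, float(np.max(np.abs(ref - alt))))
    return worst


if __name__ == "__main__":
    print(f"max abs difference: {max_abs_difference():.3e}")
```

Any difference this reports comes from floating-point rounding order, which is why a tolerance-based comparison such as `np.allclose` is the safer check in regression tests.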

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 1461 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
import numpy as np

# imports
import pytest  # used for our unit tests
from ultralytics.trackers.utils.kalman_filter import KalmanFilterXYWH


# Minimal parent class stub for KalmanFilterXYWH
class KalmanFilterXYAH:
    def __init__(self):
        # 8x8 motion matrix for constant velocity model
        self._motion_mat = np.eye(8)
        for i in range(4):
            self._motion_mat[i, i + 4] = 1.0  # position += velocity
        self._update_mat = np.eye(8)[:4]
        self._std_weight_position = 1.0 / 20
        self._std_weight_velocity = 1.0 / 160


# ------------------ UNIT TESTS ------------------

# ----------- Basic Test Cases -----------


def test_predict_identity_mean_and_covariance():
    """Test predict with mean at origin and identity covariance."""
    kf = KalmanFilterXYWH()
    mean = np.array([0, 0, 1, 1, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 36.6μs -> 23.9μs (52.7% faster)


def test_predict_with_nonzero_velocity():
    """Test predict with non-zero velocity."""
    kf = KalmanFilterXYWH()
    mean = np.array([10, 20, 5, 8, 1, -2, 0.5, -1], dtype=float)
    covariance = np.eye(8)
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 30.8μs -> 18.6μs (66.1% faster)
    # Position should be updated by velocity; velocity itself is unchanged
    expected_mean = mean.copy()
    expected_mean[:4] += mean[4:]
    np.testing.assert_allclose(predicted_mean, expected_mean)


def test_predict_with_zero_covariance():
    """Test predict with zero covariance matrix."""
    kf = KalmanFilterXYWH()
    mean = np.array([5, 7, 2, 3, 0, 0, 0, 0], dtype=float)
    covariance = np.zeros((8, 8))
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 29.5μs -> 17.0μs (73.2% faster)
    # Covariance should be exactly the motion noise
    std_pos = [
        kf._std_weight_position * mean[2],
        kf._std_weight_position * mean[3],
        kf._std_weight_position * mean[2],
        kf._std_weight_position * mean[3],
    ]
    std_vel = [
        kf._std_weight_velocity * mean[2],
        kf._std_weight_velocity * mean[3],
        kf._std_weight_velocity * mean[2],
        kf._std_weight_velocity * mean[3],
    ]
    expected_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
    np.testing.assert_allclose(predicted_cov, expected_cov)


# ----------- Edge Test Cases -----------


def test_predict_with_negative_dimensions():
    """Test predict with negative width/height (should still compute; the noise terms are squared, so the sign does not matter)."""
    kf = KalmanFilterXYWH()
    mean = np.array([0, 0, -5, -10, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 28.2μs -> 17.2μs (64.3% faster)


def test_predict_with_large_values():
    """Test predict with very large values to check for numerical stability."""
    kf = KalmanFilterXYWH()
    large = 1e6
    mean = np.array([large, large, large, large, large, large, large, large], dtype=float)
    covariance = np.eye(8) * large
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 27.7μs -> 15.4μs (80.3% faster)


def test_predict_with_small_values():
    """Test predict with very small values to check for underflow."""
    kf = KalmanFilterXYWH()
    small = 1e-8
    mean = np.array([small, small, small, small, small, small, small, small], dtype=float)
    covariance = np.eye(8) * small
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 27.5μs -> 15.3μs (79.7% faster)


def test_predict_with_non_square_covariance():
    """Test predict raises error with non-square covariance matrix."""
    kf = KalmanFilterXYWH()
    mean = np.zeros(8)
    covariance = np.ones((8, 7))  # Not square
    with pytest.raises(ValueError):
        # Should raise ValueError from np.linalg.multi_dot or np.dot
        kf.predict(mean, covariance)  # 30.9μs -> 17.5μs (77.2% faster)


def test_predict_with_wrong_mean_shape():
    """Test predict raises error with wrong mean shape."""
    kf = KalmanFilterXYWH()
    mean = np.zeros(7)  # Should be length 8
    covariance = np.eye(8)
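    # mean[2] and mean[3] still exist, so the failure comes from the shape mismatch in the matrix product
    with pytest.raises(ValueError):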
        kf.predict(mean, covariance)


def test_predict_with_wrong_covariance_shape():
    """Test predict raises error with wrong covariance shape."""
    kf = KalmanFilterXYWH()
    mean = np.zeros(8)
    covariance = np.eye(7)  # Should be 8x8
    with pytest.raises(ValueError):
        kf.predict(mean, covariance)  # 44.8μs -> 27.3μs (64.0% faster)


def test_predict_with_inf_and_nan_values():
    """Test predict with inf and nan values in mean and covariance."""
    kf = KalmanFilterXYWH()
    mean = np.array([np.nan, np.inf, 1, 1, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    predicted_mean, predicted_cov = kf.predict(mean, covariance)  # 36.5μs -> 33.1μs (10.5% faster)


# ----------- Large Scale Test Cases -----------


def test_predict_many_tracks():
    """Test predict on a large batch of tracks to check scalability."""
    kf = KalmanFilterXYWH()
    n = 500  # Large but not excessive
    means = np.random.rand(n, 8)
    covariances = np.array([np.eye(8) for _ in range(n)])
    # Run predict for each track and check output shapes and basic properties
    for i in range(n):
        predicted_mean, predicted_cov = kf.predict(means[i], covariances[i])  # 5.36ms -> 2.89ms (85.4% faster)


def test_predict_stress_randomized_inputs():
    """Test predict with randomized means and covariances for stress testing."""
    kf = KalmanFilterXYWH()
    n = 100  # Reasonable batch size
    for _ in range(n):
        mean = np.random.randn(8) * np.random.uniform(0.1, 100)
        cov = np.eye(8) * np.random.uniform(0.1, 100)
        predicted_mean, predicted_cov = kf.predict(mean, cov)  # 1.19ms -> 596μs (99.2% faster)


# ----------- Determinism Test -----------


def test_predict_determinism():
    """Test that predict is deterministic for the same input."""
    kf = KalmanFilterXYWH()
    mean = np.array([1, 2, 3, 4, 5, 6, 7, 8], dtype=float)
    covariance = np.eye(8) * 2
    out1_mean, out1_cov = kf.predict(mean, covariance)  # 28.1μs -> 15.8μs (77.8% faster)
    out2_mean, out2_cov = kf.predict(mean, covariance)  # 15.6μs -> 7.32μs (113% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import numpy as np

# imports
import pytest  # used for our unit tests
from ultralytics.trackers.utils.kalman_filter import KalmanFilterXYWH


# Minimal stub for KalmanFilterXYAH (as KalmanFilterXYWH inherits from it)
class KalmanFilterXYAH:
    def __init__(self):
        # 8x8 motion matrix for constant velocity model
        self._motion_mat = np.eye(8)
        for i in range(4):
            self._motion_mat[i, i + 4] = 1  # position += velocity
        self._update_mat = np.eye(8)[:4]
        self._std_weight_position = 1.0 / 20
        self._std_weight_velocity = 1.0 / 160


# ------------------ UNIT TESTS ------------------

# ----------- BASIC TEST CASES -----------


def test_predict_basic_identity():
    """Test basic prediction with identity covariance and zero velocities."""
    kf = KalmanFilterXYWH()
    mean = np.array([10, 20, 30, 40, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 26.8μs -> 15.1μs (77.1% faster)


def test_predict_basic_nonzero_velocity():
    """Test prediction with nonzero velocities."""
    kf = KalmanFilterXYWH()
    mean = np.array([10, 20, 30, 40, 1, -2, 3, -4], dtype=float)
    covariance = np.eye(8)
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 26.7μs -> 15.9μs (67.5% faster)
    # Positions should be updated by velocity
    expected_pos = mean[:4] + mean[4:8]
    np.testing.assert_allclose(pred_mean[:4], expected_pos)


def test_predict_basic_covariance_shape():
    """Test that output covariance has correct shape."""
    kf = KalmanFilterXYWH()
    mean = np.zeros(8)
    covariance = np.eye(8)
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 27.7μs -> 16.6μs (67.3% faster)


# ----------- EDGE TEST CASES -----------


def test_predict_edge_zero_width_height():
    """Test prediction when width and height are zero (could cause zero std)."""
    kf = KalmanFilterXYWH()
    mean = np.array([0, 0, 0, 0, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 26.7μs -> 16.5μs (62.1% faster)


def test_predict_edge_negative_width_height():
    """Test prediction with negative width/height (should be handled gracefully)."""
    kf = KalmanFilterXYWH()
    mean = np.array([5, 5, -10, -20, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 27.3μs -> 16.1μs (69.0% faster)


def test_predict_edge_large_values():
    """Test prediction with very large values in mean and covariance."""
    kf = KalmanFilterXYWH()
    mean = np.array([1e6, -1e6, 1e5, -1e5, 1e3, -1e3, 1e2, -1e2], dtype=float)
    covariance = np.eye(8) * 1e8
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 27.5μs -> 15.4μs (78.9% faster)


def test_predict_edge_small_values():
    """Test prediction with very small values in mean and covariance."""
    kf = KalmanFilterXYWH()
    mean = np.array([1e-8, -1e-8, 1e-9, -1e-9, 1e-10, -1e-10, 1e-11, -1e-11], dtype=float)
    covariance = np.eye(8) * 1e-12
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 26.9μs -> 15.6μs (72.5% faster)


def test_predict_edge_non_identity_covariance():
    """Test prediction with a non-identity, non-diagonal covariance matrix."""
    kf = KalmanFilterXYWH()
    mean = np.array([10, 20, 30, 40, 1, 2, 3, 4], dtype=float)
    covariance = np.eye(8) * 2
    covariance[0, 1] = covariance[1, 0] = 0.5
    covariance[2, 3] = covariance[3, 2] = -0.5
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 27.7μs -> 15.4μs (79.4% faster)


def test_predict_edge_singular_covariance():
    """Test prediction with a singular covariance matrix."""
    kf = KalmanFilterXYWH()
    mean = np.array([1, 2, 3, 4, 5, 6, 7, 8], dtype=float)
    covariance = np.zeros((8, 8))
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 28.5μs -> 16.7μs (71.3% faster)


def test_predict_edge_non_float_input():
    """Test prediction with integer arrays (should be cast to float internally)."""
    kf = KalmanFilterXYWH()
    mean = np.array([1, 2, 3, 4, 5, 6, 7, 8], dtype=int)
    covariance = np.eye(8, dtype=int)
    pred_mean, pred_cov = kf.predict(mean, covariance)  # 41.2μs -> 25.3μs (62.9% faster)


def test_predict_edge_nan_inf_input():
    """Test prediction with NaN/Inf values in mean/covariance (predict does no input validation, so they propagate)."""
    kf = KalmanFilterXYWH()
    mean = np.array([1, 2, np.nan, 4, 5, 6, 7, 8], dtype=float)
    covariance = np.eye(8)
    # NaN in the width propagates into the predicted mean instead of raising
    pred_mean, pred_cov = kf.predict(mean, covariance)
    assert np.isnan(pred_mean[2])
    mean = np.array([1, 2, 3, 4, 5, 6, 7, 8], dtype=float)
    covariance[0, 0] = np.inf
    # Inf in the covariance propagates into the predicted covariance
    pred_mean, pred_cov = kf.predict(mean, covariance)
    assert not np.isfinite(pred_cov[0, 0])


# ----------- LARGE SCALE TEST CASES -----------


def test_predict_large_scale_batch():
    """Test predict on a large batch of means/covariances."""
    kf = KalmanFilterXYWH()
    batch_size = 500  # Large but <1000
    means = np.tile(np.array([10, 20, 30, 40, 1, 2, 3, 4], dtype=float), (batch_size, 1))
    covariances = np.tile(np.eye(8), (batch_size, 1, 1))
    # Run predict on each batch element
    for i in range(batch_size):
        pred_mean, pred_cov = kf.predict(means[i], covariances[i])  # 5.36ms -> 2.89ms (85.2% faster)
        # Positions should be updated by velocity
        expected_pos = means[i][:4] + means[i][4:8]
        np.testing.assert_allclose(pred_mean[:4], expected_pos)


def test_predict_large_scale_extreme_values():
    """Test predict with a large batch of extreme values."""
    kf = KalmanFilterXYWH()
    batch_size = 100
    # Use alternating large/small values
    means = np.zeros((batch_size, 8))
    covariances = np.zeros((batch_size, 8, 8))
    for i in range(batch_size):
        means[i] = np.array([i * 1e5, -i * 1e5, i * 1e4, -i * 1e4, i * 1e3, -i * 1e3, i * 1e2, -i * 1e2], dtype=float)
        covariances[i] = np.eye(8) * (i + 1)
    for i in range(batch_size):
        pred_mean, pred_cov = kf.predict(means[i], covariances[i])  # 1.11ms -> 595μs (86.5% faster)


def test_predict_large_scale_randomized():
    """Test predict with randomized means and covariances."""
    kf = KalmanFilterXYWH()
    rng = np.random.default_rng(42)
    batch_size = 200
    means = rng.normal(0, 100, (batch_size, 8))
    covariances = np.array([np.eye(8) * rng.uniform(1, 10) for _ in range(batch_size)])
    for i in range(batch_size):
        pred_mean, pred_cov = kf.predict(means[i], covariances[i])  # 2.17ms -> 1.17ms (85.5% faster)


# ----------- FUNCTIONALITY & INVARIANTS -----------


def test_predict_invariant_velocity():
    """Velocity should not change after prediction."""
    kf = KalmanFilterXYWH()
    mean = np.array([10, 20, 30, 40, 5, 6, 7, 8], dtype=float)
    covariance = np.eye(8)
    pred_mean, _ = kf.predict(mean, covariance)  # 29.6μs -> 18.1μs (63.3% faster)
    np.testing.assert_allclose(pred_mean[4:], mean[4:])


def test_predict_invariant_position_with_zero_velocity():
    """Position should not change if velocity is zero."""
    kf = KalmanFilterXYWH()
    mean = np.array([100, 200, 50, 60, 0, 0, 0, 0], dtype=float)
    covariance = np.eye(8)
    pred_mean, _ = kf.predict(mean, covariance)  # 27.3μs -> 15.6μs (75.7% faster)
    np.testing.assert_allclose(pred_mean[:4], mean[:4])


def test_predict_motion_covariance_effect():
    """Test that motion covariance increases with larger width/height."""
    kf = KalmanFilterXYWH()
    mean1 = np.array([0, 0, 10, 10, 0, 0, 0, 0], dtype=float)
    mean2 = np.array([0, 0, 100, 100, 0, 0, 0, 0], dtype=float)
    cov = np.eye(8)
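    _, cov1 = kf.predict(mean1, cov)  # 28.3μs -> 16.7μs (69.1% faster)
    _, cov2 = kf.predict(mean2, cov)  # 16.2μs -> 7.81μs (107% faster)
    # Larger boxes add more motion noise to every diagonal entry
    assert np.all(np.diag(cov2) > np.diag(cov1))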


# ----------- ERROR HANDLING -----------


def test_predict_error_wrong_shape_mean():
    """Test error is raised if mean shape is wrong."""
    kf = KalmanFilterXYWH()
    mean = np.array([1, 2, 3, 4, 5, 6, 7], dtype=float)  # Only 7 elements
    covariance = np.eye(8)
    with pytest.raises(ValueError):
        kf.predict(mean, covariance)  # 24.1μs -> 12.8μs (88.2% faster)


def test_predict_error_wrong_shape_covariance():
    """Test error is raised if covariance shape is wrong."""
    kf = KalmanFilterXYWH()
    mean = np.zeros(8)
    covariance = np.eye(7)  # 7x7 instead of 8x8
    with pytest.raises(ValueError):
        kf.predict(mean, covariance)  # 30.1μs -> 16.1μs (87.3% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-KalmanFilterXYWH.predict-mir99zml and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 December 4, 2025 09:51
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Dec 4, 2025