From 7fb9bdfb99c93038029efa305043641748dd30cc Mon Sep 17 00:00:00 2001 From: Jintao Zhang Date: Sat, 29 Nov 2025 23:24:35 +0800 Subject: [PATCH] feat: add time-windowed endpoint metrics for load balancing Signed-off-by: Jintao Zhang --- src/semantic-router/cmd/main.go | 11 + src/semantic-router/pkg/config/config.go | 33 + .../pkg/extproc/processor_req_body.go | 6 + .../pkg/extproc/processor_req_header.go | 3 + .../pkg/extproc/processor_res_body.go | 14 + .../observability/metrics/windowed_metrics.go | 612 ++++++++++++++++++ .../metrics/windowed_metrics_test.go | 504 +++++++++++++++ .../docs/tutorials/observability/metrics.md | 64 +- 8 files changed, 1246 insertions(+), 1 deletion(-) create mode 100644 src/semantic-router/pkg/observability/metrics/windowed_metrics.go create mode 100644 src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go diff --git a/src/semantic-router/cmd/main.go b/src/semantic-router/cmd/main.go index 9a46583bf..9b52a8ea2 100644 --- a/src/semantic-router/cmd/main.go +++ b/src/semantic-router/cmd/main.go @@ -18,6 +18,7 @@ import ( "github.com/vllm-project/semantic-router/src/semantic-router/pkg/extproc" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/k8s" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing" ) @@ -87,6 +88,16 @@ func main() { }() } + // Initialize windowed metrics if enabled + if cfg.Observability.Metrics.WindowedMetrics.Enabled { + logging.Infof("Initializing windowed metrics for load balancing...") + if err := metrics.InitializeWindowedMetrics(cfg.Observability.Metrics.WindowedMetrics); err != nil { + logging.Warnf("Failed to initialize windowed metrics: %v", err) + } else { + logging.Infof("Windowed metrics initialized successfully") + } + } + // Set up signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/src/semantic-router/pkg/config/config.go b/src/semantic-router/pkg/config/config.go index e26b156ae..3fe82fc93 100644 --- a/src/semantic-router/pkg/config/config.go +++ b/src/semantic-router/pkg/config/config.go @@ -262,6 +262,39 @@ type APIConfig struct { type ObservabilityConfig struct { // Tracing configuration for distributed tracing Tracing TracingConfig `yaml:"tracing"` + + // Metrics configuration for enhanced metrics collection + Metrics MetricsConfig `yaml:"metrics"` +} + +// MetricsConfig represents configuration for metrics collection +type MetricsConfig struct { + // Enable windowed metrics collection for load balancing + WindowedMetrics WindowedMetricsConfig `yaml:"windowed_metrics"` +} + +// WindowedMetricsConfig represents configuration for time-windowed metrics +type WindowedMetricsConfig struct { + // Enable windowed metrics collection + Enabled bool `yaml:"enabled"` + + // Time windows to track (in duration format, e.g., "1m", "5m", "15m", "1h", "24h") + // Default: ["1m", "5m", "15m", "1h", "24h"] + TimeWindows []string `yaml:"time_windows,omitempty"` + + // Update interval for windowed metrics computation (e.g., "10s", "30s") + // Default: "10s" + UpdateInterval string `yaml:"update_interval,omitempty"` + + // Enable endpoint-level metrics tracking + EndpointMetrics bool `yaml:"endpoint_metrics"` + + // Enable queue depth estimation + QueueDepthEstimation bool `yaml:"queue_depth_estimation"` + + // 
Maximum number of endpoints to track (to prevent cardinality explosion) + // Default: 100 + MaxEndpoints int `yaml:"max_endpoints,omitempty"` } // TracingConfig represents configuration for distributed tracing diff --git a/src/semantic-router/pkg/extproc/processor_req_body.go b/src/semantic-router/pkg/extproc/processor_req_body.go index 14c1160a4..7f488f249 100644 --- a/src/semantic-router/pkg/extproc/processor_req_body.go +++ b/src/semantic-router/pkg/extproc/processor_req_body.go @@ -229,6 +229,12 @@ func (r *OpenAIRouter) selectEndpointForModel(ctx *RequestContext, model string) backendSpan.End() ctx.TraceContext = backendCtx + // Store the selected endpoint in context for windowed metrics tracking + ctx.SelectedEndpoint = endpointAddress + + // Increment active request count for queue depth estimation + metrics.IncrementEndpointActiveRequests(endpointAddress, model) + return endpointAddress } diff --git a/src/semantic-router/pkg/extproc/processor_req_header.go b/src/semantic-router/pkg/extproc/processor_req_header.go index e0fce4ac7..be2208764 100644 --- a/src/semantic-router/pkg/extproc/processor_req_header.go +++ b/src/semantic-router/pkg/extproc/processor_req_header.go @@ -42,6 +42,9 @@ type RequestContext struct { VSRInjectedSystemPrompt bool // Whether a system prompt was injected into the request VSRSelectedDecision *config.Decision // The decision object selected by DecisionEngine (for plugins) + // Endpoint tracking for windowed metrics + SelectedEndpoint string // The endpoint address selected for this request + // Tracing context TraceContext context.Context // OpenTelemetry trace context for span propagation } diff --git a/src/semantic-router/pkg/extproc/processor_res_body.go b/src/semantic-router/pkg/extproc/processor_res_body.go index a5d6c25e7..301b675c7 100644 --- a/src/semantic-router/pkg/extproc/processor_res_body.go +++ b/src/semantic-router/pkg/extproc/processor_res_body.go @@ -15,6 +15,9 @@ import ( func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) { completionLatency := time.Since(ctx.StartTime) + // Decrement active request count for queue depth estimation + defer metrics.DecrementEndpointActiveRequests(ctx.SelectedEndpoint, ctx.RequestModel) + // Process the response for caching responseBody := v.ResponseBody.Body @@ -68,6 +71,17 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response metrics.RecordModelTPOT(ctx.RequestModel, timePerToken) } + // Record windowed endpoint metrics for load balancing + metrics.RecordEndpointRequest( + ctx.SelectedEndpoint, + ctx.RequestModel, + completionLatency.Seconds(), + int64(promptTokens), + int64(completionTokens), + false, // isError + false, // isTimeout + ) + // Compute and record cost if pricing is configured if r.Config != nil { promptRatePer1M, completionRatePer1M, currency, ok := r.Config.GetModelPricing(ctx.RequestModel) diff --git a/src/semantic-router/pkg/observability/metrics/windowed_metrics.go b/src/semantic-router/pkg/observability/metrics/windowed_metrics.go new file mode 100644 index 000000000..3b04e8d1b --- /dev/null +++ b/src/semantic-router/pkg/observability/metrics/windowed_metrics.go @@ -0,0 +1,612 @@ +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" + 
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/consts" +) + +// Default time windows for windowed metrics +var DefaultTimeWindows = []string{"1m", "5m", "15m", "1h", "24h"} + +// Default update interval for computing windowed metrics +const DefaultUpdateInterval = 10 * time.Second + +// Default maximum endpoints to track +const DefaultMaxEndpoints = 100 + +// WindowedMetricsManager manages time-windowed metrics for load balancing +type WindowedMetricsManager struct { + config config.WindowedMetricsConfig + timeWindows []time.Duration + windowLabels []string + updateInterval time.Duration + maxEndpoints int + + // Ring buffers for storing request data per endpoint/model + requestBuffers map[string]*RequestRingBuffer + bufferMutex sync.RWMutex + + // Active request tracking for queue depth estimation + activeRequests map[string]int64 + activeMutex sync.RWMutex + + // Stop channel for background goroutine + stopChan chan struct{} + running bool +} + +// RequestData represents a single request's data for windowed tracking +type RequestData struct { + Timestamp time.Time + Endpoint string + Model string + LatencySeconds float64 + PromptTokens int64 + CompletionTokens int64 + IsError bool + IsTimeout bool +} + +// RequestRingBuffer is a time-based ring buffer for storing request data +type RequestRingBuffer struct { + data []RequestData + head int + size int + capacity int + mutex sync.RWMutex +} + +// NewRequestRingBuffer creates a new ring buffer with the given capacity +func NewRequestRingBuffer(capacity int) *RequestRingBuffer { + return &RequestRingBuffer{ + data: make([]RequestData, capacity), + capacity: capacity, + } +} + +// Add adds a request to the ring buffer +func (rb *RequestRingBuffer) Add(req RequestData) { + rb.mutex.Lock() + defer rb.mutex.Unlock() + + rb.data[rb.head] = req + rb.head = (rb.head + 1) % rb.capacity + if rb.size < rb.capacity { + rb.size++ + } +} + +// GetDataSince returns all request data since the given time +func (rb *RequestRingBuffer) GetDataSince(since time.Time) []RequestData { + rb.mutex.RLock() + defer rb.mutex.RUnlock() + + result := make([]RequestData, 0, rb.size) + for i := 0; i < rb.size; i++ { + idx := (rb.head - rb.size + i + rb.capacity) % rb.capacity + if !rb.data[idx].Timestamp.Before(since) { + result = append(result, rb.data[idx]) + } + } + return result +} + +// Prometheus metrics for windowed endpoint tracking +var ( + // EndpointLatencyWindowed tracks latency by endpoint, model, and time window + EndpointLatencyWindowed *prometheus.GaugeVec + + // EndpointRequestsWindowed tracks request counts by endpoint, model, and time window + EndpointRequestsWindowed *prometheus.GaugeVec + + // EndpointTokensWindowed tracks token throughput by endpoint, model, token type, and time window + EndpointTokensWindowed *prometheus.GaugeVec + + // EndpointUtilization tracks utilization percentage by endpoint and time window + EndpointUtilization *prometheus.GaugeVec + + // EndpointQueueDepth tracks estimated queue depth by endpoint and model + EndpointQueueDepth *prometheus.GaugeVec + + // EndpointErrorRate tracks error rate by endpoint, model, and time window + EndpointErrorRate *prometheus.GaugeVec + + // EndpointLatencyP50 tracks P50 latency by endpoint, model, and time window + EndpointLatencyP50 *prometheus.GaugeVec + + // EndpointLatencyP95 tracks P95 latency by endpoint, model, and time window + EndpointLatencyP95 *prometheus.GaugeVec + + // EndpointLatencyP99 tracks P99 latency by endpoint, model, and time window + EndpointLatencyP99 
*prometheus.GaugeVec + + windowedMetricsInitOnce sync.Once +) + +// Global instance of WindowedMetricsManager +var ( + globalWindowedManager *WindowedMetricsManager + globalWindowedManagerMutex sync.RWMutex +) + +// InitializeWindowedMetrics initializes the windowed metrics system +func InitializeWindowedMetrics(cfg config.WindowedMetricsConfig) error { + windowedMetricsInitOnce.Do(func() { + // Initialize Prometheus metrics + EndpointLatencyWindowed = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_latency_windowed_seconds", + Help: "Average latency by endpoint, model, and time window", + }, + []string{"endpoint", "model", "time_window"}, + ) + + EndpointRequestsWindowed = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_requests_windowed_total", + Help: "Total requests by endpoint, model, and time window", + }, + []string{"endpoint", "model", "time_window"}, + ) + + EndpointTokensWindowed = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_tokens_windowed_total", + Help: "Total tokens by endpoint, model, token type, and time window", + }, + []string{"endpoint", "model", "token_type", "time_window"}, + ) + + EndpointUtilization = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_utilization_percentage", + Help: "Estimated utilization percentage by endpoint and time window", + }, + []string{"endpoint", "time_window"}, + ) + + EndpointQueueDepth = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_queue_depth_estimated", + Help: "Estimated queue depth by endpoint and model", + }, + []string{"endpoint", "model"}, + ) + + EndpointErrorRate = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_error_rate_windowed", + Help: "Error rate by endpoint, model, and time window", + }, + []string{"endpoint", "model", "time_window"}, + ) + + EndpointLatencyP50 = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_latency_p50_windowed_seconds", + Help: "P50 latency by endpoint, model, and time window", + }, + []string{"endpoint", "model", "time_window"}, + ) + + EndpointLatencyP95 = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_latency_p95_windowed_seconds", + Help: "P95 latency by endpoint, model, and time window", + }, + []string{"endpoint", "model", "time_window"}, + ) + + EndpointLatencyP99 = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "llm_endpoint_latency_p99_windowed_seconds", + Help: "P99 latency by endpoint, model, and time window", + }, + []string{"endpoint", "model", "time_window"}, + ) + }) + + // Create and start the manager + manager, err := NewWindowedMetricsManager(cfg) + if err != nil { + return err + } + + globalWindowedManagerMutex.Lock() + globalWindowedManager = manager + globalWindowedManagerMutex.Unlock() + + manager.Start() + return nil +} + +// NewWindowedMetricsManager creates a new WindowedMetricsManager +func NewWindowedMetricsManager(cfg config.WindowedMetricsConfig) (*WindowedMetricsManager, error) { + // Parse time windows + windowStrings := cfg.TimeWindows + if len(windowStrings) == 0 { + windowStrings = DefaultTimeWindows + } + + timeWindows := make([]time.Duration, 0, len(windowStrings)) + windowLabels := make([]string, 0, len(windowStrings)) + for _, ws := range windowStrings { + d, parseErr := time.ParseDuration(ws) + if parseErr != nil { + // Skip invalid durations + continue + } + timeWindows = append(timeWindows, d) + windowLabels = append(windowLabels, ws) + } + + // Parse update interval + updateInterval := 
DefaultUpdateInterval + if cfg.UpdateInterval != "" { + if d, parseErr := time.ParseDuration(cfg.UpdateInterval); parseErr == nil { + updateInterval = d + } + } + + // Set max endpoints + maxEndpoints := cfg.MaxEndpoints + if maxEndpoints <= 0 { + maxEndpoints = DefaultMaxEndpoints + } + + return &WindowedMetricsManager{ + config: cfg, + timeWindows: timeWindows, + windowLabels: windowLabels, + updateInterval: updateInterval, + maxEndpoints: maxEndpoints, + requestBuffers: make(map[string]*RequestRingBuffer), + activeRequests: make(map[string]int64), + stopChan: make(chan struct{}), + }, nil +} + +// Start begins the background metrics computation goroutine +func (m *WindowedMetricsManager) Start() { + if m.running { + return + } + m.running = true + + go func() { + ticker := time.NewTicker(m.updateInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m.computeWindowedMetrics() + case <-m.stopChan: + return + } + } + }() +} + +// Stop stops the background metrics computation +func (m *WindowedMetricsManager) Stop() { + if !m.running { + return + } + close(m.stopChan) + m.running = false +} + +// RecordRequest records a request for windowed metrics tracking +func (m *WindowedMetricsManager) RecordRequest(req RequestData) { + if !m.config.Enabled { + return + } + + key := m.getBufferKey(req.Endpoint, req.Model) + + m.bufferMutex.Lock() + buffer, exists := m.requestBuffers[key] + if !exists { + // Check if we've hit max endpoints + if len(m.requestBuffers) >= m.maxEndpoints { + m.bufferMutex.Unlock() + return + } + buffer = NewRequestRingBuffer(10000) // Adjust capacity as needed + m.requestBuffers[key] = buffer + } + m.bufferMutex.Unlock() + + buffer.Add(req) +} + +// IncrementActiveRequests increments the active request count for queue depth tracking +func (m *WindowedMetricsManager) IncrementActiveRequests(endpoint, model string) { + if !m.config.QueueDepthEstimation { + return + } + + key := m.getBufferKey(endpoint, model) + + m.activeMutex.Lock() + m.activeRequests[key]++ + count := m.activeRequests[key] + m.activeMutex.Unlock() + + // Update the gauge immediately (only if metrics are initialized) + if EndpointQueueDepth == nil { + return + } + if endpoint == "" { + endpoint = consts.UnknownLabel + } + if model == "" { + model = consts.UnknownLabel + } + EndpointQueueDepth.WithLabelValues(endpoint, model).Set(float64(count)) +} + +// DecrementActiveRequests decrements the active request count +func (m *WindowedMetricsManager) DecrementActiveRequests(endpoint, model string) { + if !m.config.QueueDepthEstimation { + return + } + + key := m.getBufferKey(endpoint, model) + + m.activeMutex.Lock() + m.activeRequests[key]-- + if m.activeRequests[key] < 0 { + m.activeRequests[key] = 0 + } + count := m.activeRequests[key] + m.activeMutex.Unlock() + + // Update the gauge immediately (only if metrics are initialized) + if EndpointQueueDepth == nil { + return + } + if endpoint == "" { + endpoint = consts.UnknownLabel + } + if model == "" { + model = consts.UnknownLabel + } + EndpointQueueDepth.WithLabelValues(endpoint, model).Set(float64(count)) +} + +// computeWindowedMetrics computes all windowed metrics +func (m *WindowedMetricsManager) computeWindowedMetrics() { + now := time.Now() + + m.bufferMutex.RLock() + buffers := make(map[string]*RequestRingBuffer, len(m.requestBuffers)) + for k, v := range m.requestBuffers { + buffers[k] = v + } + m.bufferMutex.RUnlock() + + // Compute metrics for each endpoint/model combination and each time window + for key, buffer := range buffers 
{ + endpoint, model := m.parseBufferKey(key) + if endpoint == "" { + endpoint = consts.UnknownLabel + } + if model == "" { + model = consts.UnknownLabel + } + + for i, window := range m.timeWindows { + windowLabel := m.windowLabels[i] + since := now.Add(-window) + data := buffer.GetDataSince(since) + + if len(data) == 0 { + // Set zero values for empty windows + EndpointRequestsWindowed.WithLabelValues(endpoint, model, windowLabel).Set(0) + EndpointLatencyWindowed.WithLabelValues(endpoint, model, windowLabel).Set(0) + EndpointErrorRate.WithLabelValues(endpoint, model, windowLabel).Set(0) + continue + } + + // Compute metrics + var totalLatency float64 + var totalPromptTokens, totalCompletionTokens int64 + var errorCount int + latencies := make([]float64, 0, len(data)) + + for _, d := range data { + totalLatency += d.LatencySeconds + totalPromptTokens += d.PromptTokens + totalCompletionTokens += d.CompletionTokens + latencies = append(latencies, d.LatencySeconds) + if d.IsError || d.IsTimeout { + errorCount++ + } + } + + requestCount := float64(len(data)) + avgLatency := totalLatency / requestCount + errorRate := float64(errorCount) / requestCount + + // Update Prometheus metrics + EndpointRequestsWindowed.WithLabelValues(endpoint, model, windowLabel).Set(requestCount) + EndpointLatencyWindowed.WithLabelValues(endpoint, model, windowLabel).Set(avgLatency) + EndpointTokensWindowed.WithLabelValues(endpoint, model, "prompt", windowLabel).Set(float64(totalPromptTokens)) + EndpointTokensWindowed.WithLabelValues(endpoint, model, "completion", windowLabel).Set(float64(totalCompletionTokens)) + EndpointErrorRate.WithLabelValues(endpoint, model, windowLabel).Set(errorRate) + + // Compute percentiles + if len(latencies) > 0 { + p50 := computePercentile(latencies, 0.50) + p95 := computePercentile(latencies, 0.95) + p99 := computePercentile(latencies, 0.99) + + EndpointLatencyP50.WithLabelValues(endpoint, model, windowLabel).Set(p50) + EndpointLatencyP95.WithLabelValues(endpoint, model, windowLabel).Set(p95) + EndpointLatencyP99.WithLabelValues(endpoint, model, windowLabel).Set(p99) + } + + // Compute utilization (requests per second / expected capacity) + // This is a simple approximation based on request rate + requestsPerSecond := requestCount / window.Seconds() + // Assume 100 req/s as theoretical max for utilization calculation + // This can be made configurable + utilization := (requestsPerSecond / 100.0) * 100.0 + if utilization > 100.0 { + utilization = 100.0 + } + EndpointUtilization.WithLabelValues(endpoint, windowLabel).Set(utilization) + } + } +} + +// getBufferKey creates a unique key for endpoint/model combination +func (m *WindowedMetricsManager) getBufferKey(endpoint, model string) string { + return endpoint + "|" + model +} + +// parseBufferKey parses endpoint and model from a buffer key +func (m *WindowedMetricsManager) parseBufferKey(key string) (endpoint, model string) { + for i := 0; i < len(key); i++ { + if key[i] == '|' { + return key[:i], key[i+1:] + } + } + return key, "" +} + +// computePercentile computes the given percentile from a slice of values +func computePercentile(values []float64, percentile float64) float64 { + if len(values) == 0 { + return 0 + } + + // Sort the values + sorted := make([]float64, len(values)) + copy(sorted, values) + sortFloat64s(sorted) + + // Calculate the index + index := percentile * float64(len(sorted)-1) + lower := int(index) + upper := lower + 1 + + if upper >= len(sorted) { + return sorted[len(sorted)-1] + } + + // Linear interpolation + 
weight := index - float64(lower) + return sorted[lower]*(1-weight) + sorted[upper]*weight +} + +// sortFloat64s sorts a slice of float64 in ascending order +func sortFloat64s(a []float64) { + // Simple insertion sort for small slices, quick sort for larger + if len(a) < 12 { + for i := 1; i < len(a); i++ { + for j := i; j > 0 && a[j] < a[j-1]; j-- { + a[j], a[j-1] = a[j-1], a[j] + } + } + return + } + + // Quick sort + quickSort(a, 0, len(a)-1) +} + +func quickSort(a []float64, low, high int) { + if low < high { + p := partition(a, low, high) + quickSort(a, low, p-1) + quickSort(a, p+1, high) + } +} + +func partition(a []float64, low, high int) int { + pivot := a[high] + i := low - 1 + for j := low; j < high; j++ { + if a[j] <= pivot { + i++ + a[i], a[j] = a[j], a[i] + } + } + a[i+1], a[high] = a[high], a[i+1] + return i + 1 +} + +// Global helper functions for recording windowed metrics + +// RecordEndpointRequest records a request to the global windowed metrics manager +func RecordEndpointRequest(endpoint, model string, latencySeconds float64, promptTokens, completionTokens int64, isError, isTimeout bool) { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + if manager == nil { + return + } + + manager.RecordRequest(RequestData{ + Timestamp: time.Now(), + Endpoint: endpoint, + Model: model, + LatencySeconds: latencySeconds, + PromptTokens: promptTokens, + CompletionTokens: completionTokens, + IsError: isError, + IsTimeout: isTimeout, + }) +} + +// IncrementEndpointActiveRequests increments the active request count +func IncrementEndpointActiveRequests(endpoint, model string) { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + if manager == nil { + return + } + + manager.IncrementActiveRequests(endpoint, model) +} + +// DecrementEndpointActiveRequests decrements the active request count +func DecrementEndpointActiveRequests(endpoint, model string) { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + if manager == nil { + return + } + + manager.DecrementActiveRequests(endpoint, model) +} + +// GetWindowedMetricsManager returns the global windowed metrics manager +func GetWindowedMetricsManager() *WindowedMetricsManager { + globalWindowedManagerMutex.RLock() + defer globalWindowedManagerMutex.RUnlock() + return globalWindowedManager +} + +// IsWindowedMetricsEnabled returns true if windowed metrics are enabled +func IsWindowedMetricsEnabled() bool { + globalWindowedManagerMutex.RLock() + manager := globalWindowedManager + globalWindowedManagerMutex.RUnlock() + + return manager != nil && manager.config.Enabled +} diff --git a/src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go b/src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go new file mode 100644 index 000000000..f2c5d225b --- /dev/null +++ b/src/semantic-router/pkg/observability/metrics/windowed_metrics_test.go @@ -0,0 +1,504 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" +) + +// TestNewWindowedMetricsManager tests the creation of a new WindowedMetricsManager +func TestNewWindowedMetricsManager(t *testing.T) { + tests := []struct { + name string + config config.WindowedMetricsConfig + wantTimeWindows int + wantInterval time.Duration + wantMaxEndpoints int + }{ + { + name: "Default configuration", + config: 
config.WindowedMetricsConfig{ + Enabled: true, + }, + wantTimeWindows: 5, // Default: 1m, 5m, 15m, 1h, 24h + wantInterval: DefaultUpdateInterval, + wantMaxEndpoints: DefaultMaxEndpoints, + }, + { + name: "Custom time windows", + config: config.WindowedMetricsConfig{ + Enabled: true, + TimeWindows: []string{"30s", "2m", "10m"}, + }, + wantTimeWindows: 3, + wantInterval: DefaultUpdateInterval, + wantMaxEndpoints: DefaultMaxEndpoints, + }, + { + name: "Custom update interval", + config: config.WindowedMetricsConfig{ + Enabled: true, + UpdateInterval: "5s", + }, + wantTimeWindows: 5, + wantInterval: 5 * time.Second, + wantMaxEndpoints: DefaultMaxEndpoints, + }, + { + name: "Custom max endpoints", + config: config.WindowedMetricsConfig{ + Enabled: true, + MaxEndpoints: 50, + }, + wantTimeWindows: 5, + wantInterval: DefaultUpdateInterval, + wantMaxEndpoints: 50, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + manager, err := NewWindowedMetricsManager(tt.config) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + if len(manager.timeWindows) != tt.wantTimeWindows { + t.Errorf("timeWindows count = %d, want %d", len(manager.timeWindows), tt.wantTimeWindows) + } + + if manager.updateInterval != tt.wantInterval { + t.Errorf("updateInterval = %v, want %v", manager.updateInterval, tt.wantInterval) + } + + if manager.maxEndpoints != tt.wantMaxEndpoints { + t.Errorf("maxEndpoints = %d, want %d", manager.maxEndpoints, tt.wantMaxEndpoints) + } + }) + } +} + +// TestRequestRingBuffer tests the ring buffer functionality +func TestRequestRingBuffer(t *testing.T) { + rb := NewRequestRingBuffer(5) + + // Add 3 items + now := time.Now() + for i := 0; i < 3; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(time.Duration(i) * time.Second), + Endpoint: "endpoint1", + Model: "model1", + LatencySeconds: float64(i), + }) + } + + // Should have 3 items + data := rb.GetDataSince(now.Add(-time.Hour)) + if len(data) != 3 { + t.Errorf("GetDataSince() count = %d, want 3", len(data)) + } + + // Add 5 more items (should wrap around) + for i := 0; i < 5; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(time.Duration(10+i) * time.Second), + Endpoint: "endpoint2", + Model: "model2", + LatencySeconds: float64(10 + i), + }) + } + + // Should have 5 items (capacity limit) + data = rb.GetDataSince(now.Add(-time.Hour)) + if len(data) != 5 { + t.Errorf("GetDataSince() count after wrap = %d, want 5", len(data)) + } + + // Verify data is from endpoint2 (most recent) + for _, d := range data { + if d.Endpoint != "endpoint2" { + t.Errorf("Expected endpoint2, got %s", d.Endpoint) + } + } +} + +// TestRequestRingBufferTimeBased tests time-based filtering +func TestRequestRingBufferTimeBased(t *testing.T) { + rb := NewRequestRingBuffer(100) + + now := time.Now() + + // Add items across different time ranges + // Old items (2 hours ago) + for i := 0; i < 10; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(-2 * time.Hour), + Endpoint: "old", + Model: "model1", + LatencySeconds: 1.0, + }) + } + + // Recent items (5 minutes ago) + for i := 0; i < 5; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(-5 * time.Minute), + Endpoint: "recent", + Model: "model1", + LatencySeconds: 2.0, + }) + } + + // Very recent items (30 seconds ago) + for i := 0; i < 3; i++ { + rb.Add(RequestData{ + Timestamp: now.Add(-30 * time.Second), + Endpoint: "very_recent", + Model: "model1", + LatencySeconds: 3.0, + }) + } + + // Query for last minute + data := rb.GetDataSince(now.Add(-1 * time.Minute)) + if 
len(data) != 3 { + t.Errorf("GetDataSince(1 minute) count = %d, want 3", len(data)) + } + + // Query for last 15 minutes + data = rb.GetDataSince(now.Add(-15 * time.Minute)) + if len(data) != 8 { // 5 + 3 + t.Errorf("GetDataSince(15 minutes) count = %d, want 8", len(data)) + } + + // Query for last 24 hours + data = rb.GetDataSince(now.Add(-24 * time.Hour)) + if len(data) != 18 { // 10 + 5 + 3 + t.Errorf("GetDataSince(24 hours) count = %d, want 18", len(data)) + } +} + +// TestComputePercentile tests percentile calculation +func TestComputePercentile(t *testing.T) { + tests := []struct { + name string + values []float64 + percentile float64 + want float64 + tolerance float64 + }{ + { + name: "Empty values", + values: []float64{}, + percentile: 0.5, + want: 0, + tolerance: 0.001, + }, + { + name: "Single value", + values: []float64{5.0}, + percentile: 0.5, + want: 5.0, + tolerance: 0.001, + }, + { + name: "P50 of sorted sequence", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + percentile: 0.5, + want: 5.5, // Interpolated median + tolerance: 0.1, + }, + { + name: "P95 of sorted sequence", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + percentile: 0.95, + want: 9.55, + tolerance: 0.1, + }, + { + name: "P99 of sorted sequence", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + percentile: 0.99, + want: 9.91, + tolerance: 0.1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := computePercentile(tt.values, tt.percentile) + diff := got - tt.want + if diff < 0 { + diff = -diff + } + if diff > tt.tolerance { + t.Errorf("computePercentile() = %v, want %v (tolerance %v)", got, tt.want, tt.tolerance) + } + }) + } +} + +// TestSortFloat64s tests the sorting function +func TestSortFloat64s(t *testing.T) { + tests := []struct { + name string + input []float64 + want []float64 + }{ + { + name: "Empty slice", + input: []float64{}, + want: []float64{}, + }, + { + name: "Single element", + input: []float64{5.0}, + want: []float64{5.0}, + }, + { + name: "Already sorted", + input: []float64{1.0, 2.0, 3.0, 4.0, 5.0}, + want: []float64{1.0, 2.0, 3.0, 4.0, 5.0}, + }, + { + name: "Reverse sorted", + input: []float64{5.0, 4.0, 3.0, 2.0, 1.0}, + want: []float64{1.0, 2.0, 3.0, 4.0, 5.0}, + }, + { + name: "Random order", + input: []float64{3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0}, + want: []float64{1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 9.0}, + }, + { + name: "Large slice (tests quicksort path)", + input: []float64{15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + want: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + input := make([]float64, len(tt.input)) + copy(input, tt.input) + sortFloat64s(input) + + if len(input) != len(tt.want) { + t.Errorf("sortFloat64s() length = %d, want %d", len(input), len(tt.want)) + return + } + + for i := range input { + if input[i] != tt.want[i] { + t.Errorf("sortFloat64s()[%d] = %v, want %v", i, input[i], tt.want[i]) + } + } + }) + } +} + +// TestBufferKey tests buffer key generation and parsing +func TestBufferKey(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + }) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + tests := []struct { + endpoint string + model string + }{ + {"endpoint1:8080", "gpt-4"}, + {"localhost:50051", "llama-3"}, + {"", "model1"}, + {"endpoint1", ""}, + {"", ""}, + } + + for _, tt := range tests { + key := 
manager.getBufferKey(tt.endpoint, tt.model) + gotEndpoint, gotModel := manager.parseBufferKey(key) + + if gotEndpoint != tt.endpoint { + t.Errorf("parseBufferKey() endpoint = %q, want %q", gotEndpoint, tt.endpoint) + } + if gotModel != tt.model { + t.Errorf("parseBufferKey() model = %q, want %q", gotModel, tt.model) + } + } +} + +// TestWindowedMetricsManagerRecordRequest tests request recording +func TestWindowedMetricsManagerRecordRequest(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + MaxEndpoints: 3, + }) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + // Record requests for multiple endpoints + now := time.Now() + for i := 0; i < 5; i++ { + manager.RecordRequest(RequestData{ + Timestamp: now, + Endpoint: "endpoint1", + Model: "model1", + LatencySeconds: 0.1, + PromptTokens: 100, + CompletionTokens: 50, + }) + } + + manager.RecordRequest(RequestData{ + Timestamp: now, + Endpoint: "endpoint2", + Model: "model2", + LatencySeconds: 0.2, + PromptTokens: 200, + CompletionTokens: 100, + }) + + manager.RecordRequest(RequestData{ + Timestamp: now, + Endpoint: "endpoint3", + Model: "model3", + LatencySeconds: 0.3, + PromptTokens: 300, + CompletionTokens: 150, + }) + + // This should be ignored (max endpoints reached) + manager.RecordRequest(RequestData{ + Timestamp: now, + Endpoint: "endpoint4", + Model: "model4", + LatencySeconds: 0.4, + PromptTokens: 400, + CompletionTokens: 200, + }) + + // Check buffer count + manager.bufferMutex.RLock() + bufferCount := len(manager.requestBuffers) + manager.bufferMutex.RUnlock() + + if bufferCount != 3 { + t.Errorf("Buffer count = %d, want 3 (max endpoints)", bufferCount) + } +} + +// TestActiveRequestTracking tests queue depth tracking +func TestActiveRequestTracking(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + QueueDepthEstimation: true, + }) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + // Increment active requests + manager.IncrementActiveRequests("endpoint1", "model1") + manager.IncrementActiveRequests("endpoint1", "model1") + manager.IncrementActiveRequests("endpoint1", "model1") + + // Check count + manager.activeMutex.RLock() + count := manager.activeRequests["endpoint1|model1"] + manager.activeMutex.RUnlock() + + if count != 3 { + t.Errorf("Active requests count = %d, want 3", count) + } + + // Decrement + manager.DecrementActiveRequests("endpoint1", "model1") + manager.DecrementActiveRequests("endpoint1", "model1") + + manager.activeMutex.RLock() + count = manager.activeRequests["endpoint1|model1"] + manager.activeMutex.RUnlock() + + if count != 1 { + t.Errorf("Active requests count after decrement = %d, want 1", count) + } + + // Decrement beyond zero (should not go negative) + manager.DecrementActiveRequests("endpoint1", "model1") + manager.DecrementActiveRequests("endpoint1", "model1") + + manager.activeMutex.RLock() + count = manager.activeRequests["endpoint1|model1"] + manager.activeMutex.RUnlock() + + if count != 0 { + t.Errorf("Active requests count should not go negative, got %d", count) + } +} + +// TestDisabledManager tests that disabled manager doesn't record +func TestDisabledManager(t *testing.T) { + manager, err := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: false, + }) + if err != nil { + t.Fatalf("NewWindowedMetricsManager() error = %v", err) + } + + // Record request (should be ignored) + 
manager.RecordRequest(RequestData{ + Timestamp: time.Now(), + Endpoint: "endpoint1", + Model: "model1", + LatencySeconds: 0.1, + }) + + // Check buffer count (should be 0) + manager.bufferMutex.RLock() + bufferCount := len(manager.requestBuffers) + manager.bufferMutex.RUnlock() + + if bufferCount != 0 { + t.Errorf("Buffer count = %d, want 0 (disabled manager)", bufferCount) + } +} + +// BenchmarkRecordRequest benchmarks request recording +func BenchmarkRecordRequest(b *testing.B) { + manager, _ := NewWindowedMetricsManager(config.WindowedMetricsConfig{ + Enabled: true, + }) + + req := RequestData{ + Timestamp: time.Now(), + Endpoint: "endpoint1", + Model: "model1", + LatencySeconds: 0.1, + PromptTokens: 100, + CompletionTokens: 50, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + manager.RecordRequest(req) + } +} + +// BenchmarkComputePercentile benchmarks percentile computation +func BenchmarkComputePercentile(b *testing.B) { + values := make([]float64, 1000) + for i := range values { + values[i] = float64(i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + computePercentile(values, 0.95) + } +} diff --git a/website/docs/tutorials/observability/metrics.md b/website/docs/tutorials/observability/metrics.md index ea19671a5..8fe415e1d 100644 --- a/website/docs/tutorials/observability/metrics.md +++ b/website/docs/tutorials/observability/metrics.md @@ -179,7 +179,69 @@ histogram_quantile(0.95, rate(llm_model_completion_latency_seconds_bucket[5m])) --- -## 6. Troubleshooting +## 6. Windowed Endpoint Metrics (Load Balancing) + +Enhanced time-windowed metrics for endpoint and model performance tracking, useful for load balancing decisions. + +### Configuration + +Enable windowed metrics in `config.yaml`: + +```yaml +observability: + metrics: + windowed_metrics: + enabled: true + time_windows: ["1m", "5m", "15m", "1h", "24h"] + update_interval: "10s" + endpoint_metrics: true + queue_depth_estimation: true + max_endpoints: 100 +``` + +### Endpoint-Level Metrics + +| Metric | Type | Labels | Description | +| --------------------------------------------- | ----- | ----------------------------------- | ------------------------------------- | +| `llm_endpoint_latency_windowed_seconds` | gauge | endpoint, model, time_window | Average latency per time window | +| `llm_endpoint_requests_windowed_total` | gauge | endpoint, model, time_window | Request count per time window | +| `llm_endpoint_tokens_windowed_total` | gauge | endpoint, model, token_type, time_window | Token throughput per window | +| `llm_endpoint_utilization_percentage` | gauge | endpoint, time_window | Estimated utilization percentage | +| `llm_endpoint_queue_depth_estimated` | gauge | endpoint, model | Current estimated queue depth | +| `llm_endpoint_error_rate_windowed` | gauge | endpoint, model, time_window | Error rate per time window | +| `llm_endpoint_latency_p50_windowed_seconds` | gauge | endpoint, model, time_window | P50 latency per time window | +| `llm_endpoint_latency_p95_windowed_seconds` | gauge | endpoint, model, time_window | P95 latency per time window | +| `llm_endpoint_latency_p99_windowed_seconds` | gauge | endpoint, model, time_window | P99 latency per time window | + +### Example Queries + +```promql +# Average latency for endpoint in last 5 minutes +llm_endpoint_latency_windowed_seconds{endpoint="10.0.0.1:8080", time_window="5m"} + +# P95 latency comparison across endpoints +llm_endpoint_latency_p95_windowed_seconds{time_window="15m"} + +# Token throughput per endpoint 
+llm_endpoint_tokens_windowed_total{token_type="completion", time_window="1h"} + +# Current queue depth for load balancing decisions +llm_endpoint_queue_depth_estimated{endpoint="10.0.0.1:8080"} + +# Error rate monitoring +llm_endpoint_error_rate_windowed{time_window="5m"} > 0.05 +``` + +### Use Cases + +1. **Load Balancing**: Use queue depth and latency metrics to route requests to less loaded endpoints +2. **Performance Monitoring**: Track P95/P99 latency trends across time windows +3. **Capacity Planning**: Monitor utilization percentages to identify when to scale +4. **Alerting**: Set alerts on error rates or latency spikes within specific time windows + +--- + +## 7. Troubleshooting | Issue | Check | Fix | | --------------- | ------------------- | ----------------------------------------------------- |
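The "Alerting" use case listed in the new docs section maps directly onto standard Prometheus alerting rules built from the windowed metrics this patch exports. Below is a minimal sketch, assuming a rule file loaded via Prometheus `rule_files`; the group name, alert names, thresholds (5% error rate over the 5m window, 2s P95 over the 15m window), and `for` durations are illustrative choices, not part of this change — only the metric names and labels come from the patch.

```yaml
# Sketch: alerting on the windowed endpoint metrics defined in this patch.
# Group/alert names and thresholds are illustrative; tune them for your deployment.
groups:
  - name: semantic-router-endpoint-health   # hypothetical group name
    rules:
      - alert: EndpointHighErrorRate
        # Fires when an endpoint/model pair exceeds 5% errors in the 5-minute window.
        expr: llm_endpoint_error_rate_windowed{time_window="5m"} > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{ $labels.endpoint }} ({{ $labels.model }}) error rate above 5% (5m window)"
      - alert: EndpointHighP95Latency
        # Fires when P95 latency over the 15-minute window stays above 2 seconds.
        expr: llm_endpoint_latency_p95_windowed_seconds{time_window="15m"} > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "{{ $labels.endpoint }} ({{ $labels.model }}) P95 latency above 2s (15m window)"
```

Because these series are gauges recomputed every `update_interval` (default 10s) rather than counters, no `rate()` is needed; the `for` clause simply requires the condition to hold across several consecutive recomputations.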