Skip to content

Commit 4877a90

Browse files
feat: add time-windowed endpoint metrics for load balancing
Signed-off-by: Jintao Zhang <zhangjintao9020@gmail.com>
1 parent 519d4d8 commit 4877a90

File tree

8 files changed

+1261
-1
lines changed

8 files changed

+1261
-1
lines changed

src/semantic-router/cmd/main.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/extproc"
1919
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/k8s"
2020
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
21+
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics"
2122
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing"
2223
)
2324

@@ -87,6 +88,16 @@ func main() {
8788
}()
8889
}
8990

91+
// Initialize windowed metrics if enabled; an initialization failure is only logged as a warning and startup continues
92+
if cfg.Observability.Metrics.WindowedMetrics.Enabled {
93+
logging.Infof("Initializing windowed metrics for load balancing...")
94+
if err := metrics.InitializeWindowedMetrics(cfg.Observability.Metrics.WindowedMetrics); err != nil {
95+
logging.Warnf("Failed to initialize windowed metrics: %v", err)
96+
} else {
97+
logging.Infof("Windowed metrics initialized successfully")
98+
}
99+
}
100+
90101
// Set up signal handling for graceful shutdown
91102
sigChan := make(chan os.Signal, 1)
92103
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

src/semantic-router/pkg/config/config.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,39 @@ type APIConfig struct {
262262
type ObservabilityConfig struct {
263263
// Tracing configuration for distributed tracing
264264
Tracing TracingConfig `yaml:"tracing"`
265+
266+
// Metrics configuration for enhanced metrics collection
267+
Metrics MetricsConfig `yaml:"metrics"`
268+
}
269+
270+
// MetricsConfig groups the metrics-related settings nested under ObservabilityConfig.
271+
type MetricsConfig struct {
272+
// WindowedMetrics holds the settings for time-windowed metrics collection used for load balancing.
273+
WindowedMetrics WindowedMetricsConfig `yaml:"windowed_metrics"`
274+
}
275+
276+
// WindowedMetricsConfig holds the settings for time-windowed metrics collection.
277+
type WindowedMetricsConfig struct {
278+
// Enabled turns windowed metrics collection on or off.
279+
Enabled bool `yaml:"enabled"`
280+
281+
// TimeWindows lists the time windows to track, as duration strings (e.g., "1m", "5m", "15m", "1h", "24h").
282+
// Default: ["1m", "5m", "15m", "1h", "24h"]
283+
TimeWindows []string `yaml:"time_windows,omitempty"`
284+
285+
// UpdateInterval is the interval at which windowed metrics are computed, as a duration string (e.g., "10s", "30s").
286+
// Default: "10s"
287+
UpdateInterval string `yaml:"update_interval,omitempty"`
288+
289+
// EndpointMetrics enables endpoint-level metrics tracking.
290+
EndpointMetrics bool `yaml:"endpoint_metrics"`
291+
292+
// QueueDepthEstimation enables queue depth estimation.
293+
QueueDepthEstimation bool `yaml:"queue_depth_estimation"`
294+
295+
// MaxEndpoints is the maximum number of endpoints to track (to prevent cardinality explosion).
296+
// Default: 100
297+
MaxEndpoints int `yaml:"max_endpoints,omitempty"`
265298
}
266299

267300
// TracingConfig represents configuration for distributed tracing

src/semantic-router/pkg/extproc/processor_req_body.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,12 @@ func (r *OpenAIRouter) selectEndpointForModel(ctx *RequestContext, model string)
229229
backendSpan.End()
230230
ctx.TraceContext = backendCtx
231231

232+
// Store the selected endpoint in context for windowed metrics tracking
233+
ctx.SelectedEndpoint = endpointAddress
234+
235+
// Increment active request count for queue depth estimation
236+
metrics.IncrementEndpointActiveRequests(endpointAddress, model)
237+
232238
return endpointAddress
233239
}
234240

src/semantic-router/pkg/extproc/processor_req_header.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ type RequestContext struct {
4242
VSRInjectedSystemPrompt bool // Whether a system prompt was injected into the request
4343
VSRSelectedDecision *config.Decision // The decision object selected by DecisionEngine (for plugins)
4444

45+
// Endpoint tracking for windowed metrics
46+
SelectedEndpoint string // The endpoint address selected for this request
47+
4548
// Tracing context
4649
TraceContext context.Context // OpenTelemetry trace context for span propagation
4750
}

src/semantic-router/pkg/extproc/processor_res_body.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import (
1515
func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
1616
completionLatency := time.Since(ctx.StartTime)
1717

18+
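// Decrement active request count for queue depth estimation.
// NOTE(review): the matching increment in selectEndpointForModel is keyed by (endpointAddress, model), while this call uses ctx.RequestModel; confirm both carry the same value, and that this handler runs exactly once per request (e.g. not once per streamed chunk), otherwise the active count will drift.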
19+
defer metrics.DecrementEndpointActiveRequests(ctx.SelectedEndpoint, ctx.RequestModel)
20+
1821
// Process the response for caching
1922
responseBody := v.ResponseBody.Body
2023

@@ -68,6 +71,17 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response
6871
metrics.RecordModelTPOT(ctx.RequestModel, timePerToken)
6972
}
7073

74+
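// Record windowed endpoint metrics for load balancing.
// NOTE(review): isError and isTimeout are hard-coded to false here, so every response reaching this point is counted as a success; confirm failed or timed-out requests are recorded elsewhere.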
75+
metrics.RecordEndpointRequest(
76+
ctx.SelectedEndpoint,
77+
ctx.RequestModel,
78+
completionLatency.Seconds(),
79+
int64(promptTokens),
80+
int64(completionTokens),
81+
false, // isError
82+
false, // isTimeout
83+
)
84+
7185
// Compute and record cost if pricing is configured
7286
if r.Config != nil {
7387
promptRatePer1M, completionRatePer1M, currency, ok := r.Config.GetModelPricing(ctx.RequestModel)

0 commit comments

Comments
 (0)