Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/semantic-router/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/extproc"
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/k8s"
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics"
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing"
)

Expand Down Expand Up @@ -87,6 +88,16 @@ func main() {
}()
}

// Initialize windowed metrics if enabled
if cfg.Observability.Metrics.WindowedMetrics.Enabled {
logging.Infof("Initializing windowed metrics for load balancing...")
if err := metrics.InitializeWindowedMetrics(cfg.Observability.Metrics.WindowedMetrics); err != nil {
logging.Warnf("Failed to initialize windowed metrics: %v", err)
} else {
logging.Infof("Windowed metrics initialized successfully")
}
}

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
Expand Down
33 changes: 33 additions & 0 deletions src/semantic-router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,39 @@ type APIConfig struct {
type ObservabilityConfig struct {
// Tracing configuration for distributed tracing
Tracing TracingConfig `yaml:"tracing"`

// Metrics configuration for enhanced metrics collection
Metrics MetricsConfig `yaml:"metrics"`
}

// MetricsConfig represents configuration for metrics collection
type MetricsConfig struct {
// Enable windowed metrics collection for load balancing
WindowedMetrics WindowedMetricsConfig `yaml:"windowed_metrics"`
}

// WindowedMetricsConfig represents configuration for time-windowed metrics
type WindowedMetricsConfig struct {
// Enable windowed metrics collection
Enabled bool `yaml:"enabled"`

// Time windows to track (in duration format, e.g., "1m", "5m", "15m", "1h", "24h")
// Default: ["1m", "5m", "15m", "1h", "24h"]
TimeWindows []string `yaml:"time_windows,omitempty"`

// Update interval for windowed metrics computation (e.g., "10s", "30s")
// Default: "10s"
UpdateInterval string `yaml:"update_interval,omitempty"`

// Enable endpoint-level metrics tracking
EndpointMetrics bool `yaml:"endpoint_metrics"`

// Enable queue depth estimation
QueueDepthEstimation bool `yaml:"queue_depth_estimation"`

// Maximum number of endpoints to track (to prevent cardinality explosion)
// Default: 100
MaxEndpoints int `yaml:"max_endpoints,omitempty"`
}

// TracingConfig represents configuration for distributed tracing
Expand Down
6 changes: 6 additions & 0 deletions src/semantic-router/pkg/extproc/processor_req_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ func (r *OpenAIRouter) selectEndpointForModel(ctx *RequestContext, model string)
backendSpan.End()
ctx.TraceContext = backendCtx

// Store the selected endpoint in context for windowed metrics tracking
ctx.SelectedEndpoint = endpointAddress

// Increment active request count for queue depth estimation
metrics.IncrementEndpointActiveRequests(endpointAddress, model)

return endpointAddress
}

Expand Down
3 changes: 3 additions & 0 deletions src/semantic-router/pkg/extproc/processor_req_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type RequestContext struct {
VSRInjectedSystemPrompt bool // Whether a system prompt was injected into the request
VSRSelectedDecision *config.Decision // The decision object selected by DecisionEngine (for plugins)

// Endpoint tracking for windowed metrics
SelectedEndpoint string // The endpoint address selected for this request

// Tracing context
TraceContext context.Context // OpenTelemetry trace context for span propagation
}
Expand Down
14 changes: 14 additions & 0 deletions src/semantic-router/pkg/extproc/processor_res_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
completionLatency := time.Since(ctx.StartTime)

// Decrement active request count for queue depth estimation
defer metrics.DecrementEndpointActiveRequests(ctx.SelectedEndpoint, ctx.RequestModel)

// Process the response for caching
responseBody := v.ResponseBody.Body

Expand Down Expand Up @@ -68,6 +71,17 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response
metrics.RecordModelTPOT(ctx.RequestModel, timePerToken)
}

// Record windowed endpoint metrics for load balancing
metrics.RecordEndpointRequest(
ctx.SelectedEndpoint,
ctx.RequestModel,
completionLatency.Seconds(),
int64(promptTokens),
int64(completionTokens),
false, // isError
false, // isTimeout
)

// Compute and record cost if pricing is configured
if r.Config != nil {
promptRatePer1M, completionRatePer1M, currency, ok := r.Config.GetModelPricing(ctx.RequestModel)
Expand Down
Loading
Loading