Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/pod-scaler/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/openshift/ci-tools/pkg/steps"
)

func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, bqClient *BigQueryClient, reporter results.PodScalerReporter) {
logger := logrus.WithField("component", "pod-scaler admission")
logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
health := pjutil.NewHealthOnPort(healthPort)
Expand All @@ -44,7 +44,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int
Port: port,
CertDir: certDir,
})
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, bqClient: bqClient, reporter: reporter}})
logger.Info("Serving admission webhooks.")
if err := server.Start(interrupts.Context()); err != nil {
logrus.WithError(err).Fatal("Failed to serve webhooks.")
Expand All @@ -60,6 +60,7 @@ type podMutator struct {
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
bqClient *BigQueryClient
reporter results.PodScalerReporter
Comment on lines +63 to 64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Bug in AddPodAntiAffinity referenced from this file: existing terms are duplicated instead of merged.

In cmd/pod-scaler/measured.go lines 355-357 (used by this admission flow), there's a bug in merging anti-affinity terms:

existingTerms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
if existingTerms != nil {
    requiredTerms = append(requiredTerms, requiredTerms...)  // BUG: appends requiredTerms to itself
}

This should merge existingTerms into the result (e.g. append(existingTerms, requiredTerms...)), not append requiredTerms to itself.

Proposed fix in measured.go
 	// Merge with existing anti-affinity terms instead of overwriting
 	existingTerms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
 	if existingTerms != nil {
-		requiredTerms = append(requiredTerms, requiredTerms...)
+		requiredTerms = append(existingTerms, requiredTerms...)
 	}
 	pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms
🤖 Prompt for AI Agents
In `@cmd/pod-scaler/admission.go` around lines 63 - 64, The AddPodAntiAffinity
merge incorrectly appends requiredTerms to itself; in the logic where
existingTerms is read from
pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
replace the buggy append(requiredTerms, requiredTerms...) with
append(existingTerms, requiredTerms...) so existing terms are merged into
requiredTerms (ensure you reference AddPodAntiAffinity and the local variables
requiredTerms and existingTerms when making the change).

}

Expand Down Expand Up @@ -97,6 +98,15 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
logger.WithError(err).Error("Failed to handle rehearsal Pod.")
return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
}

// Classify pod as normal or measured (if enabled)
if m.bqClient != nil {
ClassifyPod(pod, m.bqClient, logger)
AddPodAntiAffinity(pod, logger)
// Apply measured pod resources before regular resource mutation
ApplyMeasuredPodResources(pod, m.bqClient, logger)
}

mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
m.addPriorityClass(pod)

Expand Down
51 changes: 44 additions & 7 deletions cmd/pod-scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ type options struct {
}

type producerOptions struct {
kubernetesOptions prowflagutil.KubernetesOptions
once bool
ignoreLatest time.Duration
kubernetesOptions prowflagutil.KubernetesOptions
once bool
ignoreLatest time.Duration
enableMeasuredPods bool
bigQueryProjectID string
bigQueryDatasetID string
bigQueryCredentialsFile string
}

type consumerOptions struct {
Expand All @@ -74,8 +78,12 @@ func bindOptions(fs *flag.FlagSet) *options {
o.instrumentationOptions.AddFlags(fs)
fs.StringVar(&o.mode, "mode", "", "Which mode to run in.")
o.producerOptions.kubernetesOptions.AddFlags(fs)
fs.DurationVar(&o.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
fs.BoolVar(&o.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
fs.DurationVar(&o.producerOptions.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
fs.BoolVar(&o.producerOptions.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
fs.BoolVar(&o.producerOptions.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
fs.StringVar(&o.producerOptions.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery (required if enable-measured-pods is true)")
fs.StringVar(&o.producerOptions.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
fs.StringVar(&o.producerOptions.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.")
fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.")
fs.StringVar(&o.certDir, "serving-cert-dir", "", "Path to directory with serving certificate and key for the admission webhook server.")
Expand Down Expand Up @@ -244,7 +252,21 @@ func mainProduce(opts *options, cache Cache) {
logger.Debugf("Loaded Prometheus client.")
}

produce(clients, cache, opts.ignoreLatest, opts.once)
var bqClient *BigQueryClient
if opts.producerOptions.enableMeasuredPods {
if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" {
logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
}
cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-producer"))
var err error
bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-producer"))
if err != nil {
logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
}
logrus.Info("Measured pods feature enabled with BigQuery integration")
}

produce(clients, cache, opts.producerOptions.ignoreLatest, opts.producerOptions.once, bqClient)

}

Expand All @@ -268,7 +290,22 @@ func mainAdmission(opts *options, cache Cache) {
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
}

go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter)
// Use producerOptions for measured pods config (shared between producer and consumer modes)
var bqClient *BigQueryClient
if opts.producerOptions.enableMeasuredPods {
if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" {
logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
}
cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-admission"))
var err error
bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-admission"))
if err != nil {
logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
}
logrus.Info("Measured pods feature enabled with BigQuery integration")
}

go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, bqClient, reporter)
}

func loaders(cache Cache) map[string][]*cacheReloader {
Expand Down
Loading