From 37e07173d131b7e38b905672f776ae5f2eccad9e Mon Sep 17 00:00:00 2001 From: Deep Mistry Date: Tue, 6 Jan 2026 10:55:32 -0500 Subject: [PATCH 1/2] Add measured pods feature for accurate resource measurement on isolated nodes Implements DPTP-4613 to address the issue where pod-scaler recommendations are skewed by node contention. When multiple pods with poor CPU configurations are scheduled on the same node, CPU is maxed out but pods finish eventually. The pod-scaler observes low CPU utilization (due to node contention) and incorrectly concludes requests should not be increased, leading to a cycle of reduced limits and tighter packing. This change introduces a measured pods system: - Pods are classified as 'normal' or 'measured' based on whether they need fresh measurement data (measured if last measurement >10 days ago or never measured) - Measured pods use podAntiAffinity rules to run on isolated nodes with no other CI workloads, ensuring accurate CPU/memory utilization measurement - BigQuery integration queries and caches max CPU/memory utilization from measured pod runs, refreshing daily to keep data current - Resource recommendations are applied only to the longest-running container in each pod, using actual measured utilization data instead of Prometheus metrics that may be skewed by node contention The feature is opt-in via --enable-measured-pods flag and requires BigQuery configuration (--bigquery-project-id and --bigquery-dataset-id). --- cmd/pod-scaler/admission.go | 52 +++- cmd/pod-scaler/admission_test.go | 2 +- cmd/pod-scaler/main.go | 13 +- cmd/pod-scaler/measured.go | 410 +++++++++++++++++++++++++++++++ cmd/pod-scaler/measured_test.go | 253 +++++++++++++++++++ 5 files changed, 715 insertions(+), 15 deletions(-) create mode 100644 cmd/pod-scaler/measured.go create mode 100644 cmd/pod-scaler/measured_test.go diff --git a/cmd/pod-scaler/admission.go b/cmd/pod-scaler/admission.go index f17d62e1e3f..06d618429d1 100644 --- a/cmd/pod-scaler/admission.go +++ b/cmd/pod-scaler/admission.go @@ -33,18 +33,32 @@ import ( "github.com/openshift/ci-tools/pkg/steps" ) -func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) { +func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) { logger := logrus.WithField("component", "pod-scaler admission") logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders)) health := pjutil.NewHealthOnPort(healthPort) resources := newResourceServer(loaders, health) decoder := admission.NewDecoder(scheme.Scheme) + var bqClient *BigQueryClient + if enableMeasuredPods { + if bigQueryProjectID == "" || bigQueryDatasetID == "" { + logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true") + } + cache := NewMeasuredPodCache(logger) + var err error + bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger) + if err != nil { + logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods") + } + 
logger.Info("Measured pods feature enabled with BigQuery integration") + } + server := webhook.NewServer(webhook.Options{ Port: port, CertDir: certDir, }) - server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}}) + server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}}) logger.Info("Serving admission webhooks.") if err := server.Start(interrupts.Context()); err != nil { logrus.WithError(err).Fatal("Failed to serve webhooks.") @@ -52,15 +66,18 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int } type podMutator struct { - logger *logrus.Entry - client buildclientv1.BuildV1Interface - resources *resourceServer - mutateResourceLimits bool - decoder admission.Decoder - cpuCap int64 - memoryCap string - cpuPriorityScheduling int64 - reporter results.PodScalerReporter + logger *logrus.Entry + client buildclientv1.BuildV1Interface + resources *resourceServer + mutateResourceLimits bool + decoder admission.Decoder + cpuCap int64 + memoryCap string + cpuPriorityScheduling int64 + authoritativeCPURequests bool + authoritativeMemoryRequests bool + bqClient *BigQueryClient + reporter results.PodScalerReporter } func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response { @@ -97,7 +114,16 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio logger.WithError(err).Error("Failed to handle rehearsal Pod.") return admission.Allowed("Failed to handle rehearsal Pod, ignoring.") } - mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger) + + // Classify pod as normal or measured (if enabled) + if m.bqClient != nil { + ClassifyPod(pod, m.bqClient, logger) + AddPodAntiAffinity(pod, logger) + // Apply measured pod resources before regular resource mutation + ApplyMeasuredPodResources(pod, m.bqClient, logger) + } + + mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger) m.addPriorityClass(pod) marshaledPod, err := json.Marshal(pod) @@ -292,7 +318,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, } } -func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) { +func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) { mutateResources := func(containers []corev1.Container) { for i := range containers { meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name) diff --git a/cmd/pod-scaler/admission_test.go b/cmd/pod-scaler/admission_test.go index beac4cdeff2..16c957a3c34 100644 --- a/cmd/pod-scaler/admission_test.go +++ 
b/cmd/pod-scaler/admission_test.go @@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { original := testCase.pod.DeepCopy() - mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name)) + mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, false, &defaultReporter, logrus.WithField("test", testCase.name)) diff := cmp.Diff(original, testCase.pod) // In some cases, cmp.Diff decides to use non-breaking spaces, and it's not // particularly deterministic about this. We don't care. diff --git a/cmd/pod-scaler/main.go b/cmd/pod-scaler/main.go index 03f7758a5d8..34977df2e83 100644 --- a/cmd/pod-scaler/main.go +++ b/cmd/pod-scaler/main.go @@ -67,6 +67,13 @@ type consumerOptions struct { cpuCap int64 memoryCap string cpuPriorityScheduling int64 + + // Measured pods options - when enabled, pods are classified as "normal" or "measured" + // Measured pods run on isolated nodes to get accurate CPU/memory utilization data + enableMeasuredPods bool + bigQueryProjectID string + bigQueryDatasetID string + bigQueryCredentialsFile string } func bindOptions(fs *flag.FlagSet) *options { @@ -89,6 +96,10 @@ func bindOptions(fs *flag.FlagSet) *options { fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10") fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'") fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling") + fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. 
When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.") + fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)") + fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)") + fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access") o.resultsOptions.Bind(fs) return &o } @@ -268,7 +279,7 @@ func mainAdmission(opts *options, cache Cache) { logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.") } - go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter) + go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter) } func loaders(cache Cache) map[string][]*cacheReloader { diff --git a/cmd/pod-scaler/measured.go b/cmd/pod-scaler/measured.go new file mode 100644 index 00000000000..3b1b217e14f --- /dev/null +++ b/cmd/pod-scaler/measured.go @@ -0,0 +1,410 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "cloud.google.com/go/bigquery" + "github.com/sirupsen/logrus" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/prow/pkg/interrupts" + + "github.com/openshift/ci-tools/pkg/api" + podscaler "github.com/openshift/ci-tools/pkg/pod-scaler" +) + +const ( + // PodScalerLabelKey is the label we use to mark pods as either "normal" or "measured". + // This helps the scheduler know which pods can share nodes and which need isolation. + PodScalerLabelKey = "pod-scaler" + // PodScalerLabelValueNormal means this pod can run alongside other CI workloads. + // These pods use the regular resource recommendations from Prometheus data. + PodScalerLabelValueNormal = "normal" + // PodScalerLabelValueMeasured means this pod needs to run on an isolated node (no other CI pods). + // We do this so we can accurately measure what resources it actually uses without interference. + PodScalerLabelValueMeasured = "measured" + // MeasuredPodDataRetentionDays is how long we trust measured pod data before we need to re-measure. + // After 10 days, we mark the pod as "measured" again to get fresh data. + MeasuredPodDataRetentionDays = 10 + // BigQueryRefreshInterval is how often we pull fresh data from BigQuery. + // We refresh once a day to keep our cache up to date with the latest measured pod metrics. + BigQueryRefreshInterval = 24 * time.Hour +) + +// MeasuredPodData holds what we learned about a pod when it ran in isolation. +// This tells us the real resource needs without interference from other workloads. +type MeasuredPodData struct { + // Metadata tells us which pod this is (org, repo, branch, container name, etc.) + Metadata podscaler.FullMetadata `json:"metadata"` + // MaxCPUUtilization is the highest CPU usage we saw when this pod ran alone (in cores). 
+ // This is the real number - not limited by node contention. + MaxCPUUtilization float64 + // MaxMemoryUtilization is the highest memory usage we saw when this pod ran alone (in bytes). + // Again, this is the real number without interference. + MaxMemoryUtilization int64 + // LastMeasuredTime tells us when we last ran this pod in isolation. + // If it's been more than 10 days, we should measure it again. + LastMeasuredTime time.Time + // ContainerDurations tells us how long each container ran. + // We use this to find the longest-running container, which gets the resource increases. + ContainerDurations map[string]time.Duration +} + +// MeasuredPodCache keeps measured pod data in memory so we can quickly check if a pod needs measuring. +// We refresh this from BigQuery once a day, so it's always reasonably fresh. +type MeasuredPodCache struct { + mu sync.RWMutex + data map[podscaler.FullMetadata]*MeasuredPodData + logger *logrus.Entry +} + +// NewMeasuredPodCache creates a new cache for measured pod data +func NewMeasuredPodCache(logger *logrus.Entry) *MeasuredPodCache { + return &MeasuredPodCache{ + data: make(map[podscaler.FullMetadata]*MeasuredPodData), + logger: logger, + } +} + +// Get retrieves measured pod data for the given metadata +func (c *MeasuredPodCache) Get(meta podscaler.FullMetadata) (*MeasuredPodData, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + data, exists := c.data[meta] + return data, exists +} + +// Update updates the cache with new data +func (c *MeasuredPodCache) Update(data map[podscaler.FullMetadata]*MeasuredPodData) { + c.mu.Lock() + defer c.mu.Unlock() + c.data = data + c.logger.Infof("Updated measured pod cache with %d entries", len(data)) +} + +// BigQueryClient handles queries to BigQuery for measured pod data +type BigQueryClient struct { + client *bigquery.Client + projectID string + datasetID string + logger *logrus.Entry + cache *MeasuredPodCache + lastRefresh time.Time +} + +// NewBigQueryClient creates a new BigQuery client for measured pods +func NewBigQueryClient(projectID, datasetID, credentialsFile string, cache *MeasuredPodCache, logger *logrus.Entry) (*BigQueryClient, error) { + ctx := interrupts.Context() + var opts []option.ClientOption + if credentialsFile != "" { + opts = append(opts, option.WithCredentialsFile(credentialsFile)) + } + + client, err := bigquery.NewClient(ctx, projectID, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create BigQuery client: %w", err) + } + + bq := &BigQueryClient{ + client: client, + projectID: projectID, + datasetID: datasetID, + logger: logger, + cache: cache, + } + + // Initial refresh on startup + if err := bq.Refresh(ctx); err != nil { + logger.WithError(err).Warn("Failed to refresh BigQuery data on startup") + } + + // Schedule daily refresh + interrupts.TickLiteral(func() { + if err := bq.Refresh(interrupts.Context()); err != nil { + logger.WithError(err).Error("Failed to refresh BigQuery data") + } + }, BigQueryRefreshInterval) + + return bq, nil +} + +// Refresh pulls the latest measured pod data from BigQuery and updates our cache. +// We call this on startup and then once per day to keep the data fresh. +func (bq *BigQueryClient) Refresh(ctx context.Context) error { + bq.logger.Info("Refreshing measured pod data from BigQuery") + + // TODO: Replace with actual BigQuery query based on ci-metrics structure. + // This is a placeholder query - the actual query will depend on the BigQuery schema + // for ci-metrics pod CPU utilization data. 
We need to query the table that stores + // max CPU/memory utilization for pods that ran with the "measured" label. + query := bq.client.Query(fmt.Sprintf(` + SELECT + org, + repo, + branch, + container, + MAX(cpu_utilization) as max_cpu, + MAX(memory_utilization) as max_memory, + MAX(timestamp) as last_measured, + ANY_VALUE(container_durations) as container_durations + FROM + `+"`%s.%s.pod_metrics`"+` + WHERE + pod_scaler_label = 'measured' + AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY) + GROUP BY + org, repo, branch, container + `, bq.projectID, bq.datasetID, MeasuredPodDataRetentionDays)) + + query.QueryConfig.Labels = map[string]string{ + "client-application": "pod-scaler", + "query-details": "measured-pod-cpu-utilization", + } + + it, err := query.Read(ctx) + if err != nil { + return fmt.Errorf("failed to execute BigQuery query: %w", err) + } + + data := make(map[podscaler.FullMetadata]*MeasuredPodData) + for { + var row struct { + Org string `bigquery:"org"` + Repo string `bigquery:"repo"` + Branch string `bigquery:"branch"` + Container string `bigquery:"container"` + MaxCPU float64 `bigquery:"max_cpu"` + MaxMemory int64 `bigquery:"max_memory"` + LastMeasured time.Time `bigquery:"last_measured"` + ContainerDurations string `bigquery:"container_durations"` // JSON string + } + if err := it.Next(&row); err != nil { + if err == iterator.Done { + break + } + bq.logger.WithError(err).Warn("Failed to read BigQuery row") + continue + } + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: row.Org, + Repo: row.Repo, + Branch: row.Branch, + }, + Container: row.Container, + } + + // TODO: Parse container_durations JSON string into map[string]time.Duration + containerDurations := make(map[string]time.Duration) + + data[meta] = &MeasuredPodData{ + Metadata: meta, + MaxCPUUtilization: row.MaxCPU, + MaxMemoryUtilization: row.MaxMemory, + LastMeasuredTime: row.LastMeasured, + ContainerDurations: containerDurations, + } + } + + bq.cache.Update(data) + bq.lastRefresh = time.Now() + bq.logger.Infof("Refreshed measured pod data: %d entries, last refresh: %v", len(data), bq.lastRefresh) + return nil +} + +// ShouldBeMeasured checks if we need to run this pod in isolation to measure it. +// We measure it if we've never measured it before, or if it's been more than 10 days +// since the last measurement (data gets stale). +func (bq *BigQueryClient) ShouldBeMeasured(meta podscaler.FullMetadata) bool { + data, exists := bq.cache.Get(meta) + if !exists { + // Never measured this pod before, so we should measure it now. + return true + } + + // If it's been more than 10 days, the data is stale and we should re-measure. + cutoff := time.Now().Add(-MeasuredPodDataRetentionDays * 24 * time.Hour) + return data.LastMeasuredTime.Before(cutoff) +} + +// GetMeasuredData returns the measured pod data for the given metadata +func (bq *BigQueryClient) GetMeasuredData(meta podscaler.FullMetadata) (*MeasuredPodData, bool) { + return bq.cache.Get(meta) +} + +// ClassifyPod decides whether this pod should run in isolation ("measured") or with others ("normal"). +// We check each container - if any container needs measuring, the whole pod gets the "measured" label. +// This label tells the scheduler to keep it away from other CI workloads. +func ClassifyPod(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) { + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + + // Check each container to see if we need fresh measurement data for it. 
+ // If any container needs measuring, we mark the whole pod as "measured". + shouldBeMeasured := false + if bqClient != nil { + for _, container := range pod.Spec.Containers { + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + if bqClient.ShouldBeMeasured(fullMeta) { + // This container needs fresh data, so mark the pod as measured. + shouldBeMeasured = true + break + } + } + } else { + // If BigQuery isn't configured, default to measuring new pods. + shouldBeMeasured = true + } + + if shouldBeMeasured { + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueMeasured + logger.Debugf("Classified pod as measured - will run on isolated node") + } else { + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueNormal + logger.Debugf("Classified pod as normal - can share node with other workloads") + } +} + +// AddPodAntiAffinity sets up scheduling rules so measured pods get isolated nodes. +// Measured pods avoid ALL other pod-scaler labeled pods (they need the whole node). +// Normal pods avoid measured pods (so measured pods can have their isolation). +func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) { + podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey] + if !hasLabel { + logger.Debug("Pod does not have pod-scaler label, skipping anti-affinity") + return + } + + // Set up the affinity rules if they don't exist yet. + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &corev1.Affinity{} + } + if pod.Spec.Affinity.PodAntiAffinity == nil { + pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{} + } + + var requiredTerms []corev1.PodAffinityTerm + + if podScalerLabel == PodScalerLabelValueMeasured { + // Measured pods need complete isolation - they can't share a node with ANY other pod-scaler pod. + // This ensures they get the full node resources for accurate measurement. + requiredTerms = append(requiredTerms, corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: PodScalerLabelKey, + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }) + logger.Debug("Added podAntiAffinity for measured pod - will avoid all pod-scaler labeled pods") + } else if podScalerLabel == PodScalerLabelValueNormal { + // Normal pods stay away from measured pods so measured pods can have their isolation. + requiredTerms = append(requiredTerms, corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: PodScalerLabelKey, + Operator: metav1.LabelSelectorOpIn, + Values: []string{PodScalerLabelValueMeasured}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }) + logger.Debug("Added podAntiAffinity for normal pod - will avoid measured pods") + } + + if len(requiredTerms) > 0 { + pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms + } +} + +// ApplyMeasuredPodResources uses the real resource data we collected when this pod ran in isolation. +// We only increase resources for the longest-running container (the main workload), not all containers. +// This is based on actual measured usage, not Prometheus data that might be skewed by node contention. 
+func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) { + if bqClient == nil { + return + } + + podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey] + if !hasLabel || podScalerLabel != PodScalerLabelValueMeasured { + // Only apply measured resources to pods that are actually being measured. + return + } + + // Find the container that runs the longest - that's the one that needs the resource increases. + // The other containers are usually sidecars or helpers that don't need as much. + var longestContainer *corev1.Container + var longestDuration time.Duration + + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + + measuredData, exists := bqClient.GetMeasuredData(fullMeta) + if !exists { + continue + } + + // Track which container ran the longest - that's our main workload. + if duration, ok := measuredData.ContainerDurations[container.Name]; ok { + if duration > longestDuration { + longestDuration = duration + longestContainer = container + } + } + } + + // If we don't have duration data, just use the first container as a fallback. + if longestContainer == nil && len(pod.Spec.Containers) > 0 { + longestContainer = &pod.Spec.Containers[0] + } + + if longestContainer == nil { + logger.Debug("No containers found for measured pod resource application") + return + } + + // Get the measured data for the longest-running container. + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, longestContainer.Name) + + measuredData, exists := bqClient.GetMeasuredData(fullMeta) + if !exists { + logger.Debugf("No measured data for container %s", longestContainer.Name) + return + } + + // Set up the resource requests if they don't exist yet. + if longestContainer.Resources.Requests == nil { + longestContainer.Resources.Requests = corev1.ResourceList{} + } + + // Apply CPU request based on what we actually saw when it ran in isolation, plus 20% buffer for safety. + cpuRequest := measuredData.MaxCPUUtilization * 1.2 + if cpuRequest > 0 { + cpuQuantity := resource.NewMilliQuantity(int64(cpuRequest*1000), resource.DecimalSI) + longestContainer.Resources.Requests[corev1.ResourceCPU] = *cpuQuantity + logger.Debugf("Applied CPU request %v to container %s based on measured data", cpuQuantity, longestContainer.Name) + } + + // Apply memory request based on what we actually saw, plus 20% buffer for safety. 
+ if measuredData.MaxMemoryUtilization > 0 { + memoryRequest := int64(float64(measuredData.MaxMemoryUtilization) * 1.2) + memoryQuantity := resource.NewQuantity(memoryRequest, resource.BinarySI) + longestContainer.Resources.Requests[corev1.ResourceMemory] = *memoryQuantity + logger.Debugf("Applied memory request %v to container %s based on measured data", memoryQuantity, longestContainer.Name) + } +} diff --git a/cmd/pod-scaler/measured_test.go b/cmd/pod-scaler/measured_test.go new file mode 100644 index 00000000000..763191f586b --- /dev/null +++ b/cmd/pod-scaler/measured_test.go @@ -0,0 +1,253 @@ +package main + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/openshift/ci-tools/pkg/api" + podscaler "github.com/openshift/ci-tools/pkg/pod-scaler" +) + +func TestShouldBeMeasured(t *testing.T) { + logger := logrus.WithField("test", "TestShouldBeMeasured") + cache := NewMeasuredPodCache(logger) + bqClient := &BigQueryClient{ + cache: cache, + logger: logger, + } + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: "test-org", + Repo: "test-repo", + Branch: "main", + }, + Container: "test-container", + } + + // Test case 1: No data exists - should be measured + if !bqClient.ShouldBeMeasured(meta) { + t.Error("Expected pod to be measured when no data exists") + } + + // Test case 2: Recent data exists - should not be measured + recentData := map[podscaler.FullMetadata]*MeasuredPodData{ + meta: { + Metadata: meta, + LastMeasuredTime: time.Now().Add(-5 * 24 * time.Hour), // 5 days ago + }, + } + cache.Update(recentData) + if bqClient.ShouldBeMeasured(meta) { + t.Error("Expected pod not to be measured when recent data exists") + } + + // Test case 3: Stale data exists - should be measured + staleData := map[podscaler.FullMetadata]*MeasuredPodData{ + meta: { + Metadata: meta, + LastMeasuredTime: time.Now().Add(-15 * 24 * time.Hour), // 15 days ago + }, + } + cache.Update(staleData) + if !bqClient.ShouldBeMeasured(meta) { + t.Error("Expected pod to be measured when data is stale (>10 days)") + } +} + +func TestClassifyPod(t *testing.T) { + logger := logrus.WithField("test", "TestClassifyPod") + cache := NewMeasuredPodCache(logger) + bqClient := &BigQueryClient{ + cache: cache, + logger: logger, + } + + // Test case 1: Pod with no data - should be classified as measured + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Labels: map[string]string{ + "ci.openshift.io/metadata.org": "test-org", + "ci.openshift.io/metadata.repo": "test-repo", + "ci.openshift.io/metadata.branch": "main", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test-container"}, + }, + }, + } + + ClassifyPod(pod1, bqClient, logger) + if pod1.Labels[PodScalerLabelKey] != PodScalerLabelValueMeasured { + t.Errorf("Expected pod to be classified as measured, got %s", pod1.Labels[PodScalerLabelKey]) + } + + // Test case 2: Pod with recent data - should be classified as normal + // Note: We need to use the same metadata structure that MetadataFor creates + meta2 := podscaler.MetadataFor( + map[string]string{ + "ci.openshift.io/metadata.org": "test-org", + "ci.openshift.io/metadata.repo": "test-repo", + "ci.openshift.io/metadata.branch": "main", + }, + "test-pod-2", + "test-container", + ) + recentData := map[podscaler.FullMetadata]*MeasuredPodData{ + meta2: { + Metadata: meta2, + LastMeasuredTime: time.Now().Add(-5 * 24 * time.Hour), // 5 days ago + }, + } 
+ cache.Update(recentData) + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Labels: map[string]string{ + "ci.openshift.io/metadata.org": "test-org", + "ci.openshift.io/metadata.repo": "test-repo", + "ci.openshift.io/metadata.branch": "main", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test-container"}, + }, + }, + } + + ClassifyPod(pod2, bqClient, logger) + if pod2.Labels[PodScalerLabelKey] != PodScalerLabelValueNormal { + t.Errorf("Expected pod to be classified as normal, got %s", pod2.Labels[PodScalerLabelKey]) + } + + // Test case 3: Pod with nil BigQuery client - should default to measured + pod3 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-3", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test-container"}, + }, + }, + } + + ClassifyPod(pod3, nil, logger) + if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueMeasured { + t.Errorf("Expected pod to be classified as measured when BigQuery client is nil, got %s", pod3.Labels[PodScalerLabelKey]) + } +} + +func TestAddPodAntiAffinity(t *testing.T) { + logger := logrus.WithField("test", "TestAddPodAntiAffinity") + + // Test case 1: Measured pod should avoid all pod-scaler labeled pods + measuredPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "measured-pod", + Labels: map[string]string{ + PodScalerLabelKey: PodScalerLabelValueMeasured, + }, + }, + Spec: corev1.PodSpec{}, + } + + AddPodAntiAffinity(measuredPod, logger) + if measuredPod.Spec.Affinity == nil { + t.Fatal("Expected affinity to be set") + } + if measuredPod.Spec.Affinity.PodAntiAffinity == nil { + t.Fatal("Expected podAntiAffinity to be set") + } + if len(measuredPod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) == 0 { + t.Fatal("Expected required anti-affinity terms to be set") + } + + // Test case 2: Normal pod should avoid measured pods + normalPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "normal-pod", + Labels: map[string]string{ + PodScalerLabelKey: PodScalerLabelValueNormal, + }, + }, + Spec: corev1.PodSpec{}, + } + + AddPodAntiAffinity(normalPod, logger) + if normalPod.Spec.Affinity == nil { + t.Fatal("Expected affinity to be set") + } + if normalPod.Spec.Affinity.PodAntiAffinity == nil { + t.Fatal("Expected podAntiAffinity to be set") + } + if len(normalPod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) == 0 { + t.Fatal("Expected required anti-affinity terms to be set") + } + + // Test case 3: Pod without label should not get anti-affinity + unlabeledPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unlabeled-pod", + }, + Spec: corev1.PodSpec{}, + } + + AddPodAntiAffinity(unlabeledPod, logger) + if unlabeledPod.Spec.Affinity != nil { + t.Error("Expected affinity not to be set for unlabeled pod") + } +} + +func TestMeasuredPodCache(t *testing.T) { + logger := logrus.WithField("test", "TestMeasuredPodCache") + cache := NewMeasuredPodCache(logger) + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: "test-org", + Repo: "test-repo", + Branch: "main", + }, + Container: "test-container", + } + + // Test Get with no data + _, exists := cache.Get(meta) + if exists { + t.Error("Expected no data to exist initially") + } + + // Test Update and Get + data := map[podscaler.FullMetadata]*MeasuredPodData{ + meta: { + Metadata: meta, + MaxCPUUtilization: 2.5, + MaxMemoryUtilization: 4 * 1024 * 1024 * 1024, // 4Gi + LastMeasuredTime: time.Now(), + 
ContainerDurations: make(map[string]time.Duration), + }, + } + cache.Update(data) + + retrieved, exists := cache.Get(meta) + if !exists { + t.Error("Expected data to exist after update") + } + if retrieved.MaxCPUUtilization != 2.5 { + t.Errorf("Expected MaxCPUUtilization to be 2.5, got %f", retrieved.MaxCPUUtilization) + } + if retrieved.MaxMemoryUtilization != 4*1024*1024*1024 { + t.Errorf("Expected MaxMemoryUtilization to be 4Gi, got %d", retrieved.MaxMemoryUtilization) + } +} From bfc4f68ef615314e08d5486ac3d988d65d63fcd3 Mon Sep 17 00:00:00 2001 From: Deep Mistry Date: Mon, 12 Jan 2026 16:13:27 -0500 Subject: [PATCH 2/2] Add data collection for measured pods to populate ci_operator_metrics table --- cmd/pod-scaler/admission.go | 44 +-- cmd/pod-scaler/admission_test.go | 2 +- cmd/pod-scaler/main.go | 62 +++- cmd/pod-scaler/measured.go | 143 +++++--- cmd/pod-scaler/measured_test.go | 6 +- cmd/pod-scaler/producer.go | 508 ++++++++++++++++++++++++++- pkg/manifestpusher/manifestpusher.go | 76 +++- 7 files changed, 719 insertions(+), 122 deletions(-) diff --git a/cmd/pod-scaler/admission.go b/cmd/pod-scaler/admission.go index 06d618429d1..f216af6defd 100644 --- a/cmd/pod-scaler/admission.go +++ b/cmd/pod-scaler/admission.go @@ -33,32 +33,18 @@ import ( "github.com/openshift/ci-tools/pkg/steps" ) -func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) { +func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, bqClient *BigQueryClient, reporter results.PodScalerReporter) { logger := logrus.WithField("component", "pod-scaler admission") logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders)) health := pjutil.NewHealthOnPort(healthPort) resources := newResourceServer(loaders, health) decoder := admission.NewDecoder(scheme.Scheme) - var bqClient *BigQueryClient - if enableMeasuredPods { - if bigQueryProjectID == "" || bigQueryDatasetID == "" { - logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true") - } - cache := NewMeasuredPodCache(logger) - var err error - bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger) - if err != nil { - logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods") - } - logger.Info("Measured pods feature enabled with BigQuery integration") - } - server := webhook.NewServer(webhook.Options{ Port: port, CertDir: certDir, }) - server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}}) + server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, 
cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, bqClient: bqClient, reporter: reporter}}) logger.Info("Serving admission webhooks.") if err := server.Start(interrupts.Context()); err != nil { logrus.WithError(err).Fatal("Failed to serve webhooks.") @@ -66,18 +52,16 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int } type podMutator struct { - logger *logrus.Entry - client buildclientv1.BuildV1Interface - resources *resourceServer - mutateResourceLimits bool - decoder admission.Decoder - cpuCap int64 - memoryCap string - cpuPriorityScheduling int64 - authoritativeCPURequests bool - authoritativeMemoryRequests bool - bqClient *BigQueryClient - reporter results.PodScalerReporter + logger *logrus.Entry + client buildclientv1.BuildV1Interface + resources *resourceServer + mutateResourceLimits bool + decoder admission.Decoder + cpuCap int64 + memoryCap string + cpuPriorityScheduling int64 + bqClient *BigQueryClient + reporter results.PodScalerReporter } func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response { @@ -123,7 +107,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio ApplyMeasuredPodResources(pod, m.bqClient, logger) } - mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger) + mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger) m.addPriorityClass(pod) marshaledPod, err := json.Marshal(pod) @@ -318,7 +302,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, } } -func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) { +func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) { mutateResources := func(containers []corev1.Container) { for i := range containers { meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name) diff --git a/cmd/pod-scaler/admission_test.go b/cmd/pod-scaler/admission_test.go index 16c957a3c34..beac4cdeff2 100644 --- a/cmd/pod-scaler/admission_test.go +++ b/cmd/pod-scaler/admission_test.go @@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { original := testCase.pod.DeepCopy() - mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, false, &defaultReporter, logrus.WithField("test", testCase.name)) + mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name)) diff := cmp.Diff(original, testCase.pod) // In some cases, cmp.Diff decides to use non-breaking spaces, and it's not // particularly deterministic about this. We don't care. 
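For reviewers, a minimal sketch of how the additions to Handle() compose once the feature is enabled. This is illustrative only and not part of either patch: the function name and pod name are made up, the metadata label values mirror the unit tests, and it assumes the snippet sits next to measured.go in the cmd/pod-scaler package (a BigQueryClient needs only its cache for these calls, just as the tests construct it).

package main

import (
	"github.com/sirupsen/logrus"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleMeasuredPodAdmission mirrors the admission flow added to Handle():
// classify the pod, add anti-affinity, then apply any measured resource data.
func exampleMeasuredPodAdmission() *corev1.Pod {
	logger := logrus.WithField("example", "measured-pods")
	// An empty cache means every container looks unmeasured.
	bq := &BigQueryClient{cache: NewMeasuredPodCache(logger), logger: logger}

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "e2e-unit-test", // hypothetical pod name
			Labels: map[string]string{
				"ci.openshift.io/metadata.org":    "test-org",
				"ci.openshift.io/metadata.repo":   "test-repo",
				"ci.openshift.io/metadata.branch": "main",
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "test"}},
		},
	}

	ClassifyPod(pod, bq, logger)               // no cached data, so the pod is labeled pod-scaler=measured
	AddPodAntiAffinity(pod, logger)            // measured pods repel every pod carrying the pod-scaler label
	ApplyMeasuredPodResources(pod, bq, logger) // no-op here: there is no measured data to apply yet
	return pod
}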
diff --git a/cmd/pod-scaler/main.go b/cmd/pod-scaler/main.go index 34977df2e83..db844652266 100644 --- a/cmd/pod-scaler/main.go +++ b/cmd/pod-scaler/main.go @@ -52,9 +52,13 @@ type options struct { } type producerOptions struct { - kubernetesOptions prowflagutil.KubernetesOptions - once bool - ignoreLatest time.Duration + kubernetesOptions prowflagutil.KubernetesOptions + once bool + ignoreLatest time.Duration + enableMeasuredPods bool + bigQueryProjectID string + bigQueryDatasetID string + bigQueryCredentialsFile string } type consumerOptions struct { @@ -67,13 +71,6 @@ type consumerOptions struct { cpuCap int64 memoryCap string cpuPriorityScheduling int64 - - // Measured pods options - when enabled, pods are classified as "normal" or "measured" - // Measured pods run on isolated nodes to get accurate CPU/memory utilization data - enableMeasuredPods bool - bigQueryProjectID string - bigQueryDatasetID string - bigQueryCredentialsFile string } func bindOptions(fs *flag.FlagSet) *options { @@ -81,8 +78,12 @@ func bindOptions(fs *flag.FlagSet) *options { o.instrumentationOptions.AddFlags(fs) fs.StringVar(&o.mode, "mode", "", "Which mode to run in.") o.producerOptions.kubernetesOptions.AddFlags(fs) - fs.DurationVar(&o.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.") - fs.BoolVar(&o.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.") + fs.DurationVar(&o.producerOptions.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.") + fs.BoolVar(&o.producerOptions.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.") + fs.BoolVar(&o.producerOptions.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.") + fs.StringVar(&o.producerOptions.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery (required if enable-measured-pods is true)") + fs.StringVar(&o.producerOptions.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)") + fs.StringVar(&o.producerOptions.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access") fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.") fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.") fs.StringVar(&o.certDir, "serving-cert-dir", "", "Path to directory with serving certificate and key for the admission webhook server.") @@ -96,10 +97,6 @@ func bindOptions(fs *flag.FlagSet) *options { fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10") fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'") fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling") - fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. 
When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.") - fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)") - fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)") - fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access") o.resultsOptions.Bind(fs) return &o } @@ -255,7 +252,21 @@ func mainProduce(opts *options, cache Cache) { logger.Debugf("Loaded Prometheus client.") } - produce(clients, cache, opts.ignoreLatest, opts.once) + var bqClient *BigQueryClient + if opts.producerOptions.enableMeasuredPods { + if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" { + logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true") + } + cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-producer")) + var err error + bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-producer")) + if err != nil { + logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods") + } + logrus.Info("Measured pods feature enabled with BigQuery integration") + } + + produce(clients, cache, opts.producerOptions.ignoreLatest, opts.producerOptions.once, bqClient) } @@ -279,7 +290,22 @@ func mainAdmission(opts *options, cache Cache) { logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.") } - go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter) + // Use producerOptions for measured pods config (shared between producer and consumer modes) + var bqClient *BigQueryClient + if opts.producerOptions.enableMeasuredPods { + if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" { + logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true") + } + cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-admission")) + var err error + bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-admission")) + if err != nil { + logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods") + } + logrus.Info("Measured pods feature enabled with BigQuery integration") + } + + go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, bqClient, reporter) } func loaders(cache Cache) map[string][]*cacheReloader { diff --git a/cmd/pod-scaler/measured.go b/cmd/pod-scaler/measured.go index 3b1b217e14f..a688063747e 100644 --- a/cmd/pod-scaler/measured.go +++ b/cmd/pod-scaler/measured.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" 
"fmt" "sync" "time" @@ -36,6 +37,9 @@ const ( // BigQueryRefreshInterval is how often we pull fresh data from BigQuery. // We refresh once a day to keep our cache up to date with the latest measured pod metrics. BigQueryRefreshInterval = 24 * time.Hour + // MeasuredPodResourceBuffer is the safety buffer we add to measured resource utilization. + // We apply 20% buffer (1.2x) to measured CPU and memory usage to account for variability. + MeasuredPodResourceBuffer = 1.2 ) // MeasuredPodData holds what we learned about a pod when it ran in isolation. @@ -140,27 +144,27 @@ func NewBigQueryClient(projectID, datasetID, credentialsFile string, cache *Meas func (bq *BigQueryClient) Refresh(ctx context.Context) error { bq.logger.Info("Refreshing measured pod data from BigQuery") - // TODO: Replace with actual BigQuery query based on ci-metrics structure. - // This is a placeholder query - the actual query will depend on the BigQuery schema - // for ci-metrics pod CPU utilization data. We need to query the table that stores - // max CPU/memory utilization for pods that ran with the "measured" label. + // Query ci_operator_metrics table for measured pod data + // This queries the table that stores max CPU/memory utilization for pods that ran with the "measured" label query := bq.client.Query(fmt.Sprintf(` SELECT org, repo, branch, + target, container, - MAX(cpu_utilization) as max_cpu, - MAX(memory_utilization) as max_memory, - MAX(timestamp) as last_measured, + pod_name, + MAX(max_cpu) as max_cpu, + MAX(max_memory) as max_memory, + MAX(created) as last_measured, ANY_VALUE(container_durations) as container_durations FROM - `+"`%s.%s.pod_metrics`"+` + `+"`%s.%s.ci_operator_metrics`"+` WHERE pod_scaler_label = 'measured' - AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY) + AND created >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY) GROUP BY - org, repo, branch, container + org, repo, branch, target, container, pod_name `, bq.projectID, bq.datasetID, MeasuredPodDataRetentionDays)) query.QueryConfig.Labels = map[string]string{ @@ -179,7 +183,9 @@ func (bq *BigQueryClient) Refresh(ctx context.Context) error { Org string `bigquery:"org"` Repo string `bigquery:"repo"` Branch string `bigquery:"branch"` + Target string `bigquery:"target"` Container string `bigquery:"container"` + PodName string `bigquery:"pod_name"` MaxCPU float64 `bigquery:"max_cpu"` MaxMemory int64 `bigquery:"max_memory"` LastMeasured time.Time `bigquery:"last_measured"` @@ -199,11 +205,33 @@ func (bq *BigQueryClient) Refresh(ctx context.Context) error { Repo: row.Repo, Branch: row.Branch, }, + Target: row.Target, + Pod: row.PodName, Container: row.Container, } - // TODO: Parse container_durations JSON string into map[string]time.Duration + // Parse container_durations JSON string into map[string]time.Duration + // BigQuery stores this as a JSON string. 
time.Duration serializes as int64 (nanoseconds) + // so the format is: {"container1": 3600000000000, "container2": 245000000000} containerDurations := make(map[string]time.Duration) + if row.ContainerDurations != "" { + var durationsMap map[string]int64 + if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMap); err == nil { + for container, nanoseconds := range durationsMap { + containerDurations[container] = time.Duration(nanoseconds) + } + } else { + // Fallback: try parsing as string format (for backwards compatibility) + var durationsMapStr map[string]string + if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMapStr); err == nil { + for container, durationStr := range durationsMapStr { + if duration, err := time.ParseDuration(durationStr); err == nil { + containerDurations[container] = duration + } + } + } + } + } data[meta] = &MeasuredPodData{ Metadata: meta, @@ -248,30 +276,28 @@ func ClassifyPod(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry pod.Labels = make(map[string]string) } + // If BigQuery client is not available, default to normal to avoid overwhelming isolated nodes. + if bqClient == nil { + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueNormal + logger.Debugf("Classified pod as normal - BigQuery client not available") + return + } + // Check each container to see if we need fresh measurement data for it. // If any container needs measuring, we mark the whole pod as "measured". - shouldBeMeasured := false - if bqClient != nil { - for _, container := range pod.Spec.Containers { - fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) - if bqClient.ShouldBeMeasured(fullMeta) { - // This container needs fresh data, so mark the pod as measured. - shouldBeMeasured = true - break - } + for _, container := range pod.Spec.Containers { + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + if bqClient.ShouldBeMeasured(fullMeta) { + // This container needs fresh data, so mark the pod as measured. + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueMeasured + logger.Debugf("Classified pod as measured - will run on isolated node") + return } - } else { - // If BigQuery isn't configured, default to measuring new pods. - shouldBeMeasured = true } - if shouldBeMeasured { - pod.Labels[PodScalerLabelKey] = PodScalerLabelValueMeasured - logger.Debugf("Classified pod as measured - will run on isolated node") - } else { - pod.Labels[PodScalerLabelKey] = PodScalerLabelValueNormal - logger.Debugf("Classified pod as normal - can share node with other workloads") - } + // No container needs measuring, so mark as normal. + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueNormal + logger.Debugf("Classified pod as normal - can share node with other workloads") } // AddPodAntiAffinity sets up scheduling rules so measured pods get isolated nodes. @@ -294,7 +320,8 @@ func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) { var requiredTerms []corev1.PodAffinityTerm - if podScalerLabel == PodScalerLabelValueMeasured { + switch podScalerLabel { + case PodScalerLabelValueMeasured: // Measured pods need complete isolation - they can't share a node with ANY other pod-scaler pod. // This ensures they get the full node resources for accurate measurement. 
requiredTerms = append(requiredTerms, corev1.PodAffinityTerm{ @@ -309,7 +336,7 @@ func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) { TopologyKey: "kubernetes.io/hostname", }) logger.Debug("Added podAntiAffinity for measured pod - will avoid all pod-scaler labeled pods") - } else if podScalerLabel == PodScalerLabelValueNormal { + case PodScalerLabelValueNormal: // Normal pods stay away from measured pods so measured pods can have their isolation. requiredTerms = append(requiredTerms, corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ @@ -326,22 +353,27 @@ func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) { logger.Debug("Added podAntiAffinity for normal pod - will avoid measured pods") } - if len(requiredTerms) > 0 { - pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms + // Merge with existing anti-affinity terms instead of overwriting + existingTerms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution + if existingTerms != nil { + requiredTerms = append(existingTerms, requiredTerms...) } + pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms } // ApplyMeasuredPodResources uses the real resource data we collected when this pod ran in isolation. // We only increase resources for the longest-running container (the main workload), not all containers. // This is based on actual measured usage, not Prometheus data that might be skewed by node contention. +// This function applies resources to pods labeled "normal" (which have measured data), not "measured" (which need measurement). func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) { if bqClient == nil { return } podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey] - if !hasLabel || podScalerLabel != PodScalerLabelValueMeasured { - // Only apply measured resources to pods that are actually being measured. + // Apply measured resources to pods labeled "normal" (which have fresh measured data) + // Pods labeled "measured" don't have data yet - they're being measured for the first time. + if !hasLabel || podScalerLabel != PodScalerLabelValueNormal { return } @@ -349,41 +381,42 @@ func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger // The other containers are usually sidecars or helpers that don't need as much. var longestContainer *corev1.Container var longestDuration time.Duration + var measuredData *MeasuredPodData for i := range pod.Spec.Containers { container := &pod.Spec.Containers[i] fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) - measuredData, exists := bqClient.GetMeasuredData(fullMeta) + containerMeasuredData, exists := bqClient.GetMeasuredData(fullMeta) if !exists { continue } // Track which container ran the longest - that's our main workload. - if duration, ok := measuredData.ContainerDurations[container.Name]; ok { + if duration, ok := containerMeasuredData.ContainerDurations[container.Name]; ok { if duration > longestDuration { longestDuration = duration longestContainer = container + measuredData = containerMeasuredData } } } - // If we don't have duration data, just use the first container as a fallback. + // If we don't have duration data, just use the first container with measured data as a fallback. 
if longestContainer == nil && len(pod.Spec.Containers) > 0 { - longestContainer = &pod.Spec.Containers[0] - } - - if longestContainer == nil { - logger.Debug("No containers found for measured pod resource application") - return + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + if containerMeasuredData, exists := bqClient.GetMeasuredData(fullMeta); exists { + longestContainer = container + measuredData = containerMeasuredData + break + } + } } - // Get the measured data for the longest-running container. - fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, longestContainer.Name) - - measuredData, exists := bqClient.GetMeasuredData(fullMeta) - if !exists { - logger.Debugf("No measured data for container %s", longestContainer.Name) + if longestContainer == nil || measuredData == nil { + logger.Debug("No containers with measured data found for measured pod resource application") return } @@ -392,17 +425,17 @@ func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger longestContainer.Resources.Requests = corev1.ResourceList{} } - // Apply CPU request based on what we actually saw when it ran in isolation, plus 20% buffer for safety. - cpuRequest := measuredData.MaxCPUUtilization * 1.2 + // Apply CPU request based on what we actually saw when it ran in isolation, with safety buffer. + cpuRequest := measuredData.MaxCPUUtilization * MeasuredPodResourceBuffer if cpuRequest > 0 { cpuQuantity := resource.NewMilliQuantity(int64(cpuRequest*1000), resource.DecimalSI) longestContainer.Resources.Requests[corev1.ResourceCPU] = *cpuQuantity logger.Debugf("Applied CPU request %v to container %s based on measured data", cpuQuantity, longestContainer.Name) } - // Apply memory request based on what we actually saw, plus 20% buffer for safety. + // Apply memory request based on what we actually saw, with safety buffer. 
if measuredData.MaxMemoryUtilization > 0 { - memoryRequest := int64(float64(measuredData.MaxMemoryUtilization) * 1.2) + memoryRequest := int64(float64(measuredData.MaxMemoryUtilization) * MeasuredPodResourceBuffer) memoryQuantity := resource.NewQuantity(memoryRequest, resource.BinarySI) longestContainer.Resources.Requests[corev1.ResourceMemory] = *memoryQuantity logger.Debugf("Applied memory request %v to container %s based on measured data", memoryQuantity, longestContainer.Name) diff --git a/cmd/pod-scaler/measured_test.go b/cmd/pod-scaler/measured_test.go index 763191f586b..107514e1232 100644 --- a/cmd/pod-scaler/measured_test.go +++ b/cmd/pod-scaler/measured_test.go @@ -130,7 +130,7 @@ func TestClassifyPod(t *testing.T) { t.Errorf("Expected pod to be classified as normal, got %s", pod2.Labels[PodScalerLabelKey]) } - // Test case 3: Pod with nil BigQuery client - should default to measured + // Test case 3: Pod with nil BigQuery client - should default to normal to avoid overwhelming isolated nodes pod3 := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod-3", @@ -143,8 +143,8 @@ func TestClassifyPod(t *testing.T) { } ClassifyPod(pod3, nil, logger) - if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueMeasured { - t.Errorf("Expected pod to be classified as measured when BigQuery client is nil, got %s", pod3.Labels[PodScalerLabelKey]) + if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueNormal { + t.Errorf("Expected pod to be classified as normal when BigQuery client is nil (to avoid overwhelming isolated nodes), got %s", pod3.Labels[PodScalerLabelKey]) } } diff --git a/cmd/pod-scaler/producer.go b/cmd/pod-scaler/producer.go index 786ce43972e..8fb1b8ce13c 100644 --- a/cmd/pod-scaler/producer.go +++ b/cmd/pod-scaler/producer.go @@ -2,12 +2,14 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "strings" "sync" "time" + "cloud.google.com/go/bigquery" "github.com/openhistogram/circonusllhist" prometheusapi "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" @@ -36,6 +38,20 @@ const ( StepsCachePrefix = "steps" ) +// escapePromQLLabelValue escapes special characters in PromQL label values. +// PromQL label values in selectors must be quoted and have backslashes and quotes escaped. 
+func escapePromQLLabelValue(value string) string { + // Replace backslashes first (before replacing quotes) + value = strings.ReplaceAll(value, `\`, `\\`) + // Escape double quotes + value = strings.ReplaceAll(value, `"`, `\"`) + // Escape newlines + value = strings.ReplaceAll(value, "\n", `\n`) + // Escape carriage returns + value = strings.ReplaceAll(value, "\r", `\r`) + return value +} + // queriesByMetric returns a mapping of Prometheus query by metric name for all queries we want to execute func queriesByMetric() map[string]string { queries := map[string]string{} @@ -70,7 +86,7 @@ func queriesByMetric() map[string]string { return queries } -func produce(clients map[string]prometheusapi.API, dataCache Cache, ignoreLatest time.Duration, once bool) { +func produce(clients map[string]prometheusapi.API, dataCache Cache, ignoreLatest time.Duration, once bool, bqClient *BigQueryClient) { var execute func(func()) if once { execute = func(f func()) { @@ -81,6 +97,35 @@ func produce(clients map[string]prometheusapi.API, dataCache Cache, ignoreLatest interrupts.TickLiteral(f, 2*time.Hour) } } + + // Run measured pod data collection on startup and then periodically if BigQuery client is available + if bqClient != nil { + // Use sync.Once to ensure the client is only closed once + var closeOnce sync.Once + closeClient := func() { + closeOnce.Do(func() { + if err := bqClient.client.Close(); err != nil { + logrus.WithError(err).Error("Failed to close BigQuery client") + } + }) + } + // Register cleanup handler to close BigQuery client on shutdown + // This ensures the client is closed when the program exits, not when produce() returns + // In non-once mode, this handles graceful shutdown. In once mode, this provides a safety net. + interrupts.OnInterrupt(closeClient) + + // Run measured pod data collection on startup and then periodically + execute(func() { + if err := collectMeasuredPodMetrics(interrupts.Context(), clients, bqClient, logrus.WithField("component", "measured-pods-collector")); err != nil { + logrus.WithError(err).Error("Failed to collect measured pod metrics") + } + // In once mode, close the client after work is done since the program will exit + if once { + closeClient() + } + }) + } + execute(func() { for name, query := range queriesByMetric() { name := name @@ -372,3 +417,464 @@ func (q *querier) executeOverRange(ctx context.Context, c *clusterMetadata, r pr q.lock.Unlock() logger.Debugf("Saved Prometheus response after %s.", time.Since(saveStart).Round(time.Second)) } + +// collectMeasuredPodMetrics queries Prometheus for completed measured pods and writes to BigQuery +func collectMeasuredPodMetrics(ctx context.Context, clients map[string]prometheusapi.API, bqClient *BigQueryClient, logger *logrus.Entry) error { + logger.Info("Starting measured pod data collection") + + // Query window: last 4 hours (to catch recently completed pods) + until := time.Now() + from := until.Add(-4 * time.Hour) + + // Query Prometheus for pods with pod-scaler=measured label + // kube_pod_labels metric exposes labels with a label_ prefix + measuredPodSelector := `{label_pod_scaler="measured",label_created_by_ci="true"}` + + // Query for pod labels to identify measured pods + podLabelsQuery := fmt.Sprintf(`kube_pod_labels%s`, measuredPodSelector) + + var allMeasuredPods []measuredPodData + for clusterName, client := range clients { + clusterLogger := logger.WithField("cluster", clusterName) + + // Query for pod labels to find measured pods + labelsResult, _, err := client.QueryRange(ctx, 
podLabelsQuery, prometheusapi.Range{ + Start: from, + End: until, + Step: 30 * time.Second, + }) + if err != nil { + clusterLogger.WithError(err).Warn("Failed to query pod labels") + continue + } + + // Extract pod names and metadata from labels + matrix, ok := labelsResult.(model.Matrix) + if !ok { + clusterLogger.Warn("Unexpected result type for pod labels query") + continue + } + + // For each measured pod, get CPU and memory usage + // Deduplicate pods by name to avoid processing the same pod multiple times + seenPods := make(map[string]bool) + for _, sampleStream := range matrix { + podName := string(sampleStream.Metric["pod"]) + namespace := string(sampleStream.Metric["namespace"]) + podKey := fmt.Sprintf("%s/%s", namespace, podName) + + // Skip if we've already processed this pod + if seenPods[podKey] { + continue + } + seenPods[podKey] = true + + podDataList, err := extractMeasuredPodData(ctx, client, sampleStream, from, until, clusterLogger) + if err != nil { + clusterLogger.WithError(err).Warn("Failed to extract measured pod data") + continue + } + if len(podDataList) > 0 { + allMeasuredPods = append(allMeasuredPods, podDataList...) + } + } + } + + // Write all collected data to BigQuery + if len(allMeasuredPods) > 0 { + if err := writeMeasuredPodsToBigQuery(ctx, bqClient.client, bqClient.projectID, bqClient.datasetID, allMeasuredPods, logger); err != nil { + return fmt.Errorf("failed to write measured pods to BigQuery: %w", err) + } + logger.Infof("Collected and wrote %d measured pod records to BigQuery", len(allMeasuredPods)) + } else { + logger.Info("No measured pods found in the query window") + } + + return nil +} + +type measuredPodData struct { + Org string + Repo string + Branch string + Target string + Container string + MinCPU float64 + MaxCPU float64 + MinMemory int64 + MaxMemory int64 + ContainerDurations map[string]time.Duration + NodeName string + PodName string + Timestamp time.Time + // Node-level metrics for validation (since pod is isolated, node metrics should match) + NodeMinCPU float64 + NodeMaxCPU float64 + NodeMinMemory int64 + NodeMaxMemory int64 +} + +func extractMeasuredPodData(ctx context.Context, client prometheusapi.API, sampleStream *model.SampleStream, from, until time.Time, logger *logrus.Entry) ([]measuredPodData, error) { + // Extract pod metadata from labels + podName := string(sampleStream.Metric["pod"]) + namespace := string(sampleStream.Metric["namespace"]) + org := string(sampleStream.Metric["label_ci_openshift_io_metadata_org"]) + repo := string(sampleStream.Metric["label_ci_openshift_io_metadata_repo"]) + branch := string(sampleStream.Metric["label_ci_openshift_io_metadata_branch"]) + target := string(sampleStream.Metric["label_ci_openshift_io_metadata_target"]) + + if org == "" || repo == "" { + // Skip pods without proper metadata + return nil, nil + } + + // Query Prometheus for pod metrics + cpuResult, memoryResult, containerInfoResult, err := queryPodMetrics(ctx, client, namespace, podName, from, until, logger) + if err != nil { + return nil, err + } + + // Process CPU and memory results to get min/max per container + minCPUByContainer, maxCPUByContainer := extractCPUUsage(cpuResult) + minMemoryByContainer, maxMemoryByContainer := extractMemoryUsage(memoryResult) + + // Get node name and query node-level metrics for validation + nodeName := string(sampleStream.Metric["node"]) + nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory := queryNodeMetrics(ctx, client, nodeName, from, until, logger) + + // Extract container durations + 
containerDurations := extractContainerDurations(containerInfoResult) + + // Build records for each container + records := buildMeasuredPodRecords(org, repo, branch, target, podName, nodeName, cpuResult, minCPUByContainer, maxCPUByContainer, minMemoryByContainer, maxMemoryByContainer, containerDurations, nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory, from, until) + + return records, nil +} + +// queryPodMetrics queries Prometheus for CPU, memory, and container info metrics +func queryPodMetrics(ctx context.Context, client prometheusapi.API, namespace, podName string, from, until time.Time, logger *logrus.Entry) (model.Value, model.Value, model.Value, error) { + // Escape label values to prevent PromQL injection + escapedNamespace := escapePromQLLabelValue(namespace) + escapedPodName := escapePromQLLabelValue(podName) + + queryRange := prometheusapi.Range{ + Start: from, + End: until, + Step: 30 * time.Second, + } + + // Query CPU usage for this pod - we'll compute min/max from the time series + // Using rate with a 3-minute window to get per-second CPU usage + cpuQuery := fmt.Sprintf(`rate(container_cpu_usage_seconds_total{namespace="%s",pod="%s",container!="POD",container!=""}[3m])`, escapedNamespace, escapedPodName) + cpuResult, _, err := client.QueryRange(ctx, cpuQuery, queryRange) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to query CPU: %w", err) + } + + // Query memory usage for this pod - we'll compute min/max from the time series + memoryQuery := fmt.Sprintf(`container_memory_working_set_bytes{namespace="%s",pod="%s",container!="POD",container!=""}`, escapedNamespace, escapedPodName) + memoryResult, _, err := client.QueryRange(ctx, memoryQuery, queryRange) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to query memory: %w", err) + } + + // Query container lifecycle info to get actual container durations + containerInfoQuery := fmt.Sprintf(`kube_pod_container_info{namespace="%s",pod="%s",container!="POD",container!=""}`, escapedNamespace, escapedPodName) + containerInfoResult, _, err := client.QueryRange(ctx, containerInfoQuery, queryRange) + if err != nil { + logger.WithError(err).Debug("Failed to query container info, will estimate duration") + } + + return cpuResult, memoryResult, containerInfoResult, nil +} + +// extractCPUUsage processes CPU query results and returns min/max CPU usage per container +func extractCPUUsage(cpuResult model.Value) (map[string]float64, map[string]float64) { + minCPUByContainer := make(map[string]float64) + maxCPUByContainer := make(map[string]float64) + + if cpuMatrix, ok := cpuResult.(model.Matrix); ok { + for _, stream := range cpuMatrix { + container := string(stream.Metric["container"]) + var minCPU, maxCPU float64 + first := true + for _, sample := range stream.Values { + cpuVal := float64(sample.Value) + if first { + minCPU = cpuVal + maxCPU = cpuVal + first = false + } else { + if cpuVal < minCPU { + minCPU = cpuVal + } + if cpuVal > maxCPU { + maxCPU = cpuVal + } + } + } + if maxCPU > 0 { + minCPUByContainer[container] = minCPU + maxCPUByContainer[container] = maxCPU + } + } + } + + return minCPUByContainer, maxCPUByContainer +} + +// extractMemoryUsage processes memory query results and returns min/max memory usage per container +func extractMemoryUsage(memoryResult model.Value) (map[string]int64, map[string]int64) { + minMemoryByContainer := make(map[string]int64) + maxMemoryByContainer := make(map[string]int64) + + if memoryMatrix, ok := memoryResult.(model.Matrix); ok { + for _, stream := range 
memoryMatrix { + container := string(stream.Metric["container"]) + var minMemory, maxMemory int64 + first := true + for _, sample := range stream.Values { + memVal := int64(sample.Value) + if first { + minMemory = memVal + maxMemory = memVal + first = false + } else { + if memVal < minMemory { + minMemory = memVal + } + if memVal > maxMemory { + maxMemory = memVal + } + } + } + if maxMemory > 0 { + minMemoryByContainer[container] = minMemory + maxMemoryByContainer[container] = maxMemory + } + } + } + + return minMemoryByContainer, maxMemoryByContainer +} + +// queryNodeMetrics queries node-level metrics for validation (since pod is isolated, node metrics should match pod metrics) +func queryNodeMetrics(ctx context.Context, client prometheusapi.API, nodeName string, from, until time.Time, logger *logrus.Entry) (float64, float64, int64, int64) { + var nodeMinCPU, nodeMaxCPU float64 + var nodeMinMemory, nodeMaxMemory int64 + + if nodeName == "" { + return nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory + } + + // Escape node name for PromQL + escapedNodeName := escapePromQLLabelValue(nodeName) + queryRange := prometheusapi.Range{ + Start: from, + End: until, + Step: 30 * time.Second, + } + + // Query node CPU utilization - sum all containers on the node + nodeCPUQuery := fmt.Sprintf(`sum(rate(container_cpu_usage_seconds_total{node="%s",container!="POD",container!=""}[3m]))`, escapedNodeName) + nodeCPUResult, _, err := client.QueryRange(ctx, nodeCPUQuery, queryRange) + if err == nil { + if nodeCPUMatrix, ok := nodeCPUResult.(model.Matrix); ok && len(nodeCPUMatrix) > 0 { + first := true + for _, stream := range nodeCPUMatrix { + for _, sample := range stream.Values { + cpuVal := float64(sample.Value) + if first { + nodeMinCPU = cpuVal + nodeMaxCPU = cpuVal + first = false + } else { + if cpuVal < nodeMinCPU { + nodeMinCPU = cpuVal + } + if cpuVal > nodeMaxCPU { + nodeMaxCPU = cpuVal + } + } + } + } + } + } else { + logger.WithError(err).Debug("Failed to query node CPU metrics for validation") + } + + // Query node memory utilization - sum all containers on the node + nodeMemoryQuery := fmt.Sprintf(`sum(container_memory_working_set_bytes{node="%s",container!="POD",container!=""})`, escapedNodeName) + nodeMemoryResult, _, err := client.QueryRange(ctx, nodeMemoryQuery, queryRange) + if err == nil { + if nodeMemoryMatrix, ok := nodeMemoryResult.(model.Matrix); ok && len(nodeMemoryMatrix) > 0 { + first := true + for _, stream := range nodeMemoryMatrix { + for _, sample := range stream.Values { + memVal := int64(sample.Value) + if first { + nodeMinMemory = memVal + nodeMaxMemory = memVal + first = false + } else { + if memVal < nodeMinMemory { + nodeMinMemory = memVal + } + if memVal > nodeMaxMemory { + nodeMaxMemory = memVal + } + } + } + } + } + } else { + logger.WithError(err).Debug("Failed to query node memory metrics for validation") + } + + return nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory +} + +// extractContainerDurations extracts container durations from container info query results +func extractContainerDurations(containerInfoResult model.Value) map[string]time.Duration { + containerDurations := make(map[string]time.Duration) + if containerInfoMatrix, ok := containerInfoResult.(model.Matrix); ok { + for _, stream := range containerInfoMatrix { + container := string(stream.Metric["container"]) + if len(stream.Values) > 0 { + // Container duration is from first to last sample + firstTime := stream.Values[0].Timestamp.Time() + lastTime := 
stream.Values[len(stream.Values)-1].Timestamp.Time() + containerDurations[container] = lastTime.Sub(firstTime) + } + } + } + return containerDurations +} + +// buildMeasuredPodRecords builds measuredPodData records for each container +func buildMeasuredPodRecords(org, repo, branch, target, podName, nodeName string, cpuResult model.Value, minCPUByContainer, maxCPUByContainer map[string]float64, minMemoryByContainer, maxMemoryByContainer map[string]int64, containerDurations map[string]time.Duration, nodeMinCPU, nodeMaxCPU float64, nodeMinMemory, nodeMaxMemory int64, from, until time.Time) []measuredPodData { + var records []measuredPodData + + for container, maxCPU := range maxCPUByContainer { + minCPU := minCPUByContainer[container] + maxMemory := maxMemoryByContainer[container] + minMemory := minMemoryByContainer[container] + + // Use actual container duration if available, otherwise estimate + containerDuration := containerDurations[container] + if containerDuration == 0 { + // Estimate based on query window if we don't have container info + containerDuration = until.Sub(from) + } + + // Determine pod execution timestamp (use first sample time if available) + podTimestamp := until + if cpuMatrix, ok := cpuResult.(model.Matrix); ok { + for _, stream := range cpuMatrix { + if string(stream.Metric["container"]) == container && len(stream.Values) > 0 { + podTimestamp = stream.Values[0].Timestamp.Time() + break + } + } + } + + // Use all container durations for this pod, not just the current container + // This gives us complete duration information for all containers in the pod + podContainerDurations := make(map[string]time.Duration) + for c, d := range containerDurations { + podContainerDurations[c] = d + } + // If this container's duration isn't in the map, add it + if _, exists := podContainerDurations[container]; !exists { + podContainerDurations[container] = containerDuration + } + + records = append(records, measuredPodData{ + Org: org, + Repo: repo, + Branch: branch, + Target: target, + Container: container, + MinCPU: minCPU, + MaxCPU: maxCPU, + MinMemory: minMemory, + MaxMemory: maxMemory, + ContainerDurations: podContainerDurations, + NodeName: nodeName, + PodName: podName, + Timestamp: podTimestamp, + NodeMinCPU: nodeMinCPU, + NodeMaxCPU: nodeMaxCPU, + NodeMinMemory: nodeMinMemory, + NodeMaxMemory: nodeMaxMemory, + }) + } + + return records +} + +type bigQueryPodMetricsRow struct { + Org string `bigquery:"org"` + Repo string `bigquery:"repo"` + Branch string `bigquery:"branch"` + Target string `bigquery:"target"` + Container string `bigquery:"container"` + PodName string `bigquery:"pod_name"` + PodScalerLabel string `bigquery:"pod_scaler_label"` + MinCPU float64 `bigquery:"min_cpu"` + MaxCPU float64 `bigquery:"max_cpu"` + MinMemory int64 `bigquery:"min_memory"` + MaxMemory int64 `bigquery:"max_memory"` + ContainerDurations string `bigquery:"container_durations"` + NodeName string `bigquery:"node_name"` + NodeMinCPU float64 `bigquery:"node_min_cpu"` + NodeMaxCPU float64 `bigquery:"node_max_cpu"` + NodeMinMemory int64 `bigquery:"node_min_memory"` + NodeMaxMemory int64 `bigquery:"node_max_memory"` + Created time.Time `bigquery:"created"` + LastMeasured time.Time `bigquery:"last_measured"` +} + +func writeMeasuredPodsToBigQuery(ctx context.Context, bqClient *bigquery.Client, _ /* projectID */, datasetID string, pods []measuredPodData, logger *logrus.Entry) error { + // Use ci_operator_metrics table with additional fields for measured pod data + inserter := 
bqClient.Dataset(datasetID).Table("ci_operator_metrics").Inserter() + + // Convert to BigQuery rows + rows := make([]*bigQueryPodMetricsRow, 0, len(pods)) + for _, pod := range pods { + // Serialize container durations as JSON + durationsJSON, err := json.Marshal(pod.ContainerDurations) + if err != nil { + logger.WithError(err).Warn("Failed to marshal container durations") + continue + } + + rows = append(rows, &bigQueryPodMetricsRow{ + Org: pod.Org, + Repo: pod.Repo, + Branch: pod.Branch, + Target: pod.Target, + Container: pod.Container, + PodName: pod.PodName, + PodScalerLabel: "measured", + MinCPU: pod.MinCPU, + MaxCPU: pod.MaxCPU, + MinMemory: pod.MinMemory, + MaxMemory: pod.MaxMemory, + ContainerDurations: string(durationsJSON), + NodeName: pod.NodeName, + NodeMinCPU: pod.NodeMinCPU, + NodeMaxCPU: pod.NodeMaxCPU, + NodeMinMemory: pod.NodeMinMemory, + NodeMaxMemory: pod.NodeMaxMemory, + Created: pod.Timestamp, + LastMeasured: pod.Timestamp, + }) + } + + if err := inserter.Put(ctx, rows); err != nil { + return fmt.Errorf("failed to insert rows: %w", err) + } + + return nil +} diff --git a/pkg/manifestpusher/manifestpusher.go b/pkg/manifestpusher/manifestpusher.go index f0381b4b7e8..103282164b4 100644 --- a/pkg/manifestpusher/manifestpusher.go +++ b/pkg/manifestpusher/manifestpusher.go @@ -2,12 +2,16 @@ package manifestpusher import ( "fmt" + "strings" + "time" "github.com/estesp/manifest-tool/v2/pkg/registry" "github.com/estesp/manifest-tool/v2/pkg/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" + buildv1 "github.com/openshift/api/build/v1" ) @@ -37,29 +41,73 @@ func (m manifestPusher) PushImageWithManifest(builds []buildv1.Build, targetImag srcImages := []types.ManifestEntry{} for _, build := range builds { + arch := build.Spec.NodeSelector[nodeArchitectureLabel] + if arch == "" { + return fmt.Errorf("build %s has no architecture label in nodeSelector", build.Name) + } + imageRef := fmt.Sprintf("%s/%s/%s", m.registryURL, build.Spec.Output.To.Namespace, build.Spec.Output.To.Name) + m.logger.Infof("Adding architecture %s: %s", arch, imageRef) srcImages = append(srcImages, types.ManifestEntry{ - Image: fmt.Sprintf("%s/%s/%s", m.registryURL, build.Spec.Output.To.Namespace, build.Spec.Output.To.Name), + Image: imageRef, Platform: ocispec.Platform{ OS: "linux", - Architecture: build.Spec.NodeSelector[nodeArchitectureLabel], + Architecture: arch, }, }) } - digest, _, err := registry.PushManifestList( - "", // username: we don't we use basic auth - "", // password: " - types.YAMLInput{Image: fmt.Sprintf("%s/%s", m.registryURL, targetImageRef), Manifests: srcImages}, - false, // --ignore-missing. We don't want to ignore missing images. - true, // --insecure to allow pushing to the local registry. - false, // --plain-http is false by default in manifest-tool. False for the OpenShift registry. - types.Docker, // we only need docker type manifest. - m.dockercfgPath, - ) + if len(srcImages) == 0 { + return fmt.Errorf("no source images to create manifest list for %s", targetImageRef) + } + + targetImage := fmt.Sprintf("%s/%s", m.registryURL, targetImageRef) + m.logger.Infof("Creating manifest list for %s with %d architectures", targetImage, len(srcImages)) + + // Wait for all images to be available in the registry before creating the manifest list. + // There's a race condition where builds are marked complete but images aren't fully + // available in the registry yet. 
We retry with exponential backoff to handle this.
+	backoff := wait.Backoff{
+		Duration: 5 * time.Second,
+		Factor:   1.5,
+		Steps:    10, // Max ~6 minutes total wait time
+	}
+
+	var digest string
+	var length int
+	var err error
+
+	err = wait.ExponentialBackoff(backoff, func() (bool, error) {
+		digest, length, err = registry.PushManifestList(
+			"", // username: we don't use basic auth
+			"", // password: "
+			types.YAMLInput{Image: targetImage, Manifests: srcImages},
+			false,        // --ignore-missing. We don't want to ignore missing images.
+			true,         // --insecure to allow pushing to the local registry.
+			false,        // --plain-http is false by default in manifest-tool. False for the OpenShift registry.
+			types.Docker, // we only need docker type manifest.
+			m.dockercfgPath,
+		)
+		if err != nil {
+			// Check if the error indicates missing images (common race condition)
+			errStr := err.Error()
+			if strings.Contains(errStr, "no image found in manifest list") ||
+				strings.Contains(errStr, "inspect of image") ||
+				strings.Contains(errStr, "failed to pull image") ||
+				strings.Contains(errStr, "choosing an image from manifest list") ||
+				strings.Contains(errStr, "PullBuilderImageFailed") {
+				m.logger.Warnf("Images not yet available in registry, retrying: %v", err)
+				return false, nil // Retry
+			}
+			// For other errors, fail immediately
+			return false, err
+		}
+		return true, nil
+	})
+
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to push manifest list for %s after retries: %w", targetImageRef, err)
 	}
-	m.logger.WithField("digest", digest).Infof("Image %s created", targetImageRef)
+	m.logger.WithField("digest", digest).WithField("length", length).Infof("Successfully created manifest list for %s", targetImageRef)
 	return nil
 }
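
On the admission side, the request math in ApplyMeasuredPodResources can be exercised on its own. The following is a minimal, self-contained sketch of the conversion; MeasuredPodResourceBuffer is assumed to be the 1.2 factor that replaces the old literal 20% buffer, and the measured maxima are made-up inputs.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Assumed value: the patch swaps the old literal 1.2 for a named constant,
// so a 20% head-room buffer is used here for illustration.
const MeasuredPodResourceBuffer = 1.2

func main() {
	// Hypothetical maxima observed while the pod ran on an isolated node.
	maxCPUUtilization := 2.35                             // cores
	maxMemoryUtilization := int64(3 * 1024 * 1024 * 1024) // bytes (~3 GiB)

	// Same conversions as ApplyMeasuredPodResources: buffered cores become a
	// millicore quantity, buffered bytes become a binary-SI quantity.
	cpuRequest := maxCPUUtilization * MeasuredPodResourceBuffer
	cpuQuantity := resource.NewMilliQuantity(int64(cpuRequest*1000), resource.DecimalSI)

	memoryRequest := int64(float64(maxMemoryUtilization) * MeasuredPodResourceBuffer)
	memoryQuantity := resource.NewQuantity(memoryRequest, resource.BinarySI)

	fmt.Printf("cpu request: %s, memory request: %s\n", cpuQuantity.String(), memoryQuantity.String())
}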
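The per-pod queries in queryPodMetrics interpolate namespace and pod names into PromQL label selectors, which is why the patch routes them through escapePromQLLabelValue first. Below is a small standalone sketch of that pattern; the helper body is copied from this patch, while the namespace and pod name are invented inputs.

package main

import (
	"fmt"
	"strings"
)

// Copied from cmd/pod-scaler/producer.go in this patch: backslashes are escaped
// first so the later quote/newline replacements are not escaped twice.
func escapePromQLLabelValue(value string) string {
	value = strings.ReplaceAll(value, `\`, `\\`)
	value = strings.ReplaceAll(value, `"`, `\"`)
	value = strings.ReplaceAll(value, "\n", `\n`)
	value = strings.ReplaceAll(value, "\r", `\r`)
	return value
}

func main() {
	// Hypothetical values; the real ones come from the kube_pod_labels series.
	namespace := "ci-op-1234abcd"
	pod := `unit-"quoted"\pod`

	query := fmt.Sprintf(`container_memory_working_set_bytes{namespace="%s",pod="%s",container!="POD",container!=""}`,
		escapePromQLLabelValue(namespace), escapePromQLLabelValue(pod))
	fmt.Println(query)
}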
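The manifestpusher change wraps registry.PushManifestList in wait.ExponentialBackoff so that "image not yet in the registry" races get retried while any other failure surfaces immediately. A stripped-down sketch of that retry shape follows; push() and its single retryable substring are placeholders for the real call and the fuller marker list in the patch.

package main

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

var attempts int

// push stands in for registry.PushManifestList: it fails with a retryable-looking
// error twice, then succeeds, so the retry path is visible.
func push() (string, error) {
	attempts++
	if attempts < 3 {
		return "", errors.New("inspect of image failed: blob not yet in registry")
	}
	return "sha256:deadbeef", nil // placeholder digest
}

func main() {
	// Same shape as the backoff added to pkg/manifestpusher: transient
	// "image not available yet" failures retry, anything else fails fast.
	backoff := wait.Backoff{Duration: 5 * time.Second, Factor: 1.5, Steps: 10}

	var digest string
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		d, pushErr := push()
		if pushErr != nil {
			if strings.Contains(pushErr.Error(), "inspect of image") {
				// Transient: the build finished but the image is not visible yet.
				return false, nil
			}
			return false, pushErr // Fatal: stop retrying immediately.
		}
		digest = d
		return true, nil
	})
	if err != nil {
		fmt.Println("push failed after retries:", err)
		return
	}
	fmt.Println("pushed manifest list with digest", digest)
}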