diff --git a/cmd/pod-scaler/admission.go b/cmd/pod-scaler/admission.go index f17d62e1e3f..f216af6defd 100644 --- a/cmd/pod-scaler/admission.go +++ b/cmd/pod-scaler/admission.go @@ -33,7 +33,7 @@ import ( "github.com/openshift/ci-tools/pkg/steps" ) -func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) { +func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, bqClient *BigQueryClient, reporter results.PodScalerReporter) { logger := logrus.WithField("component", "pod-scaler admission") logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders)) health := pjutil.NewHealthOnPort(healthPort) @@ -44,7 +44,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int Port: port, CertDir: certDir, }) - server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}}) + server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, bqClient: bqClient, reporter: reporter}}) logger.Info("Serving admission webhooks.") if err := server.Start(interrupts.Context()); err != nil { logrus.WithError(err).Fatal("Failed to serve webhooks.") @@ -60,6 +60,7 @@ type podMutator struct { cpuCap int64 memoryCap string cpuPriorityScheduling int64 + bqClient *BigQueryClient reporter results.PodScalerReporter } @@ -97,6 +98,15 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio logger.WithError(err).Error("Failed to handle rehearsal Pod.") return admission.Allowed("Failed to handle rehearsal Pod, ignoring.") } + + // Classify pod as normal or measured (if enabled) + if m.bqClient != nil { + ClassifyPod(pod, m.bqClient, logger) + AddPodAntiAffinity(pod, logger) + // Apply measured pod resources before regular resource mutation + ApplyMeasuredPodResources(pod, m.bqClient, logger) + } + mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger) m.addPriorityClass(pod) diff --git a/cmd/pod-scaler/main.go b/cmd/pod-scaler/main.go index 03f7758a5d8..db844652266 100644 --- a/cmd/pod-scaler/main.go +++ b/cmd/pod-scaler/main.go @@ -52,9 +52,13 @@ type options struct { } type producerOptions struct { - kubernetesOptions prowflagutil.KubernetesOptions - once bool - ignoreLatest time.Duration + kubernetesOptions prowflagutil.KubernetesOptions + once bool + ignoreLatest time.Duration + enableMeasuredPods bool + bigQueryProjectID string + bigQueryDatasetID string + bigQueryCredentialsFile string } type consumerOptions struct { @@ -74,8 +78,12 @@ func bindOptions(fs *flag.FlagSet) *options { o.instrumentationOptions.AddFlags(fs) fs.StringVar(&o.mode, "mode", "", "Which mode to run in.") o.producerOptions.kubernetesOptions.AddFlags(fs) - fs.DurationVar(&o.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying 
Prometheus. For instance, 1h will ignore the latest hour of data.") - fs.BoolVar(&o.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.") + fs.DurationVar(&o.producerOptions.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.") + fs.BoolVar(&o.producerOptions.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.") + fs.BoolVar(&o.producerOptions.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.") + fs.StringVar(&o.producerOptions.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery (required if enable-measured-pods is true)") + fs.StringVar(&o.producerOptions.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)") + fs.StringVar(&o.producerOptions.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access") fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.") fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.") fs.StringVar(&o.certDir, "serving-cert-dir", "", "Path to directory with serving certificate and key for the admission webhook server.") @@ -244,7 +252,21 @@ func mainProduce(opts *options, cache Cache) { logger.Debugf("Loaded Prometheus client.") } - produce(clients, cache, opts.ignoreLatest, opts.once) + var bqClient *BigQueryClient + if opts.producerOptions.enableMeasuredPods { + if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" { + logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true") + } + cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-producer")) + var err error + bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-producer")) + if err != nil { + logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods") + } + logrus.Info("Measured pods feature enabled with BigQuery integration") + } + + produce(clients, cache, opts.producerOptions.ignoreLatest, opts.producerOptions.once, bqClient) } @@ -268,7 +290,22 @@ func mainAdmission(opts *options, cache Cache) { logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.") } - go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter) + // Use producerOptions for measured pods config (shared between producer and consumer modes) + var bqClient *BigQueryClient + if opts.producerOptions.enableMeasuredPods { + if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" { + logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true") + } + cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-admission")) + var err error + bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, 
opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-admission")) + if err != nil { + logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods") + } + logrus.Info("Measured pods feature enabled with BigQuery integration") + } + + go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, bqClient, reporter) } func loaders(cache Cache) map[string][]*cacheReloader { diff --git a/cmd/pod-scaler/measured.go b/cmd/pod-scaler/measured.go new file mode 100644 index 00000000000..a688063747e --- /dev/null +++ b/cmd/pod-scaler/measured.go @@ -0,0 +1,443 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "cloud.google.com/go/bigquery" + "github.com/sirupsen/logrus" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/prow/pkg/interrupts" + + "github.com/openshift/ci-tools/pkg/api" + podscaler "github.com/openshift/ci-tools/pkg/pod-scaler" +) + +const ( + // PodScalerLabelKey is the label we use to mark pods as either "normal" or "measured". + // This helps the scheduler know which pods can share nodes and which need isolation. + PodScalerLabelKey = "pod-scaler" + // PodScalerLabelValueNormal means this pod can run alongside other CI workloads. + // These pods use the regular resource recommendations from Prometheus data. + PodScalerLabelValueNormal = "normal" + // PodScalerLabelValueMeasured means this pod needs to run on an isolated node (no other CI pods). + // We do this so we can accurately measure what resources it actually uses without interference. + PodScalerLabelValueMeasured = "measured" + // MeasuredPodDataRetentionDays is how long we trust measured pod data before we need to re-measure. + // After 10 days, we mark the pod as "measured" again to get fresh data. + MeasuredPodDataRetentionDays = 10 + // BigQueryRefreshInterval is how often we pull fresh data from BigQuery. + // We refresh once a day to keep our cache up to date with the latest measured pod metrics. + BigQueryRefreshInterval = 24 * time.Hour + // MeasuredPodResourceBuffer is the safety buffer we add to measured resource utilization. + // We apply 20% buffer (1.2x) to measured CPU and memory usage to account for variability. + MeasuredPodResourceBuffer = 1.2 +) + +// MeasuredPodData holds what we learned about a pod when it ran in isolation. +// This tells us the real resource needs without interference from other workloads. +type MeasuredPodData struct { + // Metadata tells us which pod this is (org, repo, branch, container name, etc.) + Metadata podscaler.FullMetadata `json:"metadata"` + // MaxCPUUtilization is the highest CPU usage we saw when this pod ran alone (in cores). + // This is the real number - not limited by node contention. + MaxCPUUtilization float64 + // MaxMemoryUtilization is the highest memory usage we saw when this pod ran alone (in bytes). + // Again, this is the real number without interference. + MaxMemoryUtilization int64 + // LastMeasuredTime tells us when we last ran this pod in isolation. + // If it's been more than 10 days, we should measure it again. + LastMeasuredTime time.Time + // ContainerDurations tells us how long each container ran. 
+ // We use this to find the longest-running container, which gets the resource increases. + ContainerDurations map[string]time.Duration +} + +// MeasuredPodCache keeps measured pod data in memory so we can quickly check if a pod needs measuring. +// We refresh this from BigQuery once a day, so it's always reasonably fresh. +type MeasuredPodCache struct { + mu sync.RWMutex + data map[podscaler.FullMetadata]*MeasuredPodData + logger *logrus.Entry +} + +// NewMeasuredPodCache creates a new cache for measured pod data +func NewMeasuredPodCache(logger *logrus.Entry) *MeasuredPodCache { + return &MeasuredPodCache{ + data: make(map[podscaler.FullMetadata]*MeasuredPodData), + logger: logger, + } +} + +// Get retrieves measured pod data for the given metadata +func (c *MeasuredPodCache) Get(meta podscaler.FullMetadata) (*MeasuredPodData, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + data, exists := c.data[meta] + return data, exists +} + +// Update updates the cache with new data +func (c *MeasuredPodCache) Update(data map[podscaler.FullMetadata]*MeasuredPodData) { + c.mu.Lock() + defer c.mu.Unlock() + c.data = data + c.logger.Infof("Updated measured pod cache with %d entries", len(data)) +} + +// BigQueryClient handles queries to BigQuery for measured pod data +type BigQueryClient struct { + client *bigquery.Client + projectID string + datasetID string + logger *logrus.Entry + cache *MeasuredPodCache + lastRefresh time.Time +} + +// NewBigQueryClient creates a new BigQuery client for measured pods +func NewBigQueryClient(projectID, datasetID, credentialsFile string, cache *MeasuredPodCache, logger *logrus.Entry) (*BigQueryClient, error) { + ctx := interrupts.Context() + var opts []option.ClientOption + if credentialsFile != "" { + opts = append(opts, option.WithCredentialsFile(credentialsFile)) + } + + client, err := bigquery.NewClient(ctx, projectID, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create BigQuery client: %w", err) + } + + bq := &BigQueryClient{ + client: client, + projectID: projectID, + datasetID: datasetID, + logger: logger, + cache: cache, + } + + // Initial refresh on startup + if err := bq.Refresh(ctx); err != nil { + logger.WithError(err).Warn("Failed to refresh BigQuery data on startup") + } + + // Schedule daily refresh + interrupts.TickLiteral(func() { + if err := bq.Refresh(interrupts.Context()); err != nil { + logger.WithError(err).Error("Failed to refresh BigQuery data") + } + }, BigQueryRefreshInterval) + + return bq, nil +} + +// Refresh pulls the latest measured pod data from BigQuery and updates our cache. +// We call this on startup and then once per day to keep the data fresh. 
+func (bq *BigQueryClient) Refresh(ctx context.Context) error { + bq.logger.Info("Refreshing measured pod data from BigQuery") + + // Query ci_operator_metrics table for measured pod data + // This queries the table that stores max CPU/memory utilization for pods that ran with the "measured" label + query := bq.client.Query(fmt.Sprintf(` + SELECT + org, + repo, + branch, + target, + container, + pod_name, + MAX(max_cpu) as max_cpu, + MAX(max_memory) as max_memory, + MAX(created) as last_measured, + ANY_VALUE(container_durations) as container_durations + FROM + `+"`%s.%s.ci_operator_metrics`"+` + WHERE + pod_scaler_label = 'measured' + AND created >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY) + GROUP BY + org, repo, branch, target, container, pod_name + `, bq.projectID, bq.datasetID, MeasuredPodDataRetentionDays)) + + query.QueryConfig.Labels = map[string]string{ + "client-application": "pod-scaler", + "query-details": "measured-pod-cpu-utilization", + } + + it, err := query.Read(ctx) + if err != nil { + return fmt.Errorf("failed to execute BigQuery query: %w", err) + } + + data := make(map[podscaler.FullMetadata]*MeasuredPodData) + for { + var row struct { + Org string `bigquery:"org"` + Repo string `bigquery:"repo"` + Branch string `bigquery:"branch"` + Target string `bigquery:"target"` + Container string `bigquery:"container"` + PodName string `bigquery:"pod_name"` + MaxCPU float64 `bigquery:"max_cpu"` + MaxMemory int64 `bigquery:"max_memory"` + LastMeasured time.Time `bigquery:"last_measured"` + ContainerDurations string `bigquery:"container_durations"` // JSON string + } + if err := it.Next(&row); err != nil { + if err == iterator.Done { + break + } + bq.logger.WithError(err).Warn("Failed to read BigQuery row") + continue + } + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: row.Org, + Repo: row.Repo, + Branch: row.Branch, + }, + Target: row.Target, + Pod: row.PodName, + Container: row.Container, + } + + // Parse container_durations JSON string into map[string]time.Duration + // BigQuery stores this as a JSON string. time.Duration serializes as int64 (nanoseconds) + // so the format is: {"container1": 3600000000000, "container2": 245000000000} + containerDurations := make(map[string]time.Duration) + if row.ContainerDurations != "" { + var durationsMap map[string]int64 + if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMap); err == nil { + for container, nanoseconds := range durationsMap { + containerDurations[container] = time.Duration(nanoseconds) + } + } else { + // Fallback: try parsing as string format (for backwards compatibility) + var durationsMapStr map[string]string + if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMapStr); err == nil { + for container, durationStr := range durationsMapStr { + if duration, err := time.ParseDuration(durationStr); err == nil { + containerDurations[container] = duration + } + } + } + } + } + + data[meta] = &MeasuredPodData{ + Metadata: meta, + MaxCPUUtilization: row.MaxCPU, + MaxMemoryUtilization: row.MaxMemory, + LastMeasuredTime: row.LastMeasured, + ContainerDurations: containerDurations, + } + } + + bq.cache.Update(data) + bq.lastRefresh = time.Now() + bq.logger.Infof("Refreshed measured pod data: %d entries, last refresh: %v", len(data), bq.lastRefresh) + return nil +} + +// ShouldBeMeasured checks if we need to run this pod in isolation to measure it. 
+// We measure it if we've never measured it before, or if it's been more than 10 days +// since the last measurement (data gets stale). +func (bq *BigQueryClient) ShouldBeMeasured(meta podscaler.FullMetadata) bool { + data, exists := bq.cache.Get(meta) + if !exists { + // Never measured this pod before, so we should measure it now. + return true + } + + // If it's been more than 10 days, the data is stale and we should re-measure. + cutoff := time.Now().Add(-MeasuredPodDataRetentionDays * 24 * time.Hour) + return data.LastMeasuredTime.Before(cutoff) +} + +// GetMeasuredData returns the measured pod data for the given metadata +func (bq *BigQueryClient) GetMeasuredData(meta podscaler.FullMetadata) (*MeasuredPodData, bool) { + return bq.cache.Get(meta) +} + +// ClassifyPod decides whether this pod should run in isolation ("measured") or with others ("normal"). +// We check each container - if any container needs measuring, the whole pod gets the "measured" label. +// This label tells the scheduler to keep it away from other CI workloads. +func ClassifyPod(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) { + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + + // If BigQuery client is not available, default to normal to avoid overwhelming isolated nodes. + if bqClient == nil { + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueNormal + logger.Debugf("Classified pod as normal - BigQuery client not available") + return + } + + // Check each container to see if we need fresh measurement data for it. + // If any container needs measuring, we mark the whole pod as "measured". + for _, container := range pod.Spec.Containers { + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + if bqClient.ShouldBeMeasured(fullMeta) { + // This container needs fresh data, so mark the pod as measured. + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueMeasured + logger.Debugf("Classified pod as measured - will run on isolated node") + return + } + } + + // No container needs measuring, so mark as normal. + pod.Labels[PodScalerLabelKey] = PodScalerLabelValueNormal + logger.Debugf("Classified pod as normal - can share node with other workloads") +} + +// AddPodAntiAffinity sets up scheduling rules so measured pods get isolated nodes. +// Measured pods avoid ALL other pod-scaler labeled pods (they need the whole node). +// Normal pods avoid measured pods (so measured pods can have their isolation). +func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) { + podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey] + if !hasLabel { + logger.Debug("Pod does not have pod-scaler label, skipping anti-affinity") + return + } + + // Set up the affinity rules if they don't exist yet. + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &corev1.Affinity{} + } + if pod.Spec.Affinity.PodAntiAffinity == nil { + pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{} + } + + var requiredTerms []corev1.PodAffinityTerm + + switch podScalerLabel { + case PodScalerLabelValueMeasured: + // Measured pods need complete isolation - they can't share a node with ANY other pod-scaler pod. + // This ensures they get the full node resources for accurate measurement. 
+ requiredTerms = append(requiredTerms, corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: PodScalerLabelKey, + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }) + logger.Debug("Added podAntiAffinity for measured pod - will avoid all pod-scaler labeled pods") + case PodScalerLabelValueNormal: + // Normal pods stay away from measured pods so measured pods can have their isolation. + requiredTerms = append(requiredTerms, corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: PodScalerLabelKey, + Operator: metav1.LabelSelectorOpIn, + Values: []string{PodScalerLabelValueMeasured}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }) + logger.Debug("Added podAntiAffinity for normal pod - will avoid measured pods") + } + + // Merge with existing anti-affinity terms instead of overwriting + existingTerms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution + if existingTerms != nil { + requiredTerms = append(existingTerms, requiredTerms...) + } + pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms +} + +// ApplyMeasuredPodResources uses the real resource data we collected when this pod ran in isolation. +// We only increase resources for the longest-running container (the main workload), not all containers. +// This is based on actual measured usage, not Prometheus data that might be skewed by node contention. +// This function applies resources to pods labeled "normal" (which have measured data), not "measured" (which need measurement). +func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) { + if bqClient == nil { + return + } + + podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey] + // Apply measured resources to pods labeled "normal" (which have fresh measured data) + // Pods labeled "measured" don't have data yet - they're being measured for the first time. + if !hasLabel || podScalerLabel != PodScalerLabelValueNormal { + return + } + + // Find the container that runs the longest - that's the one that needs the resource increases. + // The other containers are usually sidecars or helpers that don't need as much. + var longestContainer *corev1.Container + var longestDuration time.Duration + var measuredData *MeasuredPodData + + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + + containerMeasuredData, exists := bqClient.GetMeasuredData(fullMeta) + if !exists { + continue + } + + // Track which container ran the longest - that's our main workload. + if duration, ok := containerMeasuredData.ContainerDurations[container.Name]; ok { + if duration > longestDuration { + longestDuration = duration + longestContainer = container + measuredData = containerMeasuredData + } + } + } + + // If we don't have duration data, just use the first container with measured data as a fallback. 
+ if longestContainer == nil && len(pod.Spec.Containers) > 0 { + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + fullMeta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, container.Name) + if containerMeasuredData, exists := bqClient.GetMeasuredData(fullMeta); exists { + longestContainer = container + measuredData = containerMeasuredData + break + } + } + } + + if longestContainer == nil || measuredData == nil { + logger.Debug("No containers with measured data found for measured pod resource application") + return + } + + // Set up the resource requests if they don't exist yet. + if longestContainer.Resources.Requests == nil { + longestContainer.Resources.Requests = corev1.ResourceList{} + } + + // Apply CPU request based on what we actually saw when it ran in isolation, with safety buffer. + cpuRequest := measuredData.MaxCPUUtilization * MeasuredPodResourceBuffer + if cpuRequest > 0 { + cpuQuantity := resource.NewMilliQuantity(int64(cpuRequest*1000), resource.DecimalSI) + longestContainer.Resources.Requests[corev1.ResourceCPU] = *cpuQuantity + logger.Debugf("Applied CPU request %v to container %s based on measured data", cpuQuantity, longestContainer.Name) + } + + // Apply memory request based on what we actually saw, with safety buffer. + if measuredData.MaxMemoryUtilization > 0 { + memoryRequest := int64(float64(measuredData.MaxMemoryUtilization) * MeasuredPodResourceBuffer) + memoryQuantity := resource.NewQuantity(memoryRequest, resource.BinarySI) + longestContainer.Resources.Requests[corev1.ResourceMemory] = *memoryQuantity + logger.Debugf("Applied memory request %v to container %s based on measured data", memoryQuantity, longestContainer.Name) + } +} diff --git a/cmd/pod-scaler/measured_test.go b/cmd/pod-scaler/measured_test.go new file mode 100644 index 00000000000..107514e1232 --- /dev/null +++ b/cmd/pod-scaler/measured_test.go @@ -0,0 +1,253 @@ +package main + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/openshift/ci-tools/pkg/api" + podscaler "github.com/openshift/ci-tools/pkg/pod-scaler" +) + +func TestShouldBeMeasured(t *testing.T) { + logger := logrus.WithField("test", "TestShouldBeMeasured") + cache := NewMeasuredPodCache(logger) + bqClient := &BigQueryClient{ + cache: cache, + logger: logger, + } + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: "test-org", + Repo: "test-repo", + Branch: "main", + }, + Container: "test-container", + } + + // Test case 1: No data exists - should be measured + if !bqClient.ShouldBeMeasured(meta) { + t.Error("Expected pod to be measured when no data exists") + } + + // Test case 2: Recent data exists - should not be measured + recentData := map[podscaler.FullMetadata]*MeasuredPodData{ + meta: { + Metadata: meta, + LastMeasuredTime: time.Now().Add(-5 * 24 * time.Hour), // 5 days ago + }, + } + cache.Update(recentData) + if bqClient.ShouldBeMeasured(meta) { + t.Error("Expected pod not to be measured when recent data exists") + } + + // Test case 3: Stale data exists - should be measured + staleData := map[podscaler.FullMetadata]*MeasuredPodData{ + meta: { + Metadata: meta, + LastMeasuredTime: time.Now().Add(-15 * 24 * time.Hour), // 15 days ago + }, + } + cache.Update(staleData) + if !bqClient.ShouldBeMeasured(meta) { + t.Error("Expected pod to be measured when data is stale (>10 days)") + } +} + +func TestClassifyPod(t *testing.T) { + logger := 
logrus.WithField("test", "TestClassifyPod") + cache := NewMeasuredPodCache(logger) + bqClient := &BigQueryClient{ + cache: cache, + logger: logger, + } + + // Test case 1: Pod with no data - should be classified as measured + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Labels: map[string]string{ + "ci.openshift.io/metadata.org": "test-org", + "ci.openshift.io/metadata.repo": "test-repo", + "ci.openshift.io/metadata.branch": "main", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test-container"}, + }, + }, + } + + ClassifyPod(pod1, bqClient, logger) + if pod1.Labels[PodScalerLabelKey] != PodScalerLabelValueMeasured { + t.Errorf("Expected pod to be classified as measured, got %s", pod1.Labels[PodScalerLabelKey]) + } + + // Test case 2: Pod with recent data - should be classified as normal + // Note: We need to use the same metadata structure that MetadataFor creates + meta2 := podscaler.MetadataFor( + map[string]string{ + "ci.openshift.io/metadata.org": "test-org", + "ci.openshift.io/metadata.repo": "test-repo", + "ci.openshift.io/metadata.branch": "main", + }, + "test-pod-2", + "test-container", + ) + recentData := map[podscaler.FullMetadata]*MeasuredPodData{ + meta2: { + Metadata: meta2, + LastMeasuredTime: time.Now().Add(-5 * 24 * time.Hour), // 5 days ago + }, + } + cache.Update(recentData) + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Labels: map[string]string{ + "ci.openshift.io/metadata.org": "test-org", + "ci.openshift.io/metadata.repo": "test-repo", + "ci.openshift.io/metadata.branch": "main", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test-container"}, + }, + }, + } + + ClassifyPod(pod2, bqClient, logger) + if pod2.Labels[PodScalerLabelKey] != PodScalerLabelValueNormal { + t.Errorf("Expected pod to be classified as normal, got %s", pod2.Labels[PodScalerLabelKey]) + } + + // Test case 3: Pod with nil BigQuery client - should default to normal to avoid overwhelming isolated nodes + pod3 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-3", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test-container"}, + }, + }, + } + + ClassifyPod(pod3, nil, logger) + if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueNormal { + t.Errorf("Expected pod to be classified as normal when BigQuery client is nil (to avoid overwhelming isolated nodes), got %s", pod3.Labels[PodScalerLabelKey]) + } +} + +func TestAddPodAntiAffinity(t *testing.T) { + logger := logrus.WithField("test", "TestAddPodAntiAffinity") + + // Test case 1: Measured pod should avoid all pod-scaler labeled pods + measuredPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "measured-pod", + Labels: map[string]string{ + PodScalerLabelKey: PodScalerLabelValueMeasured, + }, + }, + Spec: corev1.PodSpec{}, + } + + AddPodAntiAffinity(measuredPod, logger) + if measuredPod.Spec.Affinity == nil { + t.Fatal("Expected affinity to be set") + } + if measuredPod.Spec.Affinity.PodAntiAffinity == nil { + t.Fatal("Expected podAntiAffinity to be set") + } + if len(measuredPod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) == 0 { + t.Fatal("Expected required anti-affinity terms to be set") + } + + // Test case 2: Normal pod should avoid measured pods + normalPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "normal-pod", + Labels: map[string]string{ + PodScalerLabelKey: PodScalerLabelValueNormal, + }, + }, + Spec: 
corev1.PodSpec{}, + } + + AddPodAntiAffinity(normalPod, logger) + if normalPod.Spec.Affinity == nil { + t.Fatal("Expected affinity to be set") + } + if normalPod.Spec.Affinity.PodAntiAffinity == nil { + t.Fatal("Expected podAntiAffinity to be set") + } + if len(normalPod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) == 0 { + t.Fatal("Expected required anti-affinity terms to be set") + } + + // Test case 3: Pod without label should not get anti-affinity + unlabeledPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unlabeled-pod", + }, + Spec: corev1.PodSpec{}, + } + + AddPodAntiAffinity(unlabeledPod, logger) + if unlabeledPod.Spec.Affinity != nil { + t.Error("Expected affinity not to be set for unlabeled pod") + } +} + +func TestMeasuredPodCache(t *testing.T) { + logger := logrus.WithField("test", "TestMeasuredPodCache") + cache := NewMeasuredPodCache(logger) + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: "test-org", + Repo: "test-repo", + Branch: "main", + }, + Container: "test-container", + } + + // Test Get with no data + _, exists := cache.Get(meta) + if exists { + t.Error("Expected no data to exist initially") + } + + // Test Update and Get + data := map[podscaler.FullMetadata]*MeasuredPodData{ + meta: { + Metadata: meta, + MaxCPUUtilization: 2.5, + MaxMemoryUtilization: 4 * 1024 * 1024 * 1024, // 4Gi + LastMeasuredTime: time.Now(), + ContainerDurations: make(map[string]time.Duration), + }, + } + cache.Update(data) + + retrieved, exists := cache.Get(meta) + if !exists { + t.Error("Expected data to exist after update") + } + if retrieved.MaxCPUUtilization != 2.5 { + t.Errorf("Expected MaxCPUUtilization to be 2.5, got %f", retrieved.MaxCPUUtilization) + } + if retrieved.MaxMemoryUtilization != 4*1024*1024*1024 { + t.Errorf("Expected MaxMemoryUtilization to be 4Gi, got %d", retrieved.MaxMemoryUtilization) + } +} diff --git a/cmd/pod-scaler/producer.go b/cmd/pod-scaler/producer.go index 786ce43972e..8fb1b8ce13c 100644 --- a/cmd/pod-scaler/producer.go +++ b/cmd/pod-scaler/producer.go @@ -2,12 +2,14 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "strings" "sync" "time" + "cloud.google.com/go/bigquery" "github.com/openhistogram/circonusllhist" prometheusapi "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" @@ -36,6 +38,20 @@ const ( StepsCachePrefix = "steps" ) +// escapePromQLLabelValue escapes special characters in PromQL label values. +// PromQL label values in selectors must be quoted and have backslashes and quotes escaped. 
+func escapePromQLLabelValue(value string) string { + // Replace backslashes first (before replacing quotes) + value = strings.ReplaceAll(value, `\`, `\\`) + // Escape double quotes + value = strings.ReplaceAll(value, `"`, `\"`) + // Escape newlines + value = strings.ReplaceAll(value, "\n", `\n`) + // Escape carriage returns + value = strings.ReplaceAll(value, "\r", `\r`) + return value +} + // queriesByMetric returns a mapping of Prometheus query by metric name for all queries we want to execute func queriesByMetric() map[string]string { queries := map[string]string{} @@ -70,7 +86,7 @@ func queriesByMetric() map[string]string { return queries } -func produce(clients map[string]prometheusapi.API, dataCache Cache, ignoreLatest time.Duration, once bool) { +func produce(clients map[string]prometheusapi.API, dataCache Cache, ignoreLatest time.Duration, once bool, bqClient *BigQueryClient) { var execute func(func()) if once { execute = func(f func()) { @@ -81,6 +97,35 @@ func produce(clients map[string]prometheusapi.API, dataCache Cache, ignoreLatest interrupts.TickLiteral(f, 2*time.Hour) } } + + // Run measured pod data collection on startup and then periodically if BigQuery client is available + if bqClient != nil { + // Use sync.Once to ensure the client is only closed once + var closeOnce sync.Once + closeClient := func() { + closeOnce.Do(func() { + if err := bqClient.client.Close(); err != nil { + logrus.WithError(err).Error("Failed to close BigQuery client") + } + }) + } + // Register cleanup handler to close BigQuery client on shutdown + // This ensures the client is closed when the program exits, not when produce() returns + // In non-once mode, this handles graceful shutdown. In once mode, this provides a safety net. + interrupts.OnInterrupt(closeClient) + + // Run measured pod data collection on startup and then periodically + execute(func() { + if err := collectMeasuredPodMetrics(interrupts.Context(), clients, bqClient, logrus.WithField("component", "measured-pods-collector")); err != nil { + logrus.WithError(err).Error("Failed to collect measured pod metrics") + } + // In once mode, close the client after work is done since the program will exit + if once { + closeClient() + } + }) + } + execute(func() { for name, query := range queriesByMetric() { name := name @@ -372,3 +417,464 @@ func (q *querier) executeOverRange(ctx context.Context, c *clusterMetadata, r pr q.lock.Unlock() logger.Debugf("Saved Prometheus response after %s.", time.Since(saveStart).Round(time.Second)) } + +// collectMeasuredPodMetrics queries Prometheus for completed measured pods and writes to BigQuery +func collectMeasuredPodMetrics(ctx context.Context, clients map[string]prometheusapi.API, bqClient *BigQueryClient, logger *logrus.Entry) error { + logger.Info("Starting measured pod data collection") + + // Query window: last 4 hours (to catch recently completed pods) + until := time.Now() + from := until.Add(-4 * time.Hour) + + // Query Prometheus for pods with pod-scaler=measured label + // kube_pod_labels metric exposes labels with a label_ prefix + measuredPodSelector := `{label_pod_scaler="measured",label_created_by_ci="true"}` + + // Query for pod labels to identify measured pods + podLabelsQuery := fmt.Sprintf(`kube_pod_labels%s`, measuredPodSelector) + + var allMeasuredPods []measuredPodData + for clusterName, client := range clients { + clusterLogger := logger.WithField("cluster", clusterName) + + // Query for pod labels to find measured pods + labelsResult, _, err := client.QueryRange(ctx, 
podLabelsQuery, prometheusapi.Range{ + Start: from, + End: until, + Step: 30 * time.Second, + }) + if err != nil { + clusterLogger.WithError(err).Warn("Failed to query pod labels") + continue + } + + // Extract pod names and metadata from labels + matrix, ok := labelsResult.(model.Matrix) + if !ok { + clusterLogger.Warn("Unexpected result type for pod labels query") + continue + } + + // For each measured pod, get CPU and memory usage + // Deduplicate pods by name to avoid processing the same pod multiple times + seenPods := make(map[string]bool) + for _, sampleStream := range matrix { + podName := string(sampleStream.Metric["pod"]) + namespace := string(sampleStream.Metric["namespace"]) + podKey := fmt.Sprintf("%s/%s", namespace, podName) + + // Skip if we've already processed this pod + if seenPods[podKey] { + continue + } + seenPods[podKey] = true + + podDataList, err := extractMeasuredPodData(ctx, client, sampleStream, from, until, clusterLogger) + if err != nil { + clusterLogger.WithError(err).Warn("Failed to extract measured pod data") + continue + } + if len(podDataList) > 0 { + allMeasuredPods = append(allMeasuredPods, podDataList...) + } + } + } + + // Write all collected data to BigQuery + if len(allMeasuredPods) > 0 { + if err := writeMeasuredPodsToBigQuery(ctx, bqClient.client, bqClient.projectID, bqClient.datasetID, allMeasuredPods, logger); err != nil { + return fmt.Errorf("failed to write measured pods to BigQuery: %w", err) + } + logger.Infof("Collected and wrote %d measured pod records to BigQuery", len(allMeasuredPods)) + } else { + logger.Info("No measured pods found in the query window") + } + + return nil +} + +type measuredPodData struct { + Org string + Repo string + Branch string + Target string + Container string + MinCPU float64 + MaxCPU float64 + MinMemory int64 + MaxMemory int64 + ContainerDurations map[string]time.Duration + NodeName string + PodName string + Timestamp time.Time + // Node-level metrics for validation (since pod is isolated, node metrics should match) + NodeMinCPU float64 + NodeMaxCPU float64 + NodeMinMemory int64 + NodeMaxMemory int64 +} + +func extractMeasuredPodData(ctx context.Context, client prometheusapi.API, sampleStream *model.SampleStream, from, until time.Time, logger *logrus.Entry) ([]measuredPodData, error) { + // Extract pod metadata from labels + podName := string(sampleStream.Metric["pod"]) + namespace := string(sampleStream.Metric["namespace"]) + org := string(sampleStream.Metric["label_ci_openshift_io_metadata_org"]) + repo := string(sampleStream.Metric["label_ci_openshift_io_metadata_repo"]) + branch := string(sampleStream.Metric["label_ci_openshift_io_metadata_branch"]) + target := string(sampleStream.Metric["label_ci_openshift_io_metadata_target"]) + + if org == "" || repo == "" { + // Skip pods without proper metadata + return nil, nil + } + + // Query Prometheus for pod metrics + cpuResult, memoryResult, containerInfoResult, err := queryPodMetrics(ctx, client, namespace, podName, from, until, logger) + if err != nil { + return nil, err + } + + // Process CPU and memory results to get min/max per container + minCPUByContainer, maxCPUByContainer := extractCPUUsage(cpuResult) + minMemoryByContainer, maxMemoryByContainer := extractMemoryUsage(memoryResult) + + // Get node name and query node-level metrics for validation + nodeName := string(sampleStream.Metric["node"]) + nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory := queryNodeMetrics(ctx, client, nodeName, from, until, logger) + + // Extract container durations + 
containerDurations := extractContainerDurations(containerInfoResult) + + // Build records for each container + records := buildMeasuredPodRecords(org, repo, branch, target, podName, nodeName, cpuResult, minCPUByContainer, maxCPUByContainer, minMemoryByContainer, maxMemoryByContainer, containerDurations, nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory, from, until) + + return records, nil +} + +// queryPodMetrics queries Prometheus for CPU, memory, and container info metrics +func queryPodMetrics(ctx context.Context, client prometheusapi.API, namespace, podName string, from, until time.Time, logger *logrus.Entry) (model.Value, model.Value, model.Value, error) { + // Escape label values to prevent PromQL injection + escapedNamespace := escapePromQLLabelValue(namespace) + escapedPodName := escapePromQLLabelValue(podName) + + queryRange := prometheusapi.Range{ + Start: from, + End: until, + Step: 30 * time.Second, + } + + // Query CPU usage for this pod - we'll compute min/max from the time series + // Using rate with a 3-minute window to get per-second CPU usage + cpuQuery := fmt.Sprintf(`rate(container_cpu_usage_seconds_total{namespace="%s",pod="%s",container!="POD",container!=""}[3m])`, escapedNamespace, escapedPodName) + cpuResult, _, err := client.QueryRange(ctx, cpuQuery, queryRange) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to query CPU: %w", err) + } + + // Query memory usage for this pod - we'll compute min/max from the time series + memoryQuery := fmt.Sprintf(`container_memory_working_set_bytes{namespace="%s",pod="%s",container!="POD",container!=""}`, escapedNamespace, escapedPodName) + memoryResult, _, err := client.QueryRange(ctx, memoryQuery, queryRange) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to query memory: %w", err) + } + + // Query container lifecycle info to get actual container durations + containerInfoQuery := fmt.Sprintf(`kube_pod_container_info{namespace="%s",pod="%s",container!="POD",container!=""}`, escapedNamespace, escapedPodName) + containerInfoResult, _, err := client.QueryRange(ctx, containerInfoQuery, queryRange) + if err != nil { + logger.WithError(err).Debug("Failed to query container info, will estimate duration") + } + + return cpuResult, memoryResult, containerInfoResult, nil +} + +// extractCPUUsage processes CPU query results and returns min/max CPU usage per container +func extractCPUUsage(cpuResult model.Value) (map[string]float64, map[string]float64) { + minCPUByContainer := make(map[string]float64) + maxCPUByContainer := make(map[string]float64) + + if cpuMatrix, ok := cpuResult.(model.Matrix); ok { + for _, stream := range cpuMatrix { + container := string(stream.Metric["container"]) + var minCPU, maxCPU float64 + first := true + for _, sample := range stream.Values { + cpuVal := float64(sample.Value) + if first { + minCPU = cpuVal + maxCPU = cpuVal + first = false + } else { + if cpuVal < minCPU { + minCPU = cpuVal + } + if cpuVal > maxCPU { + maxCPU = cpuVal + } + } + } + if maxCPU > 0 { + minCPUByContainer[container] = minCPU + maxCPUByContainer[container] = maxCPU + } + } + } + + return minCPUByContainer, maxCPUByContainer +} + +// extractMemoryUsage processes memory query results and returns min/max memory usage per container +func extractMemoryUsage(memoryResult model.Value) (map[string]int64, map[string]int64) { + minMemoryByContainer := make(map[string]int64) + maxMemoryByContainer := make(map[string]int64) + + if memoryMatrix, ok := memoryResult.(model.Matrix); ok { + for _, stream := range 
memoryMatrix { + container := string(stream.Metric["container"]) + var minMemory, maxMemory int64 + first := true + for _, sample := range stream.Values { + memVal := int64(sample.Value) + if first { + minMemory = memVal + maxMemory = memVal + first = false + } else { + if memVal < minMemory { + minMemory = memVal + } + if memVal > maxMemory { + maxMemory = memVal + } + } + } + if maxMemory > 0 { + minMemoryByContainer[container] = minMemory + maxMemoryByContainer[container] = maxMemory + } + } + } + + return minMemoryByContainer, maxMemoryByContainer +} + +// queryNodeMetrics queries node-level metrics for validation (since pod is isolated, node metrics should match pod metrics) +func queryNodeMetrics(ctx context.Context, client prometheusapi.API, nodeName string, from, until time.Time, logger *logrus.Entry) (float64, float64, int64, int64) { + var nodeMinCPU, nodeMaxCPU float64 + var nodeMinMemory, nodeMaxMemory int64 + + if nodeName == "" { + return nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory + } + + // Escape node name for PromQL + escapedNodeName := escapePromQLLabelValue(nodeName) + queryRange := prometheusapi.Range{ + Start: from, + End: until, + Step: 30 * time.Second, + } + + // Query node CPU utilization - sum all containers on the node + nodeCPUQuery := fmt.Sprintf(`sum(rate(container_cpu_usage_seconds_total{node="%s",container!="POD",container!=""}[3m]))`, escapedNodeName) + nodeCPUResult, _, err := client.QueryRange(ctx, nodeCPUQuery, queryRange) + if err == nil { + if nodeCPUMatrix, ok := nodeCPUResult.(model.Matrix); ok && len(nodeCPUMatrix) > 0 { + first := true + for _, stream := range nodeCPUMatrix { + for _, sample := range stream.Values { + cpuVal := float64(sample.Value) + if first { + nodeMinCPU = cpuVal + nodeMaxCPU = cpuVal + first = false + } else { + if cpuVal < nodeMinCPU { + nodeMinCPU = cpuVal + } + if cpuVal > nodeMaxCPU { + nodeMaxCPU = cpuVal + } + } + } + } + } + } else { + logger.WithError(err).Debug("Failed to query node CPU metrics for validation") + } + + // Query node memory utilization - sum all containers on the node + nodeMemoryQuery := fmt.Sprintf(`sum(container_memory_working_set_bytes{node="%s",container!="POD",container!=""})`, escapedNodeName) + nodeMemoryResult, _, err := client.QueryRange(ctx, nodeMemoryQuery, queryRange) + if err == nil { + if nodeMemoryMatrix, ok := nodeMemoryResult.(model.Matrix); ok && len(nodeMemoryMatrix) > 0 { + first := true + for _, stream := range nodeMemoryMatrix { + for _, sample := range stream.Values { + memVal := int64(sample.Value) + if first { + nodeMinMemory = memVal + nodeMaxMemory = memVal + first = false + } else { + if memVal < nodeMinMemory { + nodeMinMemory = memVal + } + if memVal > nodeMaxMemory { + nodeMaxMemory = memVal + } + } + } + } + } + } else { + logger.WithError(err).Debug("Failed to query node memory metrics for validation") + } + + return nodeMinCPU, nodeMaxCPU, nodeMinMemory, nodeMaxMemory +} + +// extractContainerDurations extracts container durations from container info query results +func extractContainerDurations(containerInfoResult model.Value) map[string]time.Duration { + containerDurations := make(map[string]time.Duration) + if containerInfoMatrix, ok := containerInfoResult.(model.Matrix); ok { + for _, stream := range containerInfoMatrix { + container := string(stream.Metric["container"]) + if len(stream.Values) > 0 { + // Container duration is from first to last sample + firstTime := stream.Values[0].Timestamp.Time() + lastTime := 
stream.Values[len(stream.Values)-1].Timestamp.Time() + containerDurations[container] = lastTime.Sub(firstTime) + } + } + } + return containerDurations +} + +// buildMeasuredPodRecords builds measuredPodData records for each container +func buildMeasuredPodRecords(org, repo, branch, target, podName, nodeName string, cpuResult model.Value, minCPUByContainer, maxCPUByContainer map[string]float64, minMemoryByContainer, maxMemoryByContainer map[string]int64, containerDurations map[string]time.Duration, nodeMinCPU, nodeMaxCPU float64, nodeMinMemory, nodeMaxMemory int64, from, until time.Time) []measuredPodData { + var records []measuredPodData + + for container, maxCPU := range maxCPUByContainer { + minCPU := minCPUByContainer[container] + maxMemory := maxMemoryByContainer[container] + minMemory := minMemoryByContainer[container] + + // Use actual container duration if available, otherwise estimate + containerDuration := containerDurations[container] + if containerDuration == 0 { + // Estimate based on query window if we don't have container info + containerDuration = until.Sub(from) + } + + // Determine pod execution timestamp (use first sample time if available) + podTimestamp := until + if cpuMatrix, ok := cpuResult.(model.Matrix); ok { + for _, stream := range cpuMatrix { + if string(stream.Metric["container"]) == container && len(stream.Values) > 0 { + podTimestamp = stream.Values[0].Timestamp.Time() + break + } + } + } + + // Use all container durations for this pod, not just the current container + // This gives us complete duration information for all containers in the pod + podContainerDurations := make(map[string]time.Duration) + for c, d := range containerDurations { + podContainerDurations[c] = d + } + // If this container's duration isn't in the map, add it + if _, exists := podContainerDurations[container]; !exists { + podContainerDurations[container] = containerDuration + } + + records = append(records, measuredPodData{ + Org: org, + Repo: repo, + Branch: branch, + Target: target, + Container: container, + MinCPU: minCPU, + MaxCPU: maxCPU, + MinMemory: minMemory, + MaxMemory: maxMemory, + ContainerDurations: podContainerDurations, + NodeName: nodeName, + PodName: podName, + Timestamp: podTimestamp, + NodeMinCPU: nodeMinCPU, + NodeMaxCPU: nodeMaxCPU, + NodeMinMemory: nodeMinMemory, + NodeMaxMemory: nodeMaxMemory, + }) + } + + return records +} + +type bigQueryPodMetricsRow struct { + Org string `bigquery:"org"` + Repo string `bigquery:"repo"` + Branch string `bigquery:"branch"` + Target string `bigquery:"target"` + Container string `bigquery:"container"` + PodName string `bigquery:"pod_name"` + PodScalerLabel string `bigquery:"pod_scaler_label"` + MinCPU float64 `bigquery:"min_cpu"` + MaxCPU float64 `bigquery:"max_cpu"` + MinMemory int64 `bigquery:"min_memory"` + MaxMemory int64 `bigquery:"max_memory"` + ContainerDurations string `bigquery:"container_durations"` + NodeName string `bigquery:"node_name"` + NodeMinCPU float64 `bigquery:"node_min_cpu"` + NodeMaxCPU float64 `bigquery:"node_max_cpu"` + NodeMinMemory int64 `bigquery:"node_min_memory"` + NodeMaxMemory int64 `bigquery:"node_max_memory"` + Created time.Time `bigquery:"created"` + LastMeasured time.Time `bigquery:"last_measured"` +} + +func writeMeasuredPodsToBigQuery(ctx context.Context, bqClient *bigquery.Client, _ /* projectID */, datasetID string, pods []measuredPodData, logger *logrus.Entry) error { + // Use ci_operator_metrics table with additional fields for measured pod data + inserter := 
bqClient.Dataset(datasetID).Table("ci_operator_metrics").Inserter() + + // Convert to BigQuery rows + rows := make([]*bigQueryPodMetricsRow, 0, len(pods)) + for _, pod := range pods { + // Serialize container durations as JSON + durationsJSON, err := json.Marshal(pod.ContainerDurations) + if err != nil { + logger.WithError(err).Warn("Failed to marshal container durations") + continue + } + + rows = append(rows, &bigQueryPodMetricsRow{ + Org: pod.Org, + Repo: pod.Repo, + Branch: pod.Branch, + Target: pod.Target, + Container: pod.Container, + PodName: pod.PodName, + PodScalerLabel: "measured", + MinCPU: pod.MinCPU, + MaxCPU: pod.MaxCPU, + MinMemory: pod.MinMemory, + MaxMemory: pod.MaxMemory, + ContainerDurations: string(durationsJSON), + NodeName: pod.NodeName, + NodeMinCPU: pod.NodeMinCPU, + NodeMaxCPU: pod.NodeMaxCPU, + NodeMinMemory: pod.NodeMinMemory, + NodeMaxMemory: pod.NodeMaxMemory, + Created: pod.Timestamp, + LastMeasured: pod.Timestamp, + }) + } + + if err := inserter.Put(ctx, rows); err != nil { + return fmt.Errorf("failed to insert rows: %w", err) + } + + return nil +} diff --git a/pkg/manifestpusher/manifestpusher.go b/pkg/manifestpusher/manifestpusher.go index f0381b4b7e8..103282164b4 100644 --- a/pkg/manifestpusher/manifestpusher.go +++ b/pkg/manifestpusher/manifestpusher.go @@ -2,12 +2,16 @@ package manifestpusher import ( "fmt" + "strings" + "time" "github.com/estesp/manifest-tool/v2/pkg/registry" "github.com/estesp/manifest-tool/v2/pkg/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" + buildv1 "github.com/openshift/api/build/v1" ) @@ -37,29 +41,73 @@ func (m manifestPusher) PushImageWithManifest(builds []buildv1.Build, targetImag srcImages := []types.ManifestEntry{} for _, build := range builds { + arch := build.Spec.NodeSelector[nodeArchitectureLabel] + if arch == "" { + return fmt.Errorf("build %s has no architecture label in nodeSelector", build.Name) + } + imageRef := fmt.Sprintf("%s/%s/%s", m.registryURL, build.Spec.Output.To.Namespace, build.Spec.Output.To.Name) + m.logger.Infof("Adding architecture %s: %s", arch, imageRef) srcImages = append(srcImages, types.ManifestEntry{ - Image: fmt.Sprintf("%s/%s/%s", m.registryURL, build.Spec.Output.To.Namespace, build.Spec.Output.To.Name), + Image: imageRef, Platform: ocispec.Platform{ OS: "linux", - Architecture: build.Spec.NodeSelector[nodeArchitectureLabel], + Architecture: arch, }, }) } - digest, _, err := registry.PushManifestList( - "", // username: we don't we use basic auth - "", // password: " - types.YAMLInput{Image: fmt.Sprintf("%s/%s", m.registryURL, targetImageRef), Manifests: srcImages}, - false, // --ignore-missing. We don't want to ignore missing images. - true, // --insecure to allow pushing to the local registry. - false, // --plain-http is false by default in manifest-tool. False for the OpenShift registry. - types.Docker, // we only need docker type manifest. - m.dockercfgPath, - ) + if len(srcImages) == 0 { + return fmt.Errorf("no source images to create manifest list for %s", targetImageRef) + } + + targetImage := fmt.Sprintf("%s/%s", m.registryURL, targetImageRef) + m.logger.Infof("Creating manifest list for %s with %d architectures", targetImage, len(srcImages)) + + // Wait for all images to be available in the registry before creating the manifest list. + // There's a race condition where builds are marked complete but images aren't fully + // available in the registry yet. 
We retry with exponential backoff to handle this.
+	backoff := wait.Backoff{
+		Duration: 5 * time.Second,
+		Factor:   1.5,
+		Steps:    10, // 10 attempts, roughly 6 minutes of total backoff
+	}
+
+	var digest string
+	var length int
+	var err error
+
+	err = wait.ExponentialBackoff(backoff, func() (bool, error) {
+		digest, length, err = registry.PushManifestList(
+			"", // username: we don't use basic auth
+			"", // password: "
+			types.YAMLInput{Image: targetImage, Manifests: srcImages},
+			false,        // --ignore-missing. We don't want to ignore missing images.
+			true,         // --insecure to allow pushing to the local registry.
+			false,        // --plain-http is false by default in manifest-tool. False for the OpenShift registry.
+			types.Docker, // we only need the docker type manifest.
+			m.dockercfgPath,
+		)
+		if err != nil {
+			// Check if the error indicates missing images (common race condition)
+			errStr := err.Error()
+			if strings.Contains(errStr, "no image found in manifest list") ||
+				strings.Contains(errStr, "inspect of image") ||
+				strings.Contains(errStr, "failed to pull image") ||
+				strings.Contains(errStr, "choosing an image from manifest list") ||
+				strings.Contains(errStr, "PullBuilderImageFailed") {
+				m.logger.Warnf("Images not yet available in registry, retrying: %v", err)
+				return false, nil // Retry
+			}
+			// For other errors, fail immediately
+			return false, err
+		}
+		return true, nil
+	})
+
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to push manifest list for %s after retries: %w", targetImageRef, err)
 	}
-	m.logger.WithField("digest", digest).Infof("Image %s created", targetImageRef)
+	m.logger.WithField("digest", digest).WithField("length", length).Infof("Successfully created manifest list for %s", targetImageRef)
 	return nil
 }