diff --git a/.gitignore b/.gitignore index cbf82d48499..f03a456b433 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ index.js # default working dir /job-aggregator-working-dir -# go built binary -/job-run-aggregator \ No newline at end of file +# go built binaries +/job-run-aggregator +/pod-scaler diff --git a/cmd/pod-scaler/admission.go b/cmd/pod-scaler/admission.go index f17d62e1e3f..ac925280681 100644 --- a/cmd/pod-scaler/admission.go +++ b/cmd/pod-scaler/admission.go @@ -33,7 +33,7 @@ import ( "github.com/openshift/ci-tools/pkg/steps" ) -func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) { +func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter) { logger := logrus.WithField("component", "pod-scaler admission") logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders)) health := pjutil.NewHealthOnPort(healthPort) @@ -44,7 +44,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int Port: port, CertDir: certDir, }) - server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}}) + server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, reporter: reporter}}) logger.Info("Serving admission webhooks.") if err := server.Start(interrupts.Context()); err != nil { logrus.WithError(err).Fatal("Failed to serve webhooks.") @@ -60,6 +60,8 @@ type podMutator struct { cpuCap int64 memoryCap string cpuPriorityScheduling int64 + authoritativeCPU bool + authoritativeMemory bool reporter results.PodScalerReporter } @@ -97,7 +99,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio logger.WithError(err).Error("Failed to handle rehearsal Pod.") return admission.Allowed("Failed to handle rehearsal Pod, ignoring.") } - mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger) + mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPU, m.authoritativeMemory, m.reporter, logger) m.addPriorityClass(pod) marshaledPod, err := json.Marshal(pod) @@ -196,8 +198,14 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) { } } -// useOursIfLarger updates fields in theirs when ours are larger -func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) { +// applyRecommendationsBasedOnRecentData applies resource recommendations based on recent usage data +// (see resourceRecommendationWindow). If they used more, we increase resources. 
If they used less, +// we decrease them if authoritative mode is enabled for that resource type. +// +// The reduction path is covered in admission_test.go as part of TestUseOursIfLarger, +// which includes cases that exercise the ResourceQuantity comparison and verify the +// gradual reduction limits applied below. +func applyRecommendationsBasedOnRecentData(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) { for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} { if item.Requests == nil { item.Requests = corev1.ResourceList{} @@ -215,6 +223,10 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo } { for _, field := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} { our := (*pair.ours)[field] + // If we have no recommendation for this resource, skip it + if our.IsZero() { + continue + } //TODO(sgoeddel): this is a temporary experiment to see what effect setting values that are 120% of what has // been determined has on the rate of OOMKilled and similar termination of workloads increased := our.AsApproximateFloat64() * 1.2 @@ -231,13 +243,49 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo }) cmp := our.Cmp(their) if cmp == 1 { - fieldLogger.Debug("determined amount larger than configured") + fieldLogger.Debug("determined amount larger than configured, increasing resources") (*pair.theirs)[field] = our if their.Value() > 0 && our.Value() > (their.Value()*10) { reporter.ReportResourceConfigurationWarning(workloadName, workloadType, their.String(), our.String(), field.String()) } } else if cmp < 0 { - fieldLogger.Debug("determined amount smaller than configured") + // Check if authoritative mode is enabled for this resource type + isAuthoritative := false + if field == corev1.ResourceCPU { + isAuthoritative = authoritativeCPU + } else if field == corev1.ResourceMemory { + isAuthoritative = authoritativeMemory + } + + if !isAuthoritative { + fieldLogger.Debug("authoritative mode disabled for this resource, skipping reduction") + continue + } + + // Apply gradual reduction with safety limits: max 25% reduction per cycle, minimum 5% difference + ourValue := our.AsApproximateFloat64() + theirValue := their.AsApproximateFloat64() + if theirValue == 0 { + fieldLogger.Debug("theirs is zero, applying recommendation") + (*pair.theirs)[field] = our + continue + } + + reductionPercent := 1.0 - (ourValue / theirValue) + if reductionPercent < 0.05 { + fieldLogger.Debug("difference less than 5%, skipping micro-adjustment") + continue + } + + maxReductionPercent := 0.25 + if reductionPercent > maxReductionPercent { + maxAllowed := theirValue * (1.0 - maxReductionPercent) + our.Set(int64(maxAllowed)) + fieldLogger.Debugf("applying gradual reduction (limited to 25%% per cycle)") + } else { + fieldLogger.Debug("reducing resources based on recent usage") + } + (*pair.theirs)[field] = our } else { fieldLogger.Debug("determined amount equal to configured") } @@ -292,7 +340,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, } } -func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits 
bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) { mutateResources := func(containers []corev1.Container) { for i := range containers { meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name) @@ -301,7 +349,7 @@ func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceL logger.Debugf("recommendation exists for: %s", containers[i].Name) workloadType := determineWorkloadType(pod.Annotations, pod.Labels) workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels) - useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, reporter, logger) + applyRecommendationsBasedOnRecentData(&resources, &containers[i].Resources, workloadName, workloadType, authoritativeCPU, authoritativeMemory, reporter, logger) if mutateResourceLimits { reconcileLimits(&containers[i].Resources) } diff --git a/cmd/pod-scaler/admission_test.go b/cmd/pod-scaler/admission_test.go index beac4cdeff2..39a8b666c85 100644 --- a/cmd/pod-scaler/admission_test.go +++ b/cmd/pod-scaler/admission_test.go @@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { original := testCase.pod.DeepCopy() - mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name)) + mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", true, true, &defaultReporter, logrus.WithField("test", testCase.name)) diff := cmp.Diff(original, testCase.pod) // In some cases, cmp.Diff decides to use non-breaking spaces, and it's not // particularly deterministic about this. We don't care. 
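To make the clamping rule in the new reduction branch of applyRecommendationsBasedOnRecentData easier to follow in isolation, here is a minimal, hypothetical Go sketch. The helper name clampedReduction and the bare float64 inputs are illustrative assumptions; the real code operates on resource.Quantity values and additionally checks the per-resource authoritative flags. The example values in main mirror the expectations added to TestUseOursIfLarger below.

package main

import "fmt"

// clampedReduction restates the reduction rule: given the recommended value ("ours", already scaled
// by the 1.2 safety multiplier) and the currently configured value ("theirs"), it returns the value
// to apply and whether to apply it. Differences under 5% are ignored, and any single reduction is
// capped at 25% of the configured value per admission cycle.
func clampedReduction(ours, theirs float64) (float64, bool) {
	if theirs == 0 {
		// Nothing is configured yet, so use the recommendation directly.
		return ours, true
	}
	reduction := 1.0 - ours/theirs
	if reduction < 0.05 {
		// Less than a 5% decrease: skip the micro-adjustment.
		return theirs, false
	}
	if reduction > 0.25 {
		// Cap the decrease at 25% of the configured value.
		return theirs * 0.75, true
	}
	return ours, true
}

func main() {
	fmt.Println(clampedReduction(12, 200))   // 150 true: a 94% reduction is clamped to 25%
	fmt.Println(clampedReduction(6e9, 2e10)) // 1.5e+10 true: a 70% reduction is clamped to 25%
	fmt.Println(clampedReduction(98, 100))   // 100 false: within 5%, left unchanged
}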
@@ -661,7 +661,7 @@ func TestUseOursIfLarger(t *testing.T) { }, }, { - name: "nothing in ours is larger", + name: "ours are smaller with very small values - should reduce resources based on recent usage", ours: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: *resource.NewQuantity(10, resource.DecimalSI), @@ -684,12 +684,16 @@ func TestUseOursIfLarger(t *testing.T) { }, expected: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ - corev1.ResourceCPU: *resource.NewQuantity(200, resource.DecimalSI), - corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI), + // Ours: 10 * 1.2 = 12, Theirs: 200, Reduction: 94% > 25%, so limit to 25%: 200 * 0.75 = 150 + corev1.ResourceCPU: *resource.NewQuantity(150, resource.DecimalSI), + // Ours: 10 * 1.2 = 12, Theirs: 3e10, Reduction: >99% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10 + corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI), }, Requests: corev1.ResourceList{ - corev1.ResourceCPU: *resource.NewQuantity(100, resource.DecimalSI), - corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI), + // Ours: 10 * 1.2 = 12, Theirs: 100, Reduction: 88% > 25%, so limit to 25%: 100 * 0.75 = 75 + corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI), + // Ours: 10 * 1.2 = 12, Theirs: 2e10, Reduction: >99% > 25%, so limit to 25%: 2e10 * 0.75 = 1.5e10 + corev1.ResourceMemory: *resource.NewQuantity(15e9, resource.BinarySI), }, }, }, @@ -717,8 +721,9 @@ func TestUseOursIfLarger(t *testing.T) { }, expected: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ - corev1.ResourceCPU: *resource.NewQuantity(480, resource.DecimalSI), - corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI), + corev1.ResourceCPU: *resource.NewQuantity(480, resource.DecimalSI), + // Ours: 10 * 1.2 = 12, Theirs: 3e10, Reduction: >99% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10 + corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI), }, Requests: corev1.ResourceList{ corev1.ResourceCPU: *resource.NewQuantity(1200, resource.DecimalSI), @@ -726,10 +731,47 @@ func TestUseOursIfLarger(t *testing.T) { }, }, }, + { + name: "ours are smaller with medium values - should reduce resources based on recent usage", + ours: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(50, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(1e9, resource.BinarySI), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(25, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(5e9, resource.BinarySI), + }, + }, + theirs: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(200, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(100, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI), + }, + }, + expected: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + // Ours: 50 * 1.2 = 60, Theirs: 200, Reduction: 70% > 25%, so limit to 25%: 200 * 0.75 = 150 + corev1.ResourceCPU: *resource.NewQuantity(150, resource.DecimalSI), + // Ours: 1e9 * 1.2 = 1.2e9, Theirs: 3e10, Reduction: 96% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10 + corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI), + }, + Requests: corev1.ResourceList{ + // Ours: 25 * 1.2 = 30, 
Theirs: 100, Reduction: 70% > 25%, so limit to 25%: 100 * 0.75 = 75 + corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI), + // Ours: 5e9 * 1.2 = 6e9, Theirs: 2e10, Reduction: 70% > 25%, so limit to 25%: 2e10 * 0.75 = 1.5e10 + corev1.ResourceMemory: *resource.NewQuantity(15e9, resource.BinarySI), + }, + }, + }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", &defaultReporter, logrus.WithField("test", testCase.name)) + applyRecommendationsBasedOnRecentData(&testCase.ours, &testCase.theirs, "test", "build", true, true, &defaultReporter, logrus.WithField("test", testCase.name)) if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" { t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff) } @@ -814,7 +856,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", &tc.reporter, logrus.WithField("test", tc.name)) + applyRecommendationsBasedOnRecentData(&tc.ours, &tc.theirs, "test", "build", true, true, &tc.reporter, logrus.WithField("test", tc.name)) if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" { t.Errorf("actual and expected reporter states don't match, : %v", diff) diff --git a/cmd/pod-scaler/main.go b/cmd/pod-scaler/main.go index 03f7758a5d8..0c24f8f3877 100644 --- a/cmd/pod-scaler/main.go +++ b/cmd/pod-scaler/main.go @@ -67,6 +67,8 @@ type consumerOptions struct { cpuCap int64 memoryCap string cpuPriorityScheduling int64 + authoritativeCPU bool + authoritativeMemory bool } func bindOptions(fs *flag.FlagSet) *options { @@ -89,6 +91,8 @@ func bindOptions(fs *flag.FlagSet) *options { fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10") fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'") fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling") + fs.BoolVar(&o.authoritativeCPU, "authoritative-cpu", false, "Enable authoritative mode for CPU requests (allows decreasing resources based on recent usage). 
Defaults to false due to CPU measurements being affected by node contention.") + fs.BoolVar(&o.authoritativeMemory, "authoritative-memory", true, "Enable authoritative mode for memory requests (allows decreasing resources based on recent usage)") o.resultsOptions.Bind(fs) return &o } @@ -268,7 +272,7 @@ func mainAdmission(opts *options, cache Cache) { logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.") } - go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter) + go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.authoritativeCPU, opts.authoritativeMemory, reporter) } func loaders(cache Cache) map[string][]*cacheReloader { diff --git a/cmd/pod-scaler/resources.go b/cmd/pod-scaler/resources.go index 32d23e6ba12..ce31fe15216 100644 --- a/cmd/pod-scaler/resources.go +++ b/cmd/pod-scaler/resources.go @@ -1,7 +1,10 @@ package main import ( + "os" + "strconv" "sync" + "time" "github.com/openhistogram/circonusllhist" "github.com/sirupsen/logrus" @@ -39,6 +42,27 @@ type resourceServer struct { const ( // cpuRequestQuantile is the quantile of CPU core usage data to use as the CPU request cpuRequestQuantile = 0.8 + // resourceRecommendationWindow is the window of historical data we consider when calculating + // resource recommendations. Only the past 3 weeks of usage are taken into account when + // determining how many resources a pod needs, so recommendations reflect what is happening + // now rather than months ago. Old data can be misleading: a job may have needed more + // resources in the past than it does today, or vice versa. Limiting ourselves to recent data + // lets us safely adjust resources up or down based on current usage patterns. + resourceRecommendationWindow = 21 * 24 * time.Hour // 3 weeks + // minCPURequestMilli is the minimum CPU request we'll ever recommend (10 millicores). + // This prevents recommending zero or extremely small values that would cause scheduling issues. + minCPURequestMilli = int64(10) + // minMemoryRequestBytes is the minimum memory request we'll ever recommend (10Mi). + // This prevents recommending zero or extremely small values that would cause issues. + minMemoryRequestBytes = int64(10 * 1024 * 1024) +) + +var ( + // minSamplesForRecommendation is the minimum number of recent data points required before + // we make a recommendation. This prevents recommendations based on too few samples which could + // be unreliable or misleading. Can be overridden via POD_SCALER_MIN_SAMPLES environment variable + // (useful for e2e tests where test data may have fewer samples within the time window). 
+ minSamplesForRecommendation = getMinSamplesForRecommendation() ) func formatCPU() toQuantity { @@ -70,16 +94,44 @@ func (s *resourceServer) digestMemory(data *podscaler.CachedQuery) { type toQuantity func(valueAtQuantile float64) (quantity *resource.Quantity) +//nolint:unparam // quantile parameter is kept for flexibility even though currently both CPU and memory use 0.8 func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float64, request corev1.ResourceName, quantity toQuantity) { logger := s.logger.WithField("resource", request) logger.Debugf("Digesting %d identifiers.", len(data.DataByMetaData)) + cutoffTime := time.Now().Add(-resourceRecommendationWindow) + now := time.Now() for meta, fingerprintTimes := range data.DataByMetaData { overall := circonusllhist.New() metaLogger := logger.WithField("meta", meta) metaLogger.Tracef("digesting %d fingerprints", len(fingerprintTimes)) + recentCount := 0 for _, fingerprintTime := range fingerprintTimes { - overall.Merge(data.Data[fingerprintTime.Fingerprint].Histogram()) + if fingerprintTime.Added.After(cutoffTime) { + hist := data.Data[fingerprintTime.Fingerprint].Histogram() + // Weight more recent data more heavily to make the scaler more sensitive to recent runs. + // Past week: 3x weight, 1-2 weeks ago: 2x weight, 2-3 weeks ago: 1x weight. + age := now.Sub(fingerprintTime.Added) + weight := 1 + if age < 7*24*time.Hour { + weight = 3 + } else if age < 14*24*time.Hour { + weight = 2 + } + for i := 0; i < weight; i++ { + overall.Merge(hist) + } + recentCount++ + } } + if recentCount == 0 { + metaLogger.Debugf("no recent fingerprints (within %v), skipping recommendation", resourceRecommendationWindow) + continue + } + if recentCount < minSamplesForRecommendation { + metaLogger.Debugf("only %d recent fingerprints (need at least %d), skipping recommendation", recentCount, minSamplesForRecommendation) + continue + } + metaLogger.Tracef("merged %d recent fingerprints (out of %d total)", recentCount, len(fingerprintTimes)) metaLogger.Trace("merged all fingerprints") valueAtQuantile := overall.ValueAtQuantile(quantile) metaLogger.Trace("locking for value update") @@ -91,6 +143,16 @@ func (s *resourceServer) digestData(data *podscaler.CachedQuery, quantile float6 } } q := quantity(valueAtQuantile) + // Apply minimum thresholds to prevent recommending zero or extremely small values + if request == corev1.ResourceCPU { + if q.MilliValue() < minCPURequestMilli { + q = resource.NewMilliQuantity(minCPURequestMilli, resource.DecimalSI) + } + } else if request == corev1.ResourceMemory { + if q.Value() < minMemoryRequestBytes { + q = resource.NewQuantity(minMemoryRequestBytes, resource.BinarySI) + } + } s.byMetaData[meta].Requests[request] = *q metaLogger.Trace("unlocking for meta") s.lock.Unlock() @@ -104,3 +166,15 @@ func (s *resourceServer) recommendedRequestFor(meta podscaler.FullMetadata) (cor data, ok := s.byMetaData[meta] return data, ok } + +// getMinSamplesForRecommendation returns the minimum number of samples required for a recommendation. +// Defaults to 3 for production use, but can be overridden via POD_SCALER_MIN_SAMPLES environment +// variable (useful for e2e tests where test data may have fewer samples within the time window). 
+func getMinSamplesForRecommendation() int { + if val := os.Getenv("POD_SCALER_MIN_SAMPLES"); val != "" { + if minSamples, err := strconv.Atoi(val); err == nil && minSamples > 0 { + return minSamples + } + } + return 3 +} diff --git a/cmd/pod-scaler/resources_test.go b/cmd/pod-scaler/resources_test.go new file mode 100644 index 00000000000..f698536d1b1 --- /dev/null +++ b/cmd/pod-scaler/resources_test.go @@ -0,0 +1,191 @@ +package main + +import ( + "testing" + "time" + + "github.com/openhistogram/circonusllhist" + "github.com/prometheus/common/model" + "github.com/sirupsen/logrus" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/openshift/ci-tools/pkg/api" + podscaler "github.com/openshift/ci-tools/pkg/pod-scaler" +) + +func TestDigestData_FiltersOldData(t *testing.T) { + now := time.Now() + fourWeeksAgo := now.Add(-28 * 24 * time.Hour) + oneWeekAgo := now.Add(-7 * 24 * time.Hour) + + // Create test data with fingerprints at different times + oldFingerprint := model.Fingerprint(1) + recentFingerprint1 := model.Fingerprint(2) + recentFingerprint2 := model.Fingerprint(3) + recentFingerprint3 := model.Fingerprint(4) + + // Create histograms with different values so we can verify which ones are used + oldHist := circonusllhist.New() + if err := oldHist.RecordValue(100.0); err != nil { // High value - should be ignored + t.Fatalf("failed to record value: %v", err) + } + + recentHist1 := circonusllhist.New() + if err := recentHist1.RecordValue(50.0); err != nil { // Lower value - should be used + t.Fatalf("failed to record value: %v", err) + } + + recentHist2 := circonusllhist.New() + if err := recentHist2.RecordValue(75.0); err != nil { // Medium value - should be used + t.Fatalf("failed to record value: %v", err) + } + + recentHist3 := circonusllhist.New() + if err := recentHist3.RecordValue(60.0); err != nil { // Another recent value - should be used + t.Fatalf("failed to record value: %v", err) + } + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: "test-org", + Repo: "test-repo", + Branch: "main", + }, + Container: "test-container", + } + + data := &podscaler.CachedQuery{ + Data: map[model.Fingerprint]*circonusllhist.HistogramWithoutLookups{ + oldFingerprint: circonusllhist.NewHistogramWithoutLookups(oldHist), + recentFingerprint1: circonusllhist.NewHistogramWithoutLookups(recentHist1), + recentFingerprint2: circonusllhist.NewHistogramWithoutLookups(recentHist2), + recentFingerprint3: circonusllhist.NewHistogramWithoutLookups(recentHist3), + }, + DataByMetaData: map[podscaler.FullMetadata][]podscaler.FingerprintTime{ + meta: { + // Old data - should be filtered out + { + Fingerprint: oldFingerprint, + Added: fourWeeksAgo, + }, + // Recent data - should be included (need at least 3 samples) + { + Fingerprint: recentFingerprint1, + Added: oneWeekAgo, + }, + { + Fingerprint: recentFingerprint2, + Added: oneWeekAgo, + }, + { + Fingerprint: recentFingerprint3, + Added: oneWeekAgo, + }, + }, + }, + } + + server := &resourceServer{ + logger: logrus.WithField("test", "TestDigestData_FiltersOldData"), + byMetaData: map[podscaler.FullMetadata]corev1.ResourceRequirements{}, + } + + // The digestData function uses time.Now() internally to calculate the cutoff. + // Since we're using actual timestamps (fourWeeksAgo and oneWeekAgo), the function + // will correctly filter based on the current time and resourceRecommendationWindow. 
+ + // Digest the data + server.digestData(data, 0.8, corev1.ResourceCPU, func(valueAtQuantile float64) *resource.Quantity { + return resource.NewMilliQuantity(int64(valueAtQuantile*1000), resource.DecimalSI) + }) + + // Verify that the recommendation was created + recommendation, exists := server.recommendedRequestFor(meta) + if !exists { + t.Fatal("Expected recommendation to exist, but it doesn't") + } + + // The recommendation should be based on recent data only (50.0, 75.0, 60.0) + // At 0.8 quantile with weighted averaging (3x weight for past week), it should be + // closer to 75.0 than 100.0. Note: digestData returns the raw quantile value; + // the 1.2 multiplier is applied later in applyRecommendationsBasedOnRecentData. + cpuRequest := recommendation.Requests[corev1.ResourceCPU] + cpuRequestMilli := cpuRequest.MilliValue() + + // The value should be based on recent data (around 70-100 cores worth of millicores) + // We allow some variance due to histogram quantization + if cpuRequestMilli < 70000 || cpuRequestMilli > 100000 { + t.Errorf("Expected CPU request to be based on recent data (around 70000-100000 millicores, i.e., 70-100 cores), got %d millicores", cpuRequestMilli) + } + + // Verify it's not based on the old high value (100.0) + if cpuRequestMilli > 120000 { + t.Errorf("CPU request appears to be based on old data (100.0), got %d millicores", cpuRequestMilli) + } +} + +func TestDigestData_SkipsWhenNoRecentData(t *testing.T) { + now := time.Now() + fourWeeksAgo := now.Add(-28 * 24 * time.Hour) + fiveWeeksAgo := now.Add(-35 * 24 * time.Hour) + + oldFingerprint1 := model.Fingerprint(1) + oldFingerprint2 := model.Fingerprint(2) + + oldHist1 := circonusllhist.New() + if err := oldHist1.RecordValue(100.0); err != nil { + t.Fatalf("failed to record value: %v", err) + } + + oldHist2 := circonusllhist.New() + if err := oldHist2.RecordValue(200.0); err != nil { + t.Fatalf("failed to record value: %v", err) + } + + meta := podscaler.FullMetadata{ + Metadata: api.Metadata{ + Org: "test-org", + Repo: "test-repo", + Branch: "main", + }, + Container: "test-container", + } + + data := &podscaler.CachedQuery{ + Data: map[model.Fingerprint]*circonusllhist.HistogramWithoutLookups{ + oldFingerprint1: circonusllhist.NewHistogramWithoutLookups(oldHist1), + oldFingerprint2: circonusllhist.NewHistogramWithoutLookups(oldHist2), + }, + DataByMetaData: map[podscaler.FullMetadata][]podscaler.FingerprintTime{ + meta: { + // All data is old - should be skipped + { + Fingerprint: oldFingerprint1, + Added: fourWeeksAgo, + }, + { + Fingerprint: oldFingerprint2, + Added: fiveWeeksAgo, + }, + }, + }, + } + + server := &resourceServer{ + logger: logrus.WithField("test", "TestDigestData_SkipsWhenNoRecentData"), + byMetaData: map[podscaler.FullMetadata]corev1.ResourceRequirements{}, + } + + // Digest the data + server.digestData(data, 0.8, corev1.ResourceCPU, func(valueAtQuantile float64) *resource.Quantity { + return resource.NewMilliQuantity(int64(valueAtQuantile*1000), resource.DecimalSI) + }) + + // Verify that no recommendation was created (since all data is old) + recommendation, exists := server.recommendedRequestFor(meta) + if exists { + t.Errorf("Expected no recommendation when all data is old, but got: %v", recommendation) + } +} diff --git a/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add.diff b/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add.diff index c246d3c226b..0c66bf49719 100644 --- 
a/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add.diff +++ b/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add.diff @@ -13,10 +13,15 @@ Limits: v1.ResourceList{ - s"cpu": {i: resource.int64Amount{value: 16}, Format: "DecimalSI"}, - s"memory": {i: resource.int64Amount{value: 400000000}, Format: "BinarySI"}, -+ s"memory": {i: resource.int64Amount{value: 600000000}, Format: "BinarySI"}, ++ s"memory": {i: resource.int64Amount{value: 480000000}, Format: "BinarySI"}, }, - Requests: {s"cpu": {i: {value: 8}, Format: "DecimalSI"}, s"memory": {i: {value: 300000000}, Format: "BinarySI"}}, - Claims: nil, + Requests: v1.ResourceList{ +- s"cpu": {i: resource.int64Amount{value: 8}, Format: "DecimalSI"}, ++ s"cpu": {i: resource.int64Amount{value: 6}, s: "6", Format: "DecimalSI"}, +- s"memory": {i: resource.int64Amount{value: 300000000}, Format: "BinarySI"}, ++ s"memory": {i: resource.int64Amount{value: 240000000}, s: "234375Ki", Format: "BinarySI"}, + }, + Claims: nil, }, ResizePolicy: nil, RestartPolicy: nil, @@ -29,7 +34,8 @@ Resources: v1.ResourceRequirements{ Limits: {}, Requests: v1.ResourceList{ - s"cpu": {i: {value: 8}, Format: "DecimalSI"}, +- s"cpu": {i: resource.int64Amount{value: 8}, Format: "DecimalSI"}, ++ s"cpu": {i: resource.int64Amount{value: 6}, s: "6", Format: "DecimalSI"}, - s"memory": {i: resource.int64Amount{value: 100000000}, Format: "BinarySI"}, + s"memory": {i: resource.int64Amount{value: 240000000}, s: "234375Ki", Format: "BinarySI"}, }, @@ -64,7 +70,8 @@ Resources: v1.ResourceRequirements{ Limits: {}, Requests: v1.ResourceList{ - s"cpu": {i: {value: 10}, Format: "DecimalSI"}, +- s"cpu": {i: resource.int64Amount{value: 10}, Format: "DecimalSI"}, ++ s"cpu": {i: resource.int64Amount{value: 7}, Format: "DecimalSI"}, - s"memory": {i: resource.int64Amount{value: 100}, Format: "BinarySI"}, + s"memory": {i: resource.int64Amount{value: 240000000}, s: "234375Ki", Format: "BinarySI"}, }, diff --git a/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add__limits_disabled.diff b/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add__limits_disabled.diff index 26c8adb895e..fc1c32eb3b7 100644 --- a/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add__limits_disabled.diff +++ b/cmd/pod-scaler/testdata/zz_fixture_TestMutatePodResources_resources_to_add__limits_disabled.diff @@ -12,7 +12,8 @@ Resources: v1.ResourceRequirements{ Limits: {s"cpu": {i: {value: 16}, Format: "DecimalSI"}, s"memory": {i: {value: 40000000000}, Format: "BinarySI"}}, Requests: v1.ResourceList{ - s"cpu": {i: {value: 8}, Format: "DecimalSI"}, +- s"cpu": {i: resource.int64Amount{value: 8}, Format: "DecimalSI"}, ++ s"cpu": {i: resource.int64Amount{value: 6}, s: "6", Format: "DecimalSI"}, - s"memory": {i: resource.int64Amount{value: 30000000000}, Format: "BinarySI"}, + s"memory": {i: resource.int64Amount{value: 21474836480}, s: "20Gi", Format: "BinarySI"}, }, @@ -29,7 +30,8 @@ Resources: v1.ResourceRequirements{ Limits: {}, Requests: v1.ResourceList{ - s"cpu": {i: {value: 8}, Format: "DecimalSI"}, +- s"cpu": {i: resource.int64Amount{value: 8}, Format: "DecimalSI"}, ++ s"cpu": {i: resource.int64Amount{value: 6}, s: "6", Format: "DecimalSI"}, - s"memory": {i: resource.int64Amount{value: 10000000000}, Format: "BinarySI"}, + s"memory": {i: resource.int64Amount{value: 21474836480}, s: "20Gi", Format: "BinarySI"}, }, diff --git a/test/e2e/pod-scaler/run/consumer.go b/test/e2e/pod-scaler/run/consumer.go index 
b6d74fca8cf..f455dfba5e9 100644 --- a/test/e2e/pod-scaler/run/consumer.go +++ b/test/e2e/pod-scaler/run/consumer.go @@ -68,7 +68,7 @@ func Admission(t testhelper.TestingTInterface, dataDir, kubeconfig string, paren return []string{"--port", port, "--health-port", healthPort} }, func(port, healthPort string) []string { return []string{port} - }, clientcmd.RecommendedConfigPathEnvVar+"="+kubeconfig) + }, clientcmd.RecommendedConfigPathEnvVar+"="+kubeconfig, "POD_SCALER_MIN_SAMPLES=1") podScaler.RunFromFrameworkRunner(t, parent, stream) podScalerHost := "https://" + serverHostname + ":" + podScaler.ClientFlags()[0] t.Logf("pod-scaler admission is running at %s", podScalerHost) diff --git a/test/e2e/pod-scaler/run/producer.go b/test/e2e/pod-scaler/run/producer.go index 6dbd0319678..3022ffbceb3 100644 --- a/test/e2e/pod-scaler/run/producer.go +++ b/test/e2e/pod-scaler/run/producer.go @@ -5,6 +5,7 @@ package run import ( "fmt" + "os" "os/exec" "time" @@ -27,6 +28,7 @@ func Producer(t testhelper.TestingTInterface, dataDir, kubeconfigFile string, ig start := time.Now() t.Logf("Running pod-scaler %v", podScalerFlags) podScaler := exec.CommandContext(interrupts.Context(), "pod-scaler", podScalerFlags...) + podScaler.Env = append(os.Environ(), "POD_SCALER_MIN_SAMPLES=1") // Set env var for e2e tests out, err := podScaler.CombinedOutput() if err != nil { t.Fatalf("Failed to run pod-scaler: %v: %s", err, string(out))
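To summarize the new selection logic in digestData: fingerprints outside the three-week resourceRecommendationWindow are ignored, the surviving fingerprints are merged with a recency weight, and no recommendation is produced unless at least minSamplesForRecommendation recent fingerprints remain. The following is a minimal, hypothetical sketch of the weighting; the helper name sampleWeight is an assumption for illustration, and the real code merges each histogram into the overall circonusllhist histogram weight times.

package main

import (
	"fmt"
	"time"
)

// sampleWeight restates the recency weighting digestData applies when merging per-fingerprint
// histograms: data older than the three-week window is dropped entirely (weight 0), data from the
// past week counts three times, data from one to two weeks ago counts twice, and the remainder of
// the window counts once.
func sampleWeight(added, now time.Time) int {
	const window = 21 * 24 * time.Hour
	if !added.After(now.Add(-window)) {
		return 0
	}
	switch age := now.Sub(added); {
	case age < 7*24*time.Hour:
		return 3
	case age < 14*24*time.Hour:
		return 2
	default:
		return 1
	}
}

func main() {
	now := time.Now()
	for _, daysAgo := range []int{2, 10, 20, 28} {
		added := now.Add(-time.Duration(daysAgo) * 24 * time.Hour)
		fmt.Printf("a sample from %2d days ago gets weight %d\n", daysAgo, sampleWeight(added, now))
	}
}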
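The minimum thresholds that digestData now applies before storing a recommendation can likewise be shown on their own. A minimal sketch, assuming a hypothetical helper applyFloor with the same constants as the diff (10 millicores of CPU and 10Mi of memory):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// applyFloor restates the minimum-threshold check performed before a recommendation is stored:
// CPU is never recommended below 10 millicores and memory never below 10Mi, so a quiet container
// cannot end up with an effectively zero request.
func applyFloor(request corev1.ResourceName, q *resource.Quantity) *resource.Quantity {
	const (
		minCPURequestMilli    = int64(10)
		minMemoryRequestBytes = int64(10 * 1024 * 1024)
	)
	switch request {
	case corev1.ResourceCPU:
		if q.MilliValue() < minCPURequestMilli {
			return resource.NewMilliQuantity(minCPURequestMilli, resource.DecimalSI)
		}
	case corev1.ResourceMemory:
		if q.Value() < minMemoryRequestBytes {
			return resource.NewQuantity(minMemoryRequestBytes, resource.BinarySI)
		}
	}
	return q
}

func main() {
	// A 2-millicore CPU observation is floored to 10m; a 1Gi memory observation is left untouched.
	fmt.Println(applyFloor(corev1.ResourceCPU, resource.NewMilliQuantity(2, resource.DecimalSI)))  // 10m
	fmt.Println(applyFloor(corev1.ResourceMemory, resource.NewQuantity(1<<30, resource.BinarySI))) // 1Gi
}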
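Finally, a note on the POD_SCALER_MIN_SAMPLES override used by the e2e changes above: because minSamplesForRecommendation is a package-level variable, it is evaluated once when the process starts, so the override has to be present in the pod-scaler process environment (as producer.go and consumer.go arrange) rather than set from inside an already-running process. A minimal sketch of launching the binary that way; the flagless invocation is illustrative only, and real runs pass the usual mode-specific flags.

package main

import (
	"log"
	"os"
	"os/exec"
)

func main() {
	// Mirror the e2e producer change: put the override into the child process environment before
	// pod-scaler starts, since the value is captured during package initialization.
	cmd := exec.Command("pod-scaler")
	cmd.Env = append(os.Environ(), "POD_SCALER_MIN_SAMPLES=1")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		log.Fatalf("pod-scaler failed: %v", err)
	}
}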