5 changes: 3 additions & 2 deletions .gitignore
@@ -9,5 +9,6 @@ index.js

# default working dir
/job-aggregator-working-dir
# go built binary
/job-run-aggregator
# go built binaries
/job-run-aggregator
/pod-scaler
66 changes: 57 additions & 9 deletions cmd/pod-scaler/admission.go
@@ -33,7 +33,7 @@ import (
"github.com/openshift/ci-tools/pkg/steps"
)

func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter) {
logger := logrus.WithField("component", "pod-scaler admission")
logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
health := pjutil.NewHealthOnPort(healthPort)
@@ -44,7 +44,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int
Port: port,
CertDir: certDir,
})
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, reporter: reporter}})
logger.Info("Serving admission webhooks.")
if err := server.Start(interrupts.Context()); err != nil {
logrus.WithError(err).Fatal("Failed to serve webhooks.")
@@ -60,6 +60,8 @@ type podMutator struct {
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
authoritativeCPU bool
authoritativeMemory bool
reporter results.PodScalerReporter
}

@@ -97,7 +99,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
logger.WithError(err).Error("Failed to handle rehearsal Pod.")
return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
}
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPU, m.authoritativeMemory, m.reporter, logger)
m.addPriorityClass(pod)

marshaledPod, err := json.Marshal(pod)
@@ -196,8 +198,14 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) {
}
}

// useOursIfLarger updates fields in theirs when ours are larger
func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
// applyRecommendationsBasedOnRecentData applies resource recommendations based on recent usage data
// (see resourceRecommendationWindow). If recent usage is higher than what is configured, we increase
// resources; if it is lower, we decrease them only when authoritative mode is enabled for that
// resource type.
//
// The reduction behavior is covered in admission_test.go as part of TestUseOursIfLarger, with test
// cases that handle ResourceQuantity comparison correctly.
func applyRecommendationsBasedOnRecentData(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} {
if item.Requests == nil {
item.Requests = corev1.ResourceList{}
@@ -215,6 +223,10 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
} {
for _, field := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
our := (*pair.ours)[field]
// If we have no recommendation for this resource, skip it
if our.IsZero() {
continue
}
//TODO(sgoeddel): this is a temporary experiment to see what effect setting values that are 120% of what has
// been determined has on the rate of OOMKilled and similar termination of workloads
increased := our.AsApproximateFloat64() * 1.2
@@ -231,13 +243,49 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
})
cmp := our.Cmp(their)
if cmp == 1 {
fieldLogger.Debug("determined amount larger than configured")
fieldLogger.Debug("determined amount larger than configured, increasing resources")
(*pair.theirs)[field] = our
if their.Value() > 0 && our.Value() > (their.Value()*10) {
reporter.ReportResourceConfigurationWarning(workloadName, workloadType, their.String(), our.String(), field.String())
}
} else if cmp < 0 {
fieldLogger.Debug("determined amount smaller than configured")
// Check if authoritative mode is enabled for this resource type
isAuthoritative := false
if field == corev1.ResourceCPU {
isAuthoritative = authoritativeCPU
} else if field == corev1.ResourceMemory {
isAuthoritative = authoritativeMemory
}

if !isAuthoritative {
fieldLogger.Debug("authoritative mode disabled for this resource, skipping reduction")
continue
}

// Apply gradual reduction with safety limits: max 25% reduction per cycle, minimum 5% difference
ourValue := our.AsApproximateFloat64()
theirValue := their.AsApproximateFloat64()
if theirValue == 0 {
fieldLogger.Debug("theirs is zero, applying recommendation")
(*pair.theirs)[field] = our
continue
}

reductionPercent := 1.0 - (ourValue / theirValue)
if reductionPercent < 0.05 {
fieldLogger.Debug("difference less than 5%, skipping micro-adjustment")
continue
}

maxReductionPercent := 0.25
if reductionPercent > maxReductionPercent {
maxAllowed := theirValue * (1.0 - maxReductionPercent)
our.Set(int64(maxAllowed))
fieldLogger.Debugf("applying gradual reduction (limited to 25%% per cycle)")
} else {
fieldLogger.Debug("reducing resources based on recent usage")
}
(*pair.theirs)[field] = our
} else {
fieldLogger.Debug("determined amount equal to configured")
}
@@ -292,7 +340,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
}
}

func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
mutateResources := func(containers []corev1.Container) {
for i := range containers {
meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)
@@ -301,7 +349,7 @@ func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceL
logger.Debugf("recommendation exists for: %s", containers[i].Name)
workloadType := determineWorkloadType(pod.Annotations, pod.Labels)
workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels)
useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, reporter, logger)
applyRecommendationsBasedOnRecentData(&resources, &containers[i].Resources, workloadName, workloadType, authoritativeCPU, authoritativeMemory, reporter, logger)
if mutateResourceLimits {
reconcileLimits(&containers[i].Resources)
}
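The reduction path added above never shrinks configured resources abruptly: reductions only happen in authoritative mode, differences under 5% are ignored, and a single admission cycle cuts at most 25% of the currently configured value. As a rough standalone sketch of that rule (not part of this diff; plain floats stand in for resource.Quantity values and the helper name reduce is hypothetical):

package main

import "fmt"

// reduce returns the new configured value given a recommendation ("ours", already scaled to 120%
// of observed usage) and the currently configured value ("theirs"). It only models the reduction
// branch of applyRecommendationsBasedOnRecentData; increases are handled separately.
func reduce(ours, theirs float64, authoritative bool) float64 {
	if ours >= theirs || !authoritative {
		return theirs // not a reduction, or reductions are disabled for this resource type
	}
	if theirs == 0 {
		return ours // nothing configured yet, apply the recommendation directly
	}
	reduction := 1.0 - ours/theirs
	if reduction < 0.05 {
		return theirs // skip micro-adjustments under 5%
	}
	if reduction > 0.25 {
		return theirs * 0.75 // cap the reduction at 25% per cycle
	}
	return ours
}

func main() {
	fmt.Println(reduce(12, 200, true))  // 150: a 94% requested reduction is capped at 25%
	fmt.Println(reduce(96, 100, true))  // 100: a 4% difference is ignored
	fmt.Println(reduce(60, 70, true))   // 60: a ~14% reduction is applied as-is
	fmt.Println(reduce(12, 200, false)) // 200: authoritative mode disabled, no reduction
}

The 25% cap keeps one noisy sample from collapsing a workload's requests in a single pass, and the 5% floor avoids churning pods over negligible differences.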
62 changes: 52 additions & 10 deletions cmd/pod-scaler/admission_test.go
@@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
original := testCase.pod.DeepCopy()
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name))
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", true, true, &defaultReporter, logrus.WithField("test", testCase.name))
diff := cmp.Diff(original, testCase.pod)
// In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
// particularly deterministic about this. We don't care.
@@ -661,7 +661,7 @@ func TestUseOursIfLarger(t *testing.T) {
},
},
{
name: "nothing in ours is larger",
name: "ours are smaller with very small values - should reduce resources based on recent usage",
ours: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(10, resource.DecimalSI),
@@ -684,12 +684,16 @@ func TestUseOursIfLarger(t *testing.T) {
},
expected: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(200, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI),
// Ours: 10 * 1.2 = 12, Theirs: 200, Reduction: 94% > 25%, so limit to 25%: 200 * 0.75 = 150
corev1.ResourceCPU: *resource.NewQuantity(150, resource.DecimalSI),
// Ours: 10 * 1.2 = 12, Theirs: 3e10, Reduction: >99% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10
corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(100, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI),
// Ours: 10 * 1.2 = 12, Theirs: 100, Reduction: 88% > 25%, so limit to 25%: 100 * 0.75 = 75
corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI),
// Ours: 10 * 1.2 = 12, Theirs: 2e10, Reduction: >99% > 25%, so limit to 25%: 2e10 * 0.75 = 1.5e10
corev1.ResourceMemory: *resource.NewQuantity(15e9, resource.BinarySI),
},
},
},
@@ -717,19 +721,57 @@ func TestUseOursIfLarger(t *testing.T) {
},
expected: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(480, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI),
corev1.ResourceCPU: *resource.NewQuantity(480, resource.DecimalSI),
// Ours: 10 * 1.2 = 12, Theirs: 3e10, Reduction: >99% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10
corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(1200, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(48e9, resource.BinarySI),
},
},
},
{
name: "ours are smaller with medium values - should reduce resources based on recent usage",
ours: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(50, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(1e9, resource.BinarySI),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(25, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(5e9, resource.BinarySI),
},
},
theirs: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(200, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(100, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI),
},
},
expected: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
// Ours: 50 * 1.2 = 60, Theirs: 200, Reduction: 70% > 25%, so limit to 25%: 200 * 0.75 = 150
corev1.ResourceCPU: *resource.NewQuantity(150, resource.DecimalSI),
// Ours: 1e9 * 1.2 = 1.2e9, Theirs: 3e10, Reduction: 96% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10
corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI),
},
Requests: corev1.ResourceList{
// Ours: 25 * 1.2 = 30, Theirs: 100, Reduction: 70% > 25%, so limit to 25%: 100 * 0.75 = 75
corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI),
// Ours: 5e9 * 1.2 = 6e9, Theirs: 2e10, Reduction: 70% > 25%, so limit to 25%: 2e10 * 0.75 = 1.5e10
corev1.ResourceMemory: *resource.NewQuantity(15e9, resource.BinarySI),
},
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", &defaultReporter, logrus.WithField("test", testCase.name))
applyRecommendationsBasedOnRecentData(&testCase.ours, &testCase.theirs, "test", "build", true, true, &defaultReporter, logrus.WithField("test", testCase.name))
if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" {
t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff)
}
@@ -814,7 +856,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", &tc.reporter, logrus.WithField("test", tc.name))
applyRecommendationsBasedOnRecentData(&tc.ours, &tc.theirs, "test", "build", true, true, &tc.reporter, logrus.WithField("test", tc.name))

if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" {
t.Errorf("actual and expected reporter states don't match, : %v", diff)
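The new table entries above exercise the 25% cap; the 5% "micro-adjustment" floor is not covered in this diff. A hypothetical additional entry, assuming the same fields as the existing cases (it is not part of this PR), might look like:

{
	name: "ours are slightly smaller - difference under 5% is left untouched",
	ours: corev1.ResourceRequirements{
		Limits: corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(80, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(8e9, resource.BinarySI),
		},
		Requests: corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(40, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(4e9, resource.BinarySI),
		},
	},
	theirs: corev1.ResourceRequirements{
		Limits: corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(100, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(1e10, resource.BinarySI),
		},
		Requests: corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(50, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(5e9, resource.BinarySI),
		},
	},
	// Ours at 120% (96, 9.6e9, 48, 4.8e9) is within 5% of theirs in every field, so nothing changes.
	expected: corev1.ResourceRequirements{
		Limits: corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(100, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(1e10, resource.BinarySI),
		},
		Requests: corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(50, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(5e9, resource.BinarySI),
		},
	},
},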
6 changes: 5 additions & 1 deletion cmd/pod-scaler/main.go
@@ -67,6 +67,8 @@ type consumerOptions struct {
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
authoritativeCPU bool
authoritativeMemory bool
}

func bindOptions(fs *flag.FlagSet) *options {
@@ -89,6 +91,8 @@ func bindOptions(fs *flag.FlagSet) *options {
fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
fs.BoolVar(&o.authoritativeCPU, "authoritative-cpu", false, "Enable authoritative mode for CPU requests (allows decreasing resources based on recent usage). Defaults to false due to CPU measurements being affected by node contention.")
fs.BoolVar(&o.authoritativeMemory, "authoritative-memory", true, "Enable authoritative mode for memory requests (allows decreasing resources based on recent usage)")
o.resultsOptions.Bind(fs)
return &o
}
@@ -268,7 +272,7 @@ func mainAdmission(opts *options, cache Cache) {
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
}

go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter)
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.authoritativeCPU, opts.authoritativeMemory, reporter)
}

func loaders(cache Cache) map[string][]*cacheReloader {
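The two new flags control whether the admission webhook may lower requests as well as raise them: memory reductions are on by default, while CPU reductions default to off because CPU measurements are skewed by node contention. A minimal sketch of the flag surface (a hypothetical standalone program, not the actual pod-scaler entrypoint; only the flag names and defaults are taken from bindOptions above):

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Flag names and defaults mirror bindOptions in cmd/pod-scaler/main.go above.
	authoritativeCPU := flag.Bool("authoritative-cpu", false, "Enable authoritative mode for CPU requests")
	authoritativeMemory := flag.Bool("authoritative-memory", true, "Enable authoritative mode for memory requests")
	flag.Parse()
	// With no arguments this prints "cpu=false memory=true"; passing --authoritative-cpu=true
	// opts CPU reductions in as well.
	fmt.Printf("cpu=%t memory=%t\n", *authoritativeCPU, *authoritativeMemory)
}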