Skip to content

Commit 1b2a75d

Browse files
Add capacity buffers scale up events
1 parent 81fe5f7 commit 1b2a75d

File tree

6 files changed

+367
-44
lines changed

6 files changed

+367
-44
lines changed

cluster-autoscaler/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,10 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
192192
capacitybufferClient, capacitybufferClientError = capacityclient.NewCapacityBufferClientFromConfig(restConfig)
193193
}
194194
if capacitybufferClientError == nil && capacitybufferClient != nil {
195-
bufferPodInjector := cbprocessor.NewCapacityBufferPodListProcessor(capacitybufferClient, []string{common.ActiveProvisioningStrategy})
195+
buffersPodsRegistry := cbprocessor.NewDefaultCapacityBuffersFakePodsRegistry()
196+
bufferPodInjector := cbprocessor.NewCapacityBufferPodListProcessor(capacitybufferClient, []string{common.ActiveProvisioningStrategy}, buffersPodsRegistry)
196197
podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{bufferPodInjector, podListProcessor})
197-
opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{cbprocessor.NewFakePodsScaleUpStatusProcessor(), opts.Processors.ScaleUpStatusProcessor})
198+
opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{cbprocessor.NewFakePodsScaleUpStatusProcessor(buffersPodsRegistry), opts.Processors.ScaleUpStatusProcessor})
198199
}
199200
}
200201

cluster-autoscaler/processors/capacitybuffer/pod_list_processor.go

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"k8s.io/klog/v2"
2727

2828
apiv1 "k8s.io/api/core/v1"
29-
api_v1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1alpha1"
29+
v1alpha1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1alpha1"
3030
client "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client"
3131
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
3232
buffersfilter "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
@@ -35,8 +35,8 @@ import (
3535

3636
// Pods annotation keys and values for fake pods created by capacity buffer pod list processor
3737
const (
38-
FakeCapacityBufferPodAnnotationKey = "podType"
39-
FakeCapacityBufferPodAnnotationValue = "capacityBufferFakePod"
38+
CapacityBufferFakePodAnnotationKey = "podType"
39+
CapacityBufferFakePodAnnotationValue = "capacityBufferFakePod"
4040
)
4141

4242
// CapacityBufferPodListProcessor processes the pod lists before scale up
@@ -46,10 +46,27 @@ type CapacityBufferPodListProcessor struct {
4646
statusFilter buffersfilter.Filter
4747
podTemplateGenFilter buffersfilter.Filter
4848
provStrategies map[string]bool
49+
buffersRegistry *capacityBuffersFakePodsRegistry
50+
}
51+
52+
// capacityBuffersFakePodsRegistry a struct that keeps the status of capacity buffer
53+
// the fake pods generated for adding buffer event later
54+
type capacityBuffersFakePodsRegistry struct {
55+
fakePodsUIDToBuffer map[string]*v1alpha1.CapacityBuffer
56+
}
57+
58+
// NewCapacityBuffersFakePodsRegistry returns a new pointer to empty capacityBuffersFakePodsRegistry
59+
func NewCapacityBuffersFakePodsRegistry(fakePodsToBuffers map[string]*v1alpha1.CapacityBuffer) *capacityBuffersFakePodsRegistry {
60+
return &capacityBuffersFakePodsRegistry{fakePodsUIDToBuffer: fakePodsToBuffers}
61+
}
62+
63+
// NewDefaultCapacityBuffersFakePodsRegistry returns a new pointer to empty capacityBuffersFakePodsRegistry
64+
func NewDefaultCapacityBuffersFakePodsRegistry() *capacityBuffersFakePodsRegistry {
65+
return &capacityBuffersFakePodsRegistry{fakePodsUIDToBuffer: map[string]*v1alpha1.CapacityBuffer{}}
4966
}
5067

5168
// NewCapacityBufferPodListProcessor creates a new CapacityRequestPodListProcessor.
52-
func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, provStrategies []string) *CapacityBufferPodListProcessor {
69+
func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, provStrategies []string, buffersRegistry *capacityBuffersFakePodsRegistry) *CapacityBufferPodListProcessor {
5370
provStrategiesMap := map[string]bool{}
5471
for _, ps := range provStrategies {
5572
provStrategiesMap[ps] = true
@@ -62,6 +79,7 @@ func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, prov
6279
}),
6380
podTemplateGenFilter: buffersfilter.NewPodTemplateGenerationChangedFilter(client),
6481
provStrategies: provStrategiesMap,
82+
buffersRegistry: buffersRegistry,
6583
}
6684
}
6785

@@ -79,6 +97,7 @@ func (p *CapacityBufferPodListProcessor) Process(ctx *context.AutoscalingContext
7997
totalFakePods := []*apiv1.Pod{}
8098
for _, buffer := range buffers {
8199
fakePods := p.provision(buffer)
100+
p.updateCapacityBufferRegistry(fakePods, buffer)
82101
totalFakePods = append(totalFakePods, fakePods...)
83102
}
84103
klog.V(2).Infof("Capacity pod processor injecting %v fake pods provisioning %v capacity buffers", len(totalFakePods), len(buffers))
@@ -90,7 +109,16 @@ func (p *CapacityBufferPodListProcessor) Process(ctx *context.AutoscalingContext
90109
func (p *CapacityBufferPodListProcessor) CleanUp() {
91110
}
92111

93-
func (p *CapacityBufferPodListProcessor) provision(buffer *api_v1.CapacityBuffer) []*apiv1.Pod {
112+
func (p *CapacityBufferPodListProcessor) updateCapacityBufferRegistry(fakePods []*apiv1.Pod, buffer *v1alpha1.CapacityBuffer) {
113+
p.buffersRegistry.fakePodsUIDToBuffer = make(map[string]*v1alpha1.CapacityBuffer, len(fakePods))
114+
for _, fakePod := range fakePods {
115+
if p.buffersRegistry != nil {
116+
p.buffersRegistry.fakePodsUIDToBuffer[string(fakePod.UID)] = buffer
117+
}
118+
}
119+
}
120+
121+
func (p *CapacityBufferPodListProcessor) provision(buffer *v1alpha1.CapacityBuffer) []*apiv1.Pod {
94122
if buffer.Status.PodTemplateRef == nil || buffer.Status.Replicas == nil {
95123
return []*apiv1.Pod{}
96124
}
@@ -102,7 +130,7 @@ func (p *CapacityBufferPodListProcessor) provision(buffer *api_v1.CapacityBuffer
102130
p.updateBufferStatus(buffer)
103131
return []*apiv1.Pod{}
104132
}
105-
fakePods, err := makeFakePods(buffer.Name, &podTemplate.Template, int(*replicas))
133+
fakePods, err := makeFakePods(buffer, &podTemplate.Template, int(*replicas))
106134
if err != nil {
107135
common.UpdateBufferStatusToFailedProvisioing(buffer, "FailedToMakeFakePods", fmt.Sprintf("failed to create fake pods with error: %v", err.Error()))
108136
p.updateBufferStatus(buffer)
@@ -113,8 +141,8 @@ func (p *CapacityBufferPodListProcessor) provision(buffer *api_v1.CapacityBuffer
113141
return fakePods
114142
}
115143

116-
func (p *CapacityBufferPodListProcessor) filterBuffersProvStrategy(buffers []*api_v1.CapacityBuffer) []*api_v1.CapacityBuffer {
117-
var filteredBuffers []*api_v1.CapacityBuffer
144+
func (p *CapacityBufferPodListProcessor) filterBuffersProvStrategy(buffers []*v1alpha1.CapacityBuffer) []*v1alpha1.CapacityBuffer {
145+
var filteredBuffers []*v1alpha1.CapacityBuffer
118146
for _, buffer := range buffers {
119147

120148
if buffer.Status.ProvisioningStrategy != nil && p.provStrategies[*buffer.Status.ProvisioningStrategy] {
@@ -124,24 +152,24 @@ func (p *CapacityBufferPodListProcessor) filterBuffersProvStrategy(buffers []*ap
124152
return filteredBuffers
125153
}
126154

127-
func (p *CapacityBufferPodListProcessor) updateBufferStatus(buffer *api_v1.CapacityBuffer) {
155+
func (p *CapacityBufferPodListProcessor) updateBufferStatus(buffer *v1alpha1.CapacityBuffer) {
128156
_, err := p.client.UpdateCapacityBuffer(buffer)
129157
if err != nil {
130158
klog.Errorf("Failed to update buffer status for buffer %v, error: %v", buffer.Name, err.Error())
131159
}
132160
}
133161

134162
// makeFakePods creates podCount number of copies of the sample pod
135-
func makeFakePods(bufferName string, samplePodTemplate *apiv1.PodTemplateSpec, podCount int) ([]*apiv1.Pod, error) {
163+
func makeFakePods(buffer *v1alpha1.CapacityBuffer, samplePodTemplate *apiv1.PodTemplateSpec, podCount int) ([]*apiv1.Pod, error) {
136164
var fakePods []*apiv1.Pod
137165
samplePod := getPodFromTemplate(samplePodTemplate)
138-
samplePod = withCapacityBufferFakePodAnnotation(samplePod)
139166
for i := 1; i <= podCount; i++ {
140-
newPod := samplePod.DeepCopy()
141-
newPod.Name = fmt.Sprintf("capacity-buffer-%s-%d", bufferName, i)
142-
newPod.UID = types.UID(fmt.Sprintf("%s-%d", string(bufferName), i))
143-
newPod.Spec.NodeName = ""
144-
fakePods = append(fakePods, newPod)
167+
fakePod := samplePod.DeepCopy()
168+
fakePod = withCapacityBufferFakePodAnnotation(fakePod)
169+
fakePod.Name = fmt.Sprintf("capacity-buffer-%s-%d", buffer.Name, i)
170+
fakePod.UID = types.UID(fmt.Sprintf("%s-%d", string(buffer.UID), i))
171+
fakePod.Spec.NodeName = ""
172+
fakePods = append(fakePods, fakePod)
145173
}
146174
return fakePods, nil
147175
}
@@ -150,15 +178,15 @@ func withCapacityBufferFakePodAnnotation(pod *apiv1.Pod) *apiv1.Pod {
150178
if pod.Annotations == nil {
151179
pod.Annotations = make(map[string]string, 1)
152180
}
153-
pod.Annotations[FakeCapacityBufferPodAnnotationKey] = FakeCapacityBufferPodAnnotationValue
181+
pod.Annotations[CapacityBufferFakePodAnnotationKey] = CapacityBufferFakePodAnnotationValue
154182
return pod
155183
}
156184

157185
func isFakeCapacityBuffersPod(pod *apiv1.Pod) bool {
158186
if pod.Annotations == nil {
159187
return false
160188
}
161-
return pod.Annotations[FakeCapacityBufferPodAnnotationKey] == FakeCapacityBufferPodAnnotationValue
189+
return pod.Annotations[CapacityBufferFakePodAnnotationKey] == CapacityBufferFakePodAnnotationValue
162190
}
163191

164192
func getPodFromTemplate(template *apiv1.PodTemplateSpec) *apiv1.Pod {

cluster-autoscaler/processors/capacitybuffer/pod_list_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func TestPodListProcessor(t *testing.T) {
143143
fakeBuffersClient := buffersfake.NewSimpleClientset(test.objectsInBuffersClient...)
144144
fakeCapacityBuffersClient, _ := client.NewCapacityBufferClientFromClients(fakeBuffersClient, fakeKubernetesClient, nil, nil)
145145

146-
processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed})
146+
processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, NewDefaultCapacityBuffersFakePodsRegistry())
147147
resUnschedulablePods, err := processor.Process(nil, test.unschedulablePods)
148148
assert.Equal(t, err != nil, test.expectError)
149149

cluster-autoscaler/processors/capacitybuffer/scale_up_status_processor.go

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,46 +17,124 @@ limitations under the License.
1717
package capacitybufferpodlister
1818

1919
import (
20+
"fmt"
21+
"strings"
22+
2023
apiv1 "k8s.io/api/core/v1"
24+
v1alpha1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1alpha1"
2125
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
2226
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
23-
"k8s.io/klog/v2"
2427
)
2528

2629
// FakePodsScaleUpStatusProcessor is a ScaleUpStatusProcessor used for filtering out fake pods from scaleup status.
27-
type FakePodsScaleUpStatusProcessor struct{}
30+
type FakePodsScaleUpStatusProcessor struct {
31+
buffersRegistry *capacityBuffersFakePodsRegistry
32+
}
33+
34+
type bufferInfo struct {
35+
buffer *v1alpha1.CapacityBuffer
36+
numberOfPods int
37+
reasonMessages []string
38+
}
2839

2940
// NewFakePodsScaleUpStatusProcessor return an instance of FakePodsScaleUpStatusProcessor
30-
func NewFakePodsScaleUpStatusProcessor() *FakePodsScaleUpStatusProcessor {
31-
return &FakePodsScaleUpStatusProcessor{}
41+
func NewFakePodsScaleUpStatusProcessor(buffersRegistry *capacityBuffersFakePodsRegistry) *FakePodsScaleUpStatusProcessor {
42+
return &FakePodsScaleUpStatusProcessor{buffersRegistry: buffersRegistry}
3243
}
3344

3445
// Process updates scaleupStatus to remove all capacity buffer fake pods from
3546
// PodsRemainUnschedulable, PodsAwaitEvaluation & PodsTriggeredScaleup
36-
func (a *FakePodsScaleUpStatusProcessor) Process(_ *ca_context.AutoscalingContext, scaleUpStatus *status.ScaleUpStatus) {
37-
scaleUpStatus.PodsRemainUnschedulable = filterFakePods(scaleUpStatus.PodsRemainUnschedulable, func(noScaleUpInfo status.NoScaleUpInfo) *apiv1.Pod { return noScaleUpInfo.Pod }, "PodsRemainUnschedulable")
38-
scaleUpStatus.PodsAwaitEvaluation = filterFakePods(scaleUpStatus.PodsAwaitEvaluation, func(pod *apiv1.Pod) *apiv1.Pod { return pod }, "PodsAwaitEvaluation")
39-
scaleUpStatus.PodsTriggeredScaleUp = filterFakePods(scaleUpStatus.PodsTriggeredScaleUp, func(pod *apiv1.Pod) *apiv1.Pod { return pod }, "PodsTriggeredScaleUp")
47+
func (p *FakePodsScaleUpStatusProcessor) Process(context *ca_context.AutoscalingContext, scaleUpStatus *status.ScaleUpStatus) {
48+
var fakePodsRemainUnschedulable []status.NoScaleUpInfo
49+
var fakePodsTriggeredScaleUp []*apiv1.Pod
50+
51+
scaleUpStatus.PodsRemainUnschedulable, fakePodsRemainUnschedulable = filterOutFakeUnschedulablePods(scaleUpStatus.PodsRemainUnschedulable)
52+
scaleUpStatus.PodsTriggeredScaleUp, fakePodsTriggeredScaleUp = filterOutFakePods(scaleUpStatus.PodsTriggeredScaleUp)
53+
scaleUpStatus.PodsAwaitEvaluation, _ = filterOutFakePods(scaleUpStatus.PodsAwaitEvaluation)
54+
55+
p.createEventsForBuffersWithUnschedulablePods(context, scaleUpStatus, fakePodsRemainUnschedulable)
56+
p.createEventsForBuffersWithPodsTriggeredScaleUp(context, scaleUpStatus, fakePodsTriggeredScaleUp)
57+
}
58+
59+
func (p *FakePodsScaleUpStatusProcessor) createEventsForBuffersWithUnschedulablePods(context *ca_context.AutoscalingContext, scaleUpStatus *status.ScaleUpStatus, fakePodsRemainUnschedulable []status.NoScaleUpInfo) {
60+
if scaleUpStatus.Result != status.ScaleUpSuccessful && scaleUpStatus.Result != status.ScaleUpError {
61+
consideredNodeGroupsMap := status.NodeGroupListToMapById(scaleUpStatus.ConsideredNodeGroups)
62+
buffersInfo := map[string]*bufferInfo{}
63+
for _, noScaleUpInfo := range fakePodsRemainUnschedulable {
64+
parentCapacityBuffer, found := p.buffersRegistry.fakePodsUIDToBuffer[string(noScaleUpInfo.Pod.UID)]
65+
if found {
66+
bufferUID := string(parentCapacityBuffer.UID)
67+
if _, found := buffersInfo[bufferUID]; !found {
68+
buffersInfo[bufferUID] = &bufferInfo{
69+
buffer: parentCapacityBuffer,
70+
}
71+
}
72+
buffersInfo[bufferUID].numberOfPods += 1
73+
buffersInfo[bufferUID].reasonMessages = append(buffersInfo[bufferUID].reasonMessages,
74+
status.ReasonsMessage(scaleUpStatus.Result, noScaleUpInfo, consideredNodeGroupsMap))
75+
}
76+
}
77+
78+
for _, bufferInfo := range buffersInfo {
79+
context.Recorder.Event(bufferInfo.buffer, apiv1.EventTypeNormal, "NotTriggerScaleUp",
80+
fmt.Sprintf("capacity buffer %d fake pods didn't trigger scale-up: %s",
81+
bufferInfo.numberOfPods, strings.Join(bufferInfo.reasonMessages, ",")))
82+
}
83+
}
84+
}
85+
86+
func (p *FakePodsScaleUpStatusProcessor) createEventsForBuffersWithPodsTriggeredScaleUp(context *ca_context.AutoscalingContext, scaleUpStatus *status.ScaleUpStatus, fakePodsTriggeredScaleUp []*apiv1.Pod) {
87+
if len(scaleUpStatus.ScaleUpInfos) > 0 && len(fakePodsTriggeredScaleUp) > 0 {
88+
buffersInfo := map[string]*bufferInfo{}
89+
for _, pod := range fakePodsTriggeredScaleUp {
90+
parentCapacityBuffer, found := p.buffersRegistry.fakePodsUIDToBuffer[string(pod.UID)]
91+
if found {
92+
bufferUID := string(parentCapacityBuffer.UID)
93+
if _, found := buffersInfo[bufferUID]; !found {
94+
buffersInfo[bufferUID] = &bufferInfo{
95+
buffer: parentCapacityBuffer,
96+
}
97+
}
98+
buffersInfo[bufferUID].numberOfPods += 1
99+
}
100+
}
101+
for _, bufferInfo := range buffersInfo {
102+
context.Recorder.Eventf(bufferInfo.buffer, apiv1.EventTypeNormal, "TriggeredScaleUp",
103+
"capacity buffer %d fake pods triggered scale-up: %v", bufferInfo.numberOfPods, scaleUpStatus.ScaleUpInfos)
104+
}
105+
}
40106
}
41107

42-
// filterFakePods removes capacity buffer fake pods from the input list of T using passed getPod(T)
43-
// Returns a list containing only non-fake pods
44-
func filterFakePods[T any](podsWrappers []T, getPod func(T) *apiv1.Pod, resourceName string) []T {
45-
filteredPodsSources := make([]T, 0)
46-
removedPods := make([]*apiv1.Pod, 0)
108+
// filterOutFakeUnschedulablePods filters out NoScaleUpInfo for capacity buffers fake pods
109+
func filterOutFakeUnschedulablePods(noScaleUpInfo []status.NoScaleUpInfo) ([]status.NoScaleUpInfo, []status.NoScaleUpInfo) {
110+
filteredInfo := make([]status.NoScaleUpInfo, 0)
111+
filteredOutInfo := make([]status.NoScaleUpInfo, 0)
47112

48-
for _, podsWrapper := range podsWrappers {
49-
currentPod := getPod(podsWrapper)
113+
for _, info := range noScaleUpInfo {
114+
currentPod := info.Pod
50115
if !isFakeCapacityBuffersPod(currentPod) {
51-
filteredPodsSources = append(filteredPodsSources, podsWrapper)
116+
filteredInfo = append(filteredInfo, info)
52117
continue
53118
}
54-
removedPods = append(removedPods, currentPod)
119+
filteredOutInfo = append(filteredOutInfo, info)
55120
}
121+
return filteredInfo, filteredOutInfo
122+
}
123+
124+
// filterFakePods filters out capacity buffer fake pods
125+
func filterOutFakePods(pods []*apiv1.Pod) ([]*apiv1.Pod, []*apiv1.Pod) {
126+
filteredPods := make([]*apiv1.Pod, 0)
127+
filteredOutPods := make([]*apiv1.Pod, 0)
56128

57-
klog.Infof("Filtered out %d pods from %s", len(removedPods), resourceName)
58-
return filteredPodsSources
129+
for _, pod := range pods {
130+
if !isFakeCapacityBuffersPod(pod) {
131+
filteredPods = append(filteredPods, pod)
132+
continue
133+
}
134+
filteredOutPods = append(filteredOutPods, pod)
135+
}
136+
return filteredPods, filteredOutPods
59137
}
60138

61139
// CleanUp is called at CA termination
62-
func (a *FakePodsScaleUpStatusProcessor) CleanUp() {}
140+
func (p *FakePodsScaleUpStatusProcessor) CleanUp() {}

0 commit comments

Comments
 (0)