Skip to content

Commit af71b59

Browse files
committed
Add peer fields to expander/grpcplugin to replace inlined non proto-message Kubernetes objects
1 parent 28c2063 commit af71b59

File tree

4 files changed

+155
-67
lines changed

4 files changed

+155
-67
lines changed

cluster-autoscaler/expander/grpcplugin/grpc_client.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nod
8080

8181
// Transform inputs to gRPC inputs
8282
grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(expansionOptions)
83-
grpcNodeMap := populateNodeInfoForGRPC(nodeInfo)
83+
grpcNodeMap, grpcNodeBytesMap := populateNodeInfoForGRPC(nodeInfo)
8484

8585
// call gRPC server to get BestOption
8686
klog.V(2).Infof("GPRC call of best options to server with %v options", len(nodeGroupIDOptionMap))
8787
ctx, cancel := context.WithTimeout(context.Background(), gRPCTimeout)
8888
defer cancel()
89-
bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap})
89+
bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap, NodeBytesMap: grpcNodeBytesMap})
9090
if err != nil {
9191
klog.V(4).Infof("GRPC call failed, no options filtered: %v", err)
9292
return expansionOptions
@@ -117,12 +117,26 @@ func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Optio
117117
}
118118

119119
// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc
120-
func populateNodeInfoForGRPC(nodeInfos map[string]*framework.NodeInfo) map[string]*v1.Node {
120+
func populateNodeInfoForGRPC(nodeInfos map[string]*framework.NodeInfo) (map[string]*v1.Node, map[string][]byte) {
121121
grpcNodeInfoMap := make(map[string]*v1.Node)
122+
grpcNodeBytesMap := make(map[string][]byte)
122123
for nodeId, nodeInfo := range nodeInfos {
123-
grpcNodeInfoMap[nodeId] = nodeInfo.Node()
124+
node := nodeInfo.Node()
125+
grpcNodeInfoMap[nodeId] = node
126+
127+
// if we're still accumulating node bytes
128+
if grpcNodeBytesMap != nil {
129+
// try to serialize to proto bytes
130+
nodeBytes, err := node.Marshal()
131+
if err != nil {
132+
// unexpected proto serialization error, avoid sending nodeBytes map at all
133+
grpcNodeBytesMap = nil
134+
} else {
135+
grpcNodeBytesMap[nodeId] = nodeBytes
136+
}
137+
}
124138
}
125-
return grpcNodeInfoMap
139+
return grpcNodeInfoMap, grpcNodeBytesMap
126140
}
127141

128142
func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Option, nodeGroupIDOptionMap map[string]expander.Option) []expander.Option {
@@ -143,5 +157,17 @@ func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Op
143157
}
144158

145159
func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option {
146-
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods}
160+
var podsBytes [][]byte
161+
if len(pods) > 0 {
162+
podsBytes = make([][]byte, 0, len(pods))
163+
for _, pod := range pods {
164+
podBytes, err := pod.Marshal()
165+
if err != nil {
166+
podsBytes = nil
167+
break
168+
}
169+
podsBytes = append(podsBytes, podBytes)
170+
}
171+
}
172+
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods, PodBytes: podsBytes}
147173
}

cluster-autoscaler/expander/grpcplugin/grpc_client_test.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/golang/mock/gomock"
2424
"github.com/stretchr/testify/assert"
25+
2526
v1 "k8s.io/api/core/v1"
2627
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
2728
"k8s.io/autoscaler/cluster-autoscaler/expander/mocks"
@@ -135,13 +136,17 @@ func makeFakeNodeInfos() map[string]*framework.NodeInfo {
135136

136137
func TestPopulateNodeInfoForGRPC(t *testing.T) {
137138
nodeInfos := makeFakeNodeInfos()
138-
grpcNodeInfoMap := populateNodeInfoForGRPC(nodeInfos)
139+
grpcNodeInfoMap, grpcNodeBytesMap := populateNodeInfoForGRPC(nodeInfos)
139140

140141
expectedGrpcNodeInfoMap := make(map[string]*v1.Node)
142+
expectedGrpcNodeBytesMap := make(map[string][]byte)
141143
for i, opt := range options {
142144
expectedGrpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i]
145+
expectedGrpcNodeBytesMap[opt.NodeGroup.Id()], _ = nodes[i].Marshal()
143146
}
147+
144148
assert.Equal(t, expectedGrpcNodeInfoMap, grpcNodeInfoMap)
149+
assert.Equal(t, expectedGrpcNodeBytesMap, grpcNodeBytesMap)
145150
}
146151

147152
func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
@@ -179,12 +184,15 @@ func TestBestOptionsValid(t *testing.T) {
179184

180185
nodeInfos := makeFakeNodeInfos()
181186
grpcNodeInfoMap := make(map[string]*v1.Node)
187+
grpcNodeBytesMap := make(map[string][]byte)
182188
for i, opt := range options {
183189
grpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i]
190+
grpcNodeBytesMap[opt.NodeGroup.Id()], _ = nodes[i].Marshal()
184191
}
185192
expectedBestOptionsReq := &protos.BestOptionsRequest{
186-
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
187-
NodeMap: grpcNodeInfoMap,
193+
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
194+
NodeMap: grpcNodeInfoMap,
195+
NodeBytesMap: grpcNodeBytesMap,
188196
}
189197

190198
mockClient.EXPECT().BestOptions(
@@ -220,12 +228,14 @@ func TestBestOptionsEmpty(t *testing.T) {
220228
},
221229
}
222230
for _, tc := range testCases {
223-
grpcNodeInfoMap := populateNodeInfoForGRPC(makeFakeNodeInfos())
231+
grpcNodeInfoMap, grpcNodeBytesMap := populateNodeInfoForGRPC(makeFakeNodeInfos())
232+
assert.NotNil(t, grpcNodeBytesMap)
224233
mockClient.EXPECT().BestOptions(
225234
gomock.Any(), gomock.Eq(
226235
&protos.BestOptionsRequest{
227-
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
228-
NodeMap: grpcNodeInfoMap,
236+
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
237+
NodeMap: grpcNodeInfoMap,
238+
NodeBytesMap: grpcNodeBytesMap,
229239
})).Return(&tc.mockResponse, nil)
230240
resp := g.BestOptions(options, makeFakeNodeInfos())
231241

@@ -284,13 +294,15 @@ func TestBestOptionsErrors(t *testing.T) {
284294
},
285295
}
286296
for _, tc := range testCases {
287-
grpcNodeInfoMap := populateNodeInfoForGRPC(tc.nodeInfo)
297+
grpcNodeInfoMap, grpcNodeBytesMap := populateNodeInfoForGRPC(tc.nodeInfo)
298+
assert.NotNil(t, grpcNodeBytesMap)
288299
if tc.client.grpcClient != nil {
289300
mockClient.EXPECT().BestOptions(
290301
gomock.Any(), gomock.Eq(
291302
&protos.BestOptionsRequest{
292-
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
293-
NodeMap: grpcNodeInfoMap,
303+
Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge},
304+
NodeMap: grpcNodeInfoMap,
305+
NodeBytesMap: grpcNodeBytesMap,
294306
})).Return(&tc.mockResponse, tc.errResponse)
295307
}
296308
resp := tc.client.BestOptions(options, tc.nodeInfo)

0 commit comments

Comments
 (0)