Skip to content

Commit 5834a0f

Browse files
committed
process all gateway logic in a single worker
Change-Id: Ie228405370e5e90b7a030c03cd086a379818d36c
1 parent 324eda5 commit 5834a0f

File tree

4 files changed

+144
-261
lines changed

4 files changed

+144
-261
lines changed

pkg/gateway/controller.go

Lines changed: 90 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"net"
77
"sort"
8+
"sync/atomic"
89
"time"
910

1011
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -26,15 +27,17 @@ import (
2627
"google.golang.org/grpc"
2728
"google.golang.org/grpc/keepalive"
2829

29-
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
30-
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
31-
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
32-
listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
33-
routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
34-
runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
35-
secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
36-
envoyproxycache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
37-
envoyproxyserver "github.com/envoyproxy/go-control-plane/pkg/server/v3"
30+
clusterv3 "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
31+
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
32+
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
33+
listenerv3 "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
34+
routev3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
35+
runtimev3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
36+
secretv3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
37+
envoyproxytypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
38+
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
39+
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
40+
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
3841
)
3942

4043
const (
@@ -69,10 +72,11 @@ type Controller struct {
6972
grpcroutequeue workqueue.TypedRateLimitingInterface[string]
7073

7174
// envoyproxy control plane
72-
xdscache envoyproxycache.SnapshotCache
73-
xdsserver envoyproxyserver.Server
75+
xdscache cachev3.SnapshotCache
76+
xdsserver serverv3.Server
7477
xdsLocalAddress string
7578
xdsLocalPort int
79+
xdsVersion atomic.Uint64
7680

7781
tunnelManager *tunnels.TunnelManager
7882
}
@@ -146,33 +150,28 @@ func New(
146150
_, err = httprouteInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
147151
AddFunc: func(obj interface{}) {
148152
httproute := obj.(*gatewayv1.HTTPRoute)
149-
if !c.isOwned(httproute.Spec.ParentRefs, httproute.Namespace) {
150-
return
151-
}
152-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
153-
if err == nil {
154-
c.httproutequeue.Add(key)
155-
}
153+
c.processGateways(httproute.Spec.ParentRefs, httproute.Namespace)
156154
},
157155
UpdateFunc: func(oldObj, newObj interface{}) {
158156
httproute := newObj.(*gatewayv1.HTTPRoute)
159-
if !c.isOwned(httproute.Spec.ParentRefs, httproute.Namespace) {
160-
return
161-
}
162-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
163-
if err == nil {
164-
c.httproutequeue.Add(key)
165-
}
157+
c.processGateways(httproute.Spec.ParentRefs, httproute.Namespace)
166158
},
167159
DeleteFunc: func(obj interface{}) {
168-
httproute := obj.(*gatewayv1.HTTPRoute)
169-
if !c.isOwned(httproute.Spec.ParentRefs, httproute.Namespace) {
170-
return
171-
}
172-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
173-
if err == nil {
174-
c.httproutequeue.Add(key)
160+
httproute, ok := obj.(*gatewayv1.HTTPRoute)
161+
if !ok {
162+
// If we reached here it means the pod was deleted but its final state is unrecorded.
163+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
164+
if !ok {
165+
runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
166+
return
167+
}
168+
httproute, ok = tombstone.Obj.(*gatewayv1.HTTPRoute)
169+
if !ok {
170+
runtime.HandleError(fmt.Errorf("tombstone contained object that is not a GRPCRoute: %#v", obj))
171+
return
172+
}
175173
}
174+
c.processGateways(httproute.Spec.ParentRefs, httproute.Namespace)
176175
},
177176
})
178177
if err != nil {
@@ -182,33 +181,28 @@ func New(
182181
_, err = grpcrouteInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
183182
AddFunc: func(obj interface{}) {
184183
grpcroute := obj.(*gatewayv1.GRPCRoute)
185-
if !c.isOwned(grpcroute.Spec.ParentRefs, grpcroute.Namespace) {
186-
return
187-
}
188-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
189-
if err == nil {
190-
c.grpcroutequeue.Add(key)
191-
}
184+
c.processGateways(grpcroute.Spec.ParentRefs, grpcroute.Namespace)
192185
},
193186
UpdateFunc: func(oldObj, newObj interface{}) {
194187
grpcroute := newObj.(*gatewayv1.GRPCRoute)
195-
if !c.isOwned(grpcroute.Spec.ParentRefs, grpcroute.Namespace) {
196-
return
197-
}
198-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
199-
if err == nil {
200-
c.grpcroutequeue.Add(key)
201-
}
188+
c.processGateways(grpcroute.Spec.ParentRefs, grpcroute.Namespace)
202189
},
203190
DeleteFunc: func(obj interface{}) {
204-
grpcroute := obj.(*gatewayv1.GRPCRoute)
205-
if !c.isOwned(grpcroute.Spec.ParentRefs, grpcroute.Namespace) {
206-
return
207-
}
208-
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
209-
if err == nil {
210-
c.grpcroutequeue.Add(key)
191+
grpcroute, ok := obj.(*gatewayv1.GRPCRoute)
192+
if !ok {
193+
// If we reached here it means the pod was deleted but its final state is unrecorded.
194+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
195+
if !ok {
196+
runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
197+
return
198+
}
199+
grpcroute, ok = tombstone.Obj.(*gatewayv1.GRPCRoute)
200+
if !ok {
201+
runtime.HandleError(fmt.Errorf("tombstone contained object that is not a GRPCRoute: %#v", obj))
202+
return
203+
}
211204
}
205+
c.processGateways(grpcroute.Spec.ParentRefs, grpcroute.Namespace)
212206
},
213207
})
214208
if err != nil {
@@ -272,8 +266,8 @@ func (c *Controller) Run(ctx context.Context) error {
272266

273267
klog.Info("Starting Envoy proxy controller")
274268
// Create a cache
275-
c.xdscache = envoyproxycache.NewSnapshotCache(false, envoyproxycache.IDHash{}, nil)
276-
c.xdsserver = envoyproxyserver.NewServer(ctx, c.xdscache, nil)
269+
c.xdscache = cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil)
270+
c.xdsserver = serverv3.NewServer(ctx, c.xdscache, nil)
277271

278272
var grpcOptions []grpc.ServerOption
279273
grpcOptions = append(grpcOptions,
@@ -289,13 +283,13 @@ func (c *Controller) Run(ctx context.Context) error {
289283
)
290284
grpcServer := grpc.NewServer(grpcOptions...)
291285

292-
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, c.xdsserver)
293-
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, c.xdsserver)
294-
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, c.xdsserver)
295-
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, c.xdsserver)
296-
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, c.xdsserver)
297-
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, c.xdsserver)
298-
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, c.xdsserver)
286+
discoveryv3.RegisterAggregatedDiscoveryServiceServer(grpcServer, c.xdsserver)
287+
endpointv3.RegisterEndpointDiscoveryServiceServer(grpcServer, c.xdsserver)
288+
clusterv3.RegisterClusterDiscoveryServiceServer(grpcServer, c.xdsserver)
289+
routev3.RegisterRouteDiscoveryServiceServer(grpcServer, c.xdsserver)
290+
listenerv3.RegisterListenerDiscoveryServiceServer(grpcServer, c.xdsserver)
291+
secretv3.RegisterSecretDiscoveryServiceServer(grpcServer, c.xdsserver)
292+
runtimev3.RegisterRuntimeDiscoveryServiceServer(grpcServer, c.xdsserver)
299293

300294
address, err := GetControlPlaneAddress()
301295
if err != nil {
@@ -320,7 +314,7 @@ func (c *Controller) Run(ctx context.Context) error {
320314
c.xdsLocalAddress = address
321315
c.xdsLocalPort = tcpAddr.Port
322316
go func() {
323-
klog.Infof("management server listening on %d\n", c.xdsLocalPort)
317+
klog.Infof("XDS management server listening on %s %d\n", c.xdsLocalAddress, c.xdsLocalPort)
324318
if err = grpcServer.Serve(listener); err != nil {
325319
klog.Infoln(err)
326320
}
@@ -340,40 +334,39 @@ func (c *Controller) Run(ctx context.Context) error {
340334

341335
for i := 0; i < workers; i++ {
342336
go wait.UntilWithContext(ctx, c.runGatewayWorker, time.Second)
343-
go wait.UntilWithContext(ctx, c.runHTTPRouteWorker, time.Second)
344-
go wait.UntilWithContext(ctx, c.runGRPCrouteWorker, time.Second)
345337
}
346338

347339
<-ctx.Done()
348340
klog.Info("Stopping Gateway API controller")
349341
return nil
350342
}
351343

352-
func (c *Controller) isOwned(references []gatewayv1.ParentReference, localNamespace string) bool {
344+
// processGateways enqueues all referenced gateways from the Parent References
345+
func (c *Controller) processGateways(references []gatewayv1.ParentReference, localNamespace string) {
353346
for _, ref := range references {
354-
if ref.Group == nil || ref.Kind == nil {
355-
continue
356-
}
357347
namespace := localNamespace
358348
if ref.Namespace != nil {
359349
namespace = string(*ref.Namespace)
360350
}
361-
if string(*ref.Group) != "gateway.networking.k8s.io" && string(*ref.Group) != "" {
351+
if ref.Group != nil && string(*ref.Group) != "gateway.networking.k8s.io" {
362352
continue
363353
}
364-
if string(*ref.Kind) != "Gateway" {
354+
355+
if ref.Kind != nil && string(*ref.Kind) != "Gateway" {
365356
continue
366357
}
367358

368359
gw, err := c.gatewayLister.Gateways(namespace).Get(string(ref.Name))
369360
if err != nil {
361+
klog.Infof("fail to obtain referenced gateway %s/%s : %v", namespace, ref.Name, err)
370362
continue
371363
}
372-
if gw.Spec.GatewayClassName == GWClassName {
373-
return true
364+
if gw.Spec.GatewayClassName != GWClassName {
365+
klog.V(2).Infof("gateway %s/%s not managed by this controller", namespace, ref.Name)
366+
continue
374367
}
368+
c.gatewayqueue.Add(gw.Namespace + "/" + gw.Name)
375369
}
376-
return false
377370
}
378371

379372
// UpdateConditionIfChanged updates or insert a condition if it has been changed.
@@ -453,3 +446,26 @@ func GetControlPlaneAddress() (string, error) {
453446

454447
return "", fmt.Errorf("no suitable global unicast IPv4 address found on any active non-loopback interface")
455448
}
449+
450+
// UpdateXDSServer changes the resource snapshot held by the XDS server, which
451+
// updates connected clients as required.
452+
func (c *Controller) UpdateXDSServer(ctx context.Context, nodeid string, resources map[resourcev3.Type][]envoyproxytypes.Resource) error {
453+
c.xdsVersion.Add(1)
454+
455+
// Create a snapshot with the passed in resources.
456+
snapshot, err := cachev3.NewSnapshot(fmt.Sprintf("%d", c.xdsVersion.Load()), resources)
457+
if err != nil {
458+
return fmt.Errorf("failed to create new snapshot cache: %v", err)
459+
460+
}
461+
if err := snapshot.Consistent(); err != nil {
462+
return fmt.Errorf("failed to create new resource snapshot: %v", err)
463+
}
464+
465+
// Update the cache with the new resource snapshot.
466+
if err := c.xdscache.SetSnapshot(ctx, nodeid, snapshot); err != nil {
467+
return fmt.Errorf("failed to update resource snapshot in management server: %v", err)
468+
}
469+
klog.V(4).Infof("Updated snapshot cache with resource snapshot...")
470+
return nil
471+
}

pkg/gateway/gateway.go

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ import (
88
"os"
99
"time"
1010

11+
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
12+
listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
13+
envoyproxytypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
14+
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
15+
1116
apierrors "k8s.io/apimachinery/pkg/api/errors"
1217
"k8s.io/apimachinery/pkg/util/runtime"
1318
"k8s.io/client-go/tools/cache"
@@ -29,7 +34,7 @@ func gatewayName(clusterName, namespace, name string) string {
2934
panic(err)
3035
}
3136
hash := h.Sum(nil)
32-
return fmt.Sprintf("%s-%x", constants.ContainerPrefix, hash[:6])
37+
return fmt.Sprintf("%s-gw-%x", constants.ContainerPrefix, hash[:6])
3338
}
3439

3540
func gatewaySimpleName(clusterName, namespace, name string) string {
@@ -41,8 +46,8 @@ func createGateway(clusterName string, localAddress string, localPort int, gatew
4146
name := gatewayName(clusterName, gateway.Namespace, gateway.Name)
4247
simpleName := gatewaySimpleName(clusterName, gateway.Namespace, gateway.Name)
4348
envoyConfig := &configData{
44-
Cluster: simpleName,
4549
ID: name,
50+
Cluster: simpleName,
4651
AdminPort: envoyAdminPort,
4752
ControlPlaneAddress: localAddress,
4853
ControlPlanePort: localPort,
@@ -141,7 +146,7 @@ func (c *Controller) processNextGatewayItem() bool {
141146
func (c *Controller) syncGateway(key string) error {
142147
startTime := time.Now()
143148
defer func() {
144-
klog.V(4).Infof("Finished syncing gateway %q (%v)", key, time.Since(startTime))
149+
klog.V(2).Infof("Finished syncing gateway %q (%v)", key, time.Since(startTime))
145150
}()
146151

147152
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -156,35 +161,65 @@ func (c *Controller) syncGateway(key string) error {
156161
}
157162
containerName := gatewayName(c.clusterName, namespace, name)
158163

164+
// Deleting
159165
if apierrors.IsNotFound(err) {
160166
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
161167
klog.Infof("Gateway %s does not exist anymore, deleting \n", key)
168+
c.xdscache.ClearSnapshot(containerName)
169+
162170
err := container.Delete(containerName)
163171
if err != nil {
164172
return fmt.Errorf("can not delete container %s for gateway %s/%s on cluster %s : %v", containerName, namespace, name, c.clusterName, err)
165173
}
166-
} else {
167-
klog.Infof("Syncing Gateway %s\n", gw.GetName())
168-
if !container.IsRunning(containerName) {
169-
klog.Infof("container %s for gateway is not running", name)
170-
if container.Exist(containerName) {
171-
err := container.Delete(containerName)
172-
if err != nil {
173-
return err
174-
}
175-
}
176-
}
177-
if !container.Exist(containerName) {
178-
klog.V(2).Infof("creating container %s for gateway %s/%s on cluster %s", containerName, namespace, name, c.clusterName)
179-
enableTunnels := c.tunnelManager != nil || config.DefaultConfig.LoadBalancerConnectivity == config.Portmap
180-
err := createGateway(c.clusterName, c.xdsLocalAddress, c.xdsLocalPort, gw, enableTunnels)
174+
return nil
175+
}
176+
// Create or Update
177+
klog.Infof("Syncing Gateway %s\n", gw.GetName())
178+
if !container.IsRunning(containerName) {
179+
klog.Infof("container %s for gateway is not running", name)
180+
if container.Exist(containerName) {
181+
err := container.Delete(containerName)
181182
if err != nil {
182183
return err
183184
}
184185
}
186+
}
187+
if !container.Exist(containerName) {
188+
klog.V(2).Infof("creating container %s for gateway %s/%s on cluster %s", containerName, namespace, name, c.clusterName)
189+
enableTunnels := c.tunnelManager != nil || config.DefaultConfig.LoadBalancerConnectivity == config.Portmap
190+
err := createGateway(c.clusterName, c.xdsLocalAddress, c.xdsLocalPort, gw, enableTunnels)
191+
if err != nil {
192+
return err
193+
}
194+
}
195+
196+
// Update configuration
197+
resources := map[resourcev3.Type][]envoyproxytypes.Resource{}
198+
199+
for _, listener := range gw.Spec.Listeners {
200+
// Determine the Envoy protocol based on the Gateway API protocol
201+
var envoyProto corev3.SocketAddress_Protocol
202+
switch listener.Protocol {
203+
case gatewayv1.UDPProtocolType:
204+
envoyProto = corev3.SocketAddress_UDP
205+
default: // TCP, HTTP, HTTPS, TLS all use TCP at the transport layer
206+
envoyProto = corev3.SocketAddress_TCP
207+
}
208+
209+
resources[resourcev3.ListenerType] = append(resources[resourcev3.ListenerType], &listenerv3.Listener{
210+
Name: string(listener.Name),
211+
Address: &corev3.Address{Address: &corev3.Address_SocketAddress{SocketAddress: &corev3.SocketAddress{
212+
Protocol: envoyProto,
213+
Address: "0.0.0.0", // Or "::" for IPv6, or specific IP if needed
214+
PortSpecifier: &corev3.SocketAddress_PortValue{
215+
PortValue: uint32(listener.Port),
216+
},
217+
}}},
218+
})
185219

186220
}
187-
return nil
221+
return c.UpdateXDSServer(context.Background(), containerName, resources)
222+
188223
}
189224

190225
// handleErr checks if an error happened and makes sure we will retry later.

0 commit comments

Comments
 (0)