Skip to content

Commit 8110884

Browse files
authored
xdsclient: fix the flaky ADS stream restart test (#8631)
The ADS stream restart test can be flaky for the following reason:
- It requests a CDS resource and unrequests it before the stream breaks.
- Once the stream restarts, it verifies that this resource is not requested again.
- However, the ACK for this resource may or may not be received at the management server before the stream breaks. This can cause the test to falsely conclude that the resource was re-requested after the restart.

This PR changes the test in the following ways:
- Use a single resource.
- Verify the ACK before the stream is restarted.

Ran a million times without flakes on Forge.

RELEASE NOTES: NONE
1 parent 74b65af commit 8110884

File tree

1 file changed

+61
-115
lines changed

1 file changed

+61
-115
lines changed

internal/xds/xdsclient/tests/ads_stream_restart_test.go

Lines changed: 61 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -20,45 +20,23 @@ package xdsclient_test
2020

2121
import (
2222
"context"
23-
"fmt"
2423
"testing"
25-
"time"
2624

2725
"github.com/google/go-cmp/cmp"
28-
"github.com/google/go-cmp/cmp/cmpopts"
2926
"github.com/google/uuid"
27+
"google.golang.org/grpc"
3028
"google.golang.org/grpc/internal/testutils"
3129
"google.golang.org/grpc/internal/testutils/xds/e2e"
3230
"google.golang.org/grpc/internal/xds/bootstrap"
3331
"google.golang.org/grpc/internal/xds/xdsclient"
3432
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
35-
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
33+
"google.golang.org/protobuf/testing/protocmp"
3634

37-
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
3835
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
3936
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
4037
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
4138
)
4239

43-
// waitForResourceNames waits for the wantNames to be received on namesCh.
44-
// Returns a non-nil error if the context expires before that.
45-
func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error {
46-
t.Helper()
47-
48-
var lastRequestedNames []string
49-
for ; ; <-time.After(defaultTestShortTimeout) {
50-
select {
51-
case <-ctx.Done():
52-
return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. Last requested resources: %v", wantNames, lastRequestedNames)
53-
case gotNames := <-namesCh:
54-
if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) {
55-
return nil
56-
}
57-
lastRequestedNames = gotNames
58-
}
59-
}
60-
}
61-
6240
// Tests that an ADS stream is restarted after a connection failure. Also
6341
// verifies that if there were any watches registered before the connection
6442
// failed, those resources are re-requested after the stream is restarted.
@@ -74,47 +52,26 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
7452
defer cancel()
7553

7654
// Start an xDS management server that uses a couple of channels to inform
77-
// the test about the specific LDS and CDS resource names being requested.
78-
ldsResourcesCh := make(chan []string, 1)
79-
cdsResourcesCh := make(chan []string, 1)
80-
streamOpened := make(chan struct{}, 1)
81-
streamClosed := make(chan struct{}, 1)
55+
// the test about the request and response messages being exchanged.
56+
streamRequestCh := testutils.NewChannel()
57+
streamResponseCh := testutils.NewChannel()
58+
streamOpened := testutils.NewChannel()
59+
streamClosed := testutils.NewChannel()
8260
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
8361
Listener: lis,
8462
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
8563
t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl())
86-
87-
// Drain the resource name channels before writing to them to ensure
88-
// that the most recently requested names are made available to the
89-
// test.
90-
switch req.GetTypeUrl() {
91-
case version.V3ClusterURL:
92-
select {
93-
case <-cdsResourcesCh:
94-
default:
95-
}
96-
cdsResourcesCh <- req.GetResourceNames()
97-
case version.V3ListenerURL:
98-
select {
99-
case <-ldsResourcesCh:
100-
default:
101-
}
102-
ldsResourcesCh <- req.GetResourceNames()
103-
}
64+
streamRequestCh.SendContext(ctx, req)
10465
return nil
10566
},
67+
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
68+
streamResponseCh.SendContext(ctx, resp)
69+
},
10670
OnStreamClosed: func(int64, *v3corepb.Node) {
107-
select {
108-
case streamClosed <- struct{}{}:
109-
default:
110-
}
111-
71+
streamClosed.SendContext(ctx, nil)
11272
},
11373
OnStreamOpen: func(context.Context, int64, string) error {
114-
select {
115-
case streamOpened <- struct{}{}:
116-
default:
117-
}
74+
streamOpened.SendContext(ctx, nil)
11875
return nil
11976
},
12077
})
@@ -156,93 +113,82 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
156113

157114
// Verify that an ADS stream is opened and an LDS request with the above
158115
// resource name is sent.
159-
select {
160-
case <-streamOpened:
161-
case <-ctx.Done():
116+
if _, err = streamOpened.Receive(ctx); err != nil {
162117
t.Fatal("Timeout when waiting for ADS stream to open")
163118
}
164-
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
165-
t.Fatal(err)
166-
}
167119

168-
// Verify the update received by the watcher.
169-
wantListenerUpdate := listenerUpdateErrTuple{
170-
update: xdsresource.ListenerUpdate{
171-
RouteConfigName: routeConfigName,
172-
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
120+
// Verify that the initial discovery request matches expectation.
121+
r, err := streamRequestCh.Receive(ctx)
122+
if err != nil {
123+
t.Fatal("Timeout when waiting for the initial discovery request")
124+
}
125+
gotReq := r.(*v3discoverypb.DiscoveryRequest)
126+
wantReq := &v3discoverypb.DiscoveryRequest{
127+
VersionInfo: "",
128+
Node: &v3corepb.Node{
129+
Id: nodeID,
130+
UserAgentName: "gRPC Go",
131+
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
132+
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
173133
},
134+
ResourceNames: []string{listenerName},
135+
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
136+
ResponseNonce: "",
174137
}
175-
if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
176-
t.Fatal(err)
138+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
139+
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
177140
}
178141

179-
// Create a cluster resource on the management server, in addition to the
180-
// existing listener resource.
181-
const clusterName = "cluster"
182-
resources = e2e.UpdateOptions{
183-
NodeID: nodeID,
184-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)},
185-
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, clusterName, e2e.SecurityLevelNone)},
186-
SkipValidation: true,
187-
}
188-
if err := mgmtServer.Update(ctx, resources); err != nil {
189-
t.Fatal(err)
142+
// Capture the version and nonce from the response.
143+
r, err = streamResponseCh.Receive(ctx)
144+
if err != nil {
145+
t.Fatal("Timeout when waiting for a discovery response from the server")
190146
}
147+
gotResp := r.(*v3discoverypb.DiscoveryResponse)
191148

192-
// Register a watch for a cluster resource, and verify that a CDS request
193-
// with the above resource name is sent.
194-
cw := newClusterWatcher()
195-
cdsCancel := xdsresource.WatchCluster(client, clusterName, cw)
196-
if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{clusterName}); err != nil {
197-
t.Fatal(err)
149+
// Verify that the ACK contains the appropriate version and nonce.
150+
r, err = streamRequestCh.Receive(ctx)
151+
if err != nil {
152+
t.Fatal("Timeout when waiting for ACK")
153+
}
154+
gotReq = r.(*v3discoverypb.DiscoveryRequest)
155+
wantReq.VersionInfo = gotResp.GetVersionInfo()
156+
wantReq.ResponseNonce = gotResp.GetNonce()
157+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
158+
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
198159
}
199160

200161
// Verify the update received by the watcher.
201-
wantClusterUpdate := clusterUpdateErrTuple{
202-
update: xdsresource.ClusterUpdate{
203-
ClusterName: clusterName,
204-
EDSServiceName: clusterName,
162+
wantListenerUpdate := listenerUpdateErrTuple{
163+
update: xdsresource.ListenerUpdate{
164+
RouteConfigName: routeConfigName,
165+
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
205166
},
206167
}
207-
if err := verifyClusterUpdate(ctx, cw.updateCh, wantClusterUpdate); err != nil {
208-
t.Fatal(err)
209-
}
210-
211-
// Cancel the watch for the above cluster resource, and verify that a CDS
212-
// request with no resource names is sent.
213-
cdsCancel()
214-
if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{}); err != nil {
168+
if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
215169
t.Fatal(err)
216170
}
217171

218172
// Stop the restartable listener and wait for the stream to close.
219173
lis.Stop()
220-
select {
221-
case <-streamClosed:
222-
case <-ctx.Done():
174+
if _, err = streamClosed.Receive(ctx); err != nil {
223175
t.Fatal("Timeout when waiting for ADS stream to close")
224176
}
225177

226178
// Restart the restartable listener and wait for the stream to open.
227179
lis.Restart()
228-
select {
229-
case <-streamOpened:
230-
case <-ctx.Done():
180+
if _, err = streamOpened.Receive(ctx); err != nil {
231181
t.Fatal("Timeout when waiting for ADS stream to open")
232182
}
233183

234184
// Verify that the listener resource is requested again.
235-
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
236-
t.Fatal(err)
185+
r, err = streamRequestCh.Receive(ctx)
186+
if err != nil {
187+
t.Fatal("Timeout when waiting for the initial discovery request")
237188
}
238-
239-
// Wait for a short duration and verify that no CDS request is sent, since
240-
// there are no resources being watched.
241-
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
242-
defer sCancel()
243-
select {
244-
case <-sCtx.Done():
245-
case names := <-cdsResourcesCh:
246-
t.Fatalf("CDS request sent for resource names %v, when expecting no request", names)
189+
gotReq = r.(*v3discoverypb.DiscoveryRequest)
190+
wantReq.ResponseNonce = ""
191+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
192+
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
247193
}
248194
}

0 commit comments

Comments
 (0)