Skip to content

Commit 364c27f

Browse files
committed
use a list.List for the queue of requests
1 parent 6ff8f6d commit 364c27f

File tree

3 files changed

+108
-137
lines changed

3 files changed

+108
-137
lines changed

internal/xds/clients/internal/buffer/unbounded.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,23 +115,3 @@ func (b *Unbounded) Close() {
115115
close(b.c)
116116
}
117117
}
118-
119-
// Reset clears all buffered data in the unbounded buffer. This does not close
120-
// the buffer, and new data may be Put() into it after a call to this method.
121-
//
122-
// It's expected to be used in scenarios where the buffered data is no longer
123-
// relevant, and needs to be cleared.
124-
func (b *Unbounded) Reset() {
125-
b.mu.Lock()
126-
defer b.mu.Unlock()
127-
128-
if b.closing {
129-
return
130-
}
131-
132-
b.backlog = b.backlog[:0]
133-
select {
134-
case <-b.c:
135-
default:
136-
}
137-
}

internal/xds/clients/internal/buffer/unbounded_test.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"sort"
2222
"sync"
2323
"testing"
24-
"time"
2524

2625
"github.com/google/go-cmp/cmp"
2726
"google.golang.org/grpc/internal/grpctest"
@@ -147,37 +146,3 @@ func (s) TestClose(t *testing.T) {
147146
}
148147
ub.Close() // ignored
149148
}
150-
151-
// TestReset resets the buffer and makes sure that the buffer can be used after
152-
// the reset.
153-
func (s) TestReset(t *testing.T) {
154-
ub := NewUnbounded()
155-
if err := ub.Put(1); err != nil {
156-
t.Fatalf("Unbounded.Put() = %v; want nil", err)
157-
}
158-
if err := ub.Put(2); err != nil {
159-
t.Fatalf("Unbounded.Put() = %v; want nil", err)
160-
}
161-
if v, ok := <-ub.Get(); !ok {
162-
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
163-
}
164-
ub.Load()
165-
ub.Reset()
166-
167-
// Make sure the buffer is empty after the reset. Wait for a short duration
168-
// to make sure that no value is received on the read channel.
169-
select {
170-
case v, ok := <-ub.Get():
171-
t.Errorf("Unbounded.Get() = %v, %v; want no value", v, ok)
172-
case <-time.After(10 * time.Millisecond):
173-
}
174-
175-
// Make sure the buffer can be used after the reset.
176-
if err := ub.Put(1); err != nil {
177-
t.Fatalf("Unbounded.Put() = %v; want nil", err)
178-
}
179-
if v, ok := <-ub.Get(); !ok {
180-
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
181-
}
182-
ub.Load()
183-
}

internal/xds/clients/xdsclient/ads_stream.go

Lines changed: 108 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package xdsclient
2020

2121
import (
22+
"container/list"
2223
"context"
2324
"fmt"
2425
"sync"
@@ -28,7 +29,6 @@ import (
2829
igrpclog "google.golang.org/grpc/internal/grpclog"
2930
"google.golang.org/grpc/internal/xds/clients"
3031
"google.golang.org/grpc/internal/xds/clients/internal/backoff"
31-
"google.golang.org/grpc/internal/xds/clients/internal/buffer"
3232
"google.golang.org/grpc/internal/xds/clients/internal/pretty"
3333
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
3434

@@ -104,7 +104,6 @@ type adsStreamImpl struct {
104104
// The following fields are initialized in the constructor and are not
105105
// written to afterwards, and hence can be accessed without a mutex.
106106
streamCh chan clients.Stream // New ADS streams are pushed here.
107-
requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
108107
runnerDoneCh chan struct{} // Notify completion of runner goroutine.
109108
cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
110109
fc *adsFlowControl // Flow control for ADS stream.
@@ -113,6 +112,10 @@ type adsStreamImpl struct {
113112
mu sync.Mutex
114113
resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state.
115114
firstRequest bool // False after the first request is sent out.
115+
queuedReqs *list.List // Queued requests waiting to be sent.
116+
queuedReqsExist *sync.Cond // Condition variable for waiting on queued requests.
117+
streamDone bool // True if the adsStreamImpl is stopped.
118+
recvDone bool // True if receiving on the ADS stream is done.
116119
}
117120

118121
// adsStreamOpts contains the options for creating a new ADS Stream.
@@ -137,11 +140,12 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
137140
watchExpiryTimeout: opts.watchExpiryTimeout,
138141

139142
streamCh: make(chan clients.Stream, 1),
140-
requestCh: buffer.NewUnbounded(),
141143
runnerDoneCh: make(chan struct{}),
142144
fc: newADSFlowControl(),
143145
resourceTypeState: make(map[ResourceType]*resourceTypeState),
146+
queuedReqs: list.New(),
144147
}
148+
s.queuedReqsExist = sync.NewCond(&s.mu)
145149

146150
l := grpclog.Component("xds")
147151
s.logger = igrpclog.NewPrefixLogger(l, opts.logPrefix+fmt.Sprintf("[ads-stream %p] ", s))
@@ -156,7 +160,15 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
156160
func (s *adsStreamImpl) Stop() {
157161
s.cancel()
158162
s.fc.stop()
159-
s.requestCh.Close()
163+
164+
// Unblock the sender goroutine which might be blocked waiting for queued
165+
// requests to be sent out. It is allowed but not required to hold the lock
166+
// when signalling.
167+
s.mu.Lock()
168+
s.streamDone = true
169+
s.queuedReqsExist.Signal()
170+
s.mu.Unlock()
171+
160172
<-s.runnerDoneCh
161173
s.logger.Infof("Shutdown ADS stream")
162174
}
@@ -185,8 +197,13 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
185197
// be started when a request for this resource is actually sent out.
186198
state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
187199

188-
// Send a request for the resource type with updated subscriptions.
189-
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
200+
// Queue a request for the resource type with updated subscriptions.
201+
resourceNames := resourceNames(state.subscribedResources)
202+
if s.logger.V(2) {
203+
s.logger.Infof("Queueing a request for resources %q of type %q", resourceNames, typ.TypeName)
204+
}
205+
s.queuedReqs.PushBack(request{typ: typ, resourceNames: resourceNames})
206+
s.queuedReqsExist.Signal()
190207
}
191208

192209
// unsubscribe cancels the subscription to the given resource. It is a no-op if
@@ -215,8 +232,13 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
215232
}
216233
delete(state.subscribedResources, name)
217234

218-
// Send a request for the resource type with updated subscriptions.
219-
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
235+
// Queue a request for the resource type with updated subscriptions.
236+
resourceNames := resourceNames(state.subscribedResources)
237+
if s.logger.V(2) {
238+
s.logger.Infof("Queueing a request for resources %q of type %q", resourceNames, typ.TypeName)
239+
}
240+
s.queuedReqs.PushBack(request{typ: typ, resourceNames: resourceNames})
241+
s.queuedReqsExist.Signal()
220242
}
221243

222244
// runner is a long-running goroutine that handles the lifecycle of the ADS
@@ -227,8 +249,6 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
227249
func (s *adsStreamImpl) runner(ctx context.Context) {
228250
defer close(s.runnerDoneCh)
229251

230-
go s.send(ctx)
231-
232252
runStreamWithBackoff := func() error {
233253
stream, err := s.transport.NewStream(ctx, "/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources")
234254
if err != nil {
@@ -242,93 +262,99 @@ func (s *adsStreamImpl) runner(ctx context.Context) {
242262

243263
s.mu.Lock()
244264
s.firstRequest = true
265+
s.recvDone = false
266+
if err := s.sendExistingLocked(stream); err != nil {
267+
s.logger.Warningf("Failed to send existing resources on newly created stream: %v", err)
268+
s.mu.Unlock()
269+
return nil
270+
}
245271
s.mu.Unlock()
246272

247-
// Ensure that the most recently created stream is pushed on the
248-
// channel for the `send` goroutine to consume.
249-
select {
250-
case <-s.streamCh:
251-
default:
252-
}
253-
s.streamCh <- stream
273+
// Spawn the sending goroutine that runs until the context is done, or
274+
// writing to the stream fails. When the latter happens, the next
275+
// iteration of the loop in the runner goroutine will spawn another
276+
// sending goroutine.
277+
sendDoneCh := make(chan struct{})
278+
recvDoneCh := make(chan struct{})
279+
go func() {
280+
defer close(sendDoneCh)
281+
282+
for ctx.Err() == nil {
283+
// Spawn a goroutine to wait for queued requests to be available
284+
// for sending. This is required to exit the sending goroutine
285+
// blocked on the condition variable when the receiving is done.
286+
waitCh := make(chan struct{})
287+
go func() {
288+
s.mu.Lock()
289+
defer s.mu.Unlock()
290+
291+
// Wait on the condition variable only if there are no
292+
// queued requests. A call to `Signal` will be a no-op if
293+
// there are no blocked goroutines at that point in time.
294+
// There could be newly queued requests that come in after
295+
// we release the lock at the end of the loop, that result
296+
// in a call to `Signal` before we get here.
297+
if s.queuedReqs.Len() == 0 && !s.recvDone && !s.streamDone {
298+
s.queuedReqsExist.Wait()
299+
}
300+
close(waitCh)
301+
}()
302+
303+
select {
304+
case <-waitCh:
305+
// Queued requests are now available, continue with sending.
306+
case <-recvDoneCh:
307+
// Receiving is done. Ensure that the goroutine waiting on
308+
// the condition variable exits.
309+
s.mu.Lock()
310+
s.recvDone = true
311+
s.queuedReqsExist.Signal()
312+
s.mu.Unlock()
313+
<-waitCh
314+
return
315+
}
316+
317+
// Iterate and consume the list of queued requests.
318+
s.mu.Lock()
319+
for s.queuedReqs.Len() > 0 {
320+
elem := s.queuedReqs.Front()
321+
req := elem.Value.(request)
322+
state := s.resourceTypeState[req.typ]
323+
if err := s.sendMessageLocked(stream, req.resourceNames, req.typ.TypeURL, state.version, state.nonce, nil); err != nil {
324+
s.logger.Warningf("Failed to send queued request for resources %q of type %q: %v", req.resourceNames, req.typ.TypeName, err)
325+
s.mu.Unlock()
326+
return
327+
}
328+
s.queuedReqs.Remove(elem)
329+
s.startWatchTimersLocked(req.typ, req.resourceNames)
330+
}
331+
s.mu.Unlock()
332+
}
333+
}()
254334

255335
// Backoff state is reset upon successful receipt of at least one
256336
// message from the server.
337+
err = nil
257338
if s.recv(stream) {
258-
return backoff.ErrResetBackoff
339+
err = backoff.ErrResetBackoff
259340
}
260-
return nil
261-
}
262-
backoff.RunF(ctx, runStreamWithBackoff, s.backoff)
263-
}
341+
close(recvDoneCh)
264342

265-
// send is a long running goroutine that handles sending discovery requests for
266-
// two scenarios:
267-
// - a new subscription or unsubscription request is received
268-
// - a new stream is created after the previous one failed
269-
func (s *adsStreamImpl) send(ctx context.Context) {
270-
// Stores the most recent stream instance received on streamCh.
271-
var stream clients.Stream
272-
for {
273-
select {
274-
case <-ctx.Done():
275-
return
276-
case stream = <-s.streamCh:
277-
if err := s.sendExisting(stream); err != nil {
278-
// Send failed, clear the current stream. Attempt to resend will
279-
// only be made after a new stream is created.
280-
stream = nil
281-
continue
282-
}
283-
case r, ok := <-s.requestCh.Get():
284-
if !ok {
285-
return
286-
}
287-
s.requestCh.Load()
288-
289-
req := r.(request)
290-
if err := s.sendNew(stream, req.typ, req.resourceNames); err != nil {
291-
stream = nil
292-
continue
293-
}
294-
}
295-
}
296-
}
297-
298-
// sendNew attempts to send a discovery request based on a new subscription or
299-
// unsubscription. This method also starts the watch expiry timer for resources
300-
// that were sent in the request for the first time, i.e. their watch state is
301-
// `watchStateStarted`.
302-
func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType, names []string) error {
303-
s.mu.Lock()
304-
defer s.mu.Unlock()
305-
306-
// If there's no stream yet, skip the request. This request will be resent
307-
// when a new stream is created. If no stream is created, the watcher will
308-
// timeout (same as server not sending response back).
309-
if stream == nil {
310-
return nil
311-
}
312-
313-
state := s.resourceTypeState[typ]
314-
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
343+
<-sendDoneCh
315344
return err
316345
}
317-
s.startWatchTimersLocked(typ, names)
318-
return nil
346+
backoff.RunF(ctx, runStreamWithBackoff, s.backoff)
319347
}
320348

321-
// sendExisting sends out discovery requests for existing resources when
322-
// recovering from a broken stream.
349+
// sendExistingLocked sends out discovery requests for existing resources when
350+
// recovering from a broken stream. The stream argument is guaranteed to be
351+
// non-nil.
323352
//
324-
// The stream argument is guaranteed to be non-nil.
325-
func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
326-
s.mu.Lock()
327-
defer s.mu.Unlock()
328-
353+
// Caller needs to hold c.mu.
354+
func (s *adsStreamImpl) sendExistingLocked(stream clients.Stream) error {
329355
// Clear any queued requests. Previously subscribed to resources will be
330356
// resent below.
331-
s.requestCh.Reset()
357+
s.queuedReqs.Init()
332358

333359
for typ, state := range s.resourceTypeState {
334360
// Reset only the nonces map when the stream restarts.

0 commit comments

Comments
 (0)