@@ -269,17 +269,35 @@ func backup(
269
269
}
270
270
}
271
271
272
- // Create a channel that is large enough that it does not block .
273
- perNodeProgressCh := make (chan map [execinfrapb.ComponentID ]float32 , numTotalSpans )
272
+ // Create a channel with a little buffering, but plan on dropping if blocked .
273
+ perNodeProgressCh := make (chan map [execinfrapb.ComponentID ]float32 , len ( backupSpecs ) )
274
274
storePerNodeProgressLoop := func (ctx context.Context ) error {
275
+ // track the last progress per component, periodically flushing those that
276
+ // have changed to info rows.
277
+ current , persisted := make (map [execinfrapb.ComponentID ]float32 ), make (map [execinfrapb.ComponentID ]float32 )
278
+ lastSaved := timeutil .Now ()
279
+
275
280
for {
276
281
select {
277
282
case prog , ok := <- perNodeProgressCh :
278
283
if ! ok {
279
284
return nil
280
285
}
281
- jobsprofiler .StorePerNodeProcessorProgressFraction (
282
- ctx , execCtx .ExecCfg ().InternalDB , job .ID (), prog )
286
+ for k , v := range prog {
287
+ current [k ] = v
288
+ }
289
+ if timeutil .Since (lastSaved ) > time .Second * 15 {
290
+ lastSaved = timeutil .Now ()
291
+ updates := make (map [execinfrapb.ComponentID ]float32 )
292
+ for k := range current {
293
+ if current [k ] != persisted [k ] {
294
+ persisted [k ] = current [k ]
295
+ updates [k ] = current [k ]
296
+ }
297
+ }
298
+ jobsprofiler .StorePerNodeProcessorProgressFraction (
299
+ ctx , execCtx .ExecCfg ().InternalDB , job .ID (), updates )
300
+ }
283
301
case <- ctx .Done ():
284
302
return ctx .Err ()
285
303
}
@@ -328,11 +346,8 @@ func backup(
328
346
perComponentProgress [component ] = fraction
329
347
}
330
348
select {
331
- // This send to a buffered channel should never block but incase it does
332
- // we will fallthrough to the default case.
333
349
case perNodeProgressCh <- perComponentProgress :
334
- default :
335
- log .Warningf (ctx , "skipping persisting per component progress as buffered channel was full" )
350
+ default : // discard the update if the flusher is backed up.
336
351
}
337
352
338
353
// Check if we should persist a checkpoint backup manifest.
0 commit comments