diff --git a/pkg/backup/backup_job.go b/pkg/backup/backup_job.go index 9ae1a7569425..bc43b8e60b59 100644 --- a/pkg/backup/backup_job.go +++ b/pkg/backup/backup_job.go @@ -226,17 +226,35 @@ func backup( } } - // Create a channel that is large enough that it does not block. - perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, numTotalSpans) + // Create a channel with a little buffering, but plan on dropping if blocked. + perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, len(backupSpecs)) storePerNodeProgressLoop := func(ctx context.Context) error { + // track the last progress per component, periodically flushing those that + // have changed to info rows. + current, persisted := make(map[execinfrapb.ComponentID]float32), make(map[execinfrapb.ComponentID]float32) + lastSaved := timeutil.Now() + for { select { case prog, ok := <-perNodeProgressCh: if !ok { return nil } - jobsprofiler.StorePerNodeProcessorProgressFraction( - ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog) + for k, v := range prog { + current[k] = v + } + if timeutil.Since(lastSaved) > time.Second*15 { + lastSaved = timeutil.Now() + updates := make(map[execinfrapb.ComponentID]float32) + for k := range current { + if current[k] != persisted[k] { + persisted[k] = current[k] + updates[k] = current[k] + } + } + jobsprofiler.StorePerNodeProcessorProgressFraction( + ctx, execCtx.ExecCfg().InternalDB, job.ID(), updates) + } case <-ctx.Done(): return ctx.Err() } @@ -285,11 +303,8 @@ func backup( perComponentProgress[component] = fraction } select { - // This send to a buffered channel should never block but incase it does - // we will fallthrough to the default case. case perNodeProgressCh <- perComponentProgress: - default: - log.Warningf(ctx, "skipping persisting per component progress as buffered channel was full") + default: // discard the update if the flusher is backed up. } // Check if we should persist a checkpoint backup manifest.