Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions pkg/backup/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,35 @@ func backup(
}
}

// Create a channel that is large enough that it does not block.
perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, numTotalSpans)
// Create a channel with a little buffering, but plan on dropping if blocked.
perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, len(backupSpecs))
storePerNodeProgressLoop := func(ctx context.Context) error {
// track the last progress per component, periodically flushing those that
// have changed to info rows.
current, persisted := make(map[execinfrapb.ComponentID]float32), make(map[execinfrapb.ComponentID]float32)
lastSaved := timeutil.Now()

for {
select {
case prog, ok := <-perNodeProgressCh:
if !ok {
return nil
}
jobsprofiler.StorePerNodeProcessorProgressFraction(
ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog)
for k, v := range prog {
current[k] = v
}
if timeutil.Since(lastSaved) > time.Second*15 {
lastSaved = timeutil.Now()
updates := make(map[execinfrapb.ComponentID]float32)
for k := range current {
if current[k] != persisted[k] {
persisted[k] = current[k]
updates[k] = current[k]
}
}
jobsprofiler.StorePerNodeProcessorProgressFraction(
ctx, execCtx.ExecCfg().InternalDB, job.ID(), updates)
}
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -285,11 +303,8 @@ func backup(
perComponentProgress[component] = fraction
}
select {
// This send to a buffered channel should never block but incase it does
// we will fallthrough to the default case.
case perNodeProgressCh <- perComponentProgress:
default:
log.Warningf(ctx, "skipping persisting per component progress as buffered channel was full")
default: // discard the update if the flusher is backed up.
}

// Check if we should persist a checkpoint backup manifest.
Expand Down