Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package jobscheduler
import (
"context"
"runtime"
"strings"
"sync"
"time"

Expand All @@ -13,8 +14,9 @@ import (
)

// Config controls the job scheduler's worker pool and on-disk state.
type Config struct {
	// Concurrency is the maximum number of concurrent jobs to run
	// (0 means use the number of CPU cores).
	Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
	// MaxCloneConcurrency caps how many clone jobs may run at once so the
	// remaining worker slots stay available for fetch/repack/snapshot jobs.
	// 0 means no explicit limit (a default is derived from Concurrency).
	MaxCloneConcurrency int `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"`
	// SchedulerDB is the path to the scheduler state database.
	SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
}

type queueJob struct {
Expand Down Expand Up @@ -70,12 +72,14 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
}

// RootScheduler runs queued jobs on a bounded worker pool, serializing jobs
// per queue and optionally capping the number of concurrently running clone
// jobs. All mutable fields are guarded by lock.
type RootScheduler struct {
	// workAvailable is signalled whenever a worker should re-check the queue.
	workAvailable chan bool
	lock          sync.Mutex
	// queue holds pending jobs in submission order.
	queue []queueJob
	// active maps a queue name to the ID of the job currently running on it;
	// a queue with an entry here must not start another job.
	active map[string]string // queue -> job id
	// activeClones counts currently running clone jobs; compared against
	// maxCloneConcurrency when scheduling.
	activeClones        int
	maxCloneConcurrency int
	cancel              context.CancelFunc
	store               ScheduleStore
}

var _ Scheduler = &RootScheduler{}
Expand All @@ -102,10 +106,16 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
return nil, errors.Wrap(err, "create schedule store")
}
}
maxClones := config.MaxCloneConcurrency
if maxClones == 0 && config.Concurrency > 1 {
// Default: reserve at least half the workers for non-clone jobs.
maxClones = max(1, config.Concurrency/2)
}
q := &RootScheduler{
workAvailable: make(chan bool, 1024),
active: make(map[string]bool),
store: store,
workAvailable: make(chan bool, 1024),
active: make(map[string]string),
maxCloneConcurrency: maxClones,
store: store,
}
ctx, cancel := context.WithCancel(ctx)
q.cancel = cancel
Expand Down Expand Up @@ -207,20 +217,36 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
// markQueueInactive releases the given queue so its next job may be
// scheduled, decrementing the clone counter when the job that just
// finished on it was a clone.
func (q *RootScheduler) markQueueInactive(queue string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	id, ok := q.active[queue]
	delete(q.active, queue)
	if ok && isCloneJob(id) {
		q.activeClones--
	}
}

// isCloneJob reports whether the given job ID names a long-running clone
// operation that should count against the clone concurrency limit.
func isCloneJob(id string) bool {
	for _, suffix := range []string{"clone", "deferred-mirror-restore"} {
		if strings.HasSuffix(id, suffix) {
			return true
		}
	}
	return false
}

// Take the next job for any queue that is not already running a job.
func (q *RootScheduler) takeNextJob() (queueJob, bool) {
q.lock.Lock()
defer q.lock.Unlock()
for i, job := range q.queue {
if !q.active[job.queue] {
q.queue = append(q.queue[:i], q.queue[i+1:]...)
q.workAvailable <- true
q.active[job.queue] = true
return job, true
if _, active := q.active[job.queue]; active {
continue
}
if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency {
continue
}
q.queue = append(q.queue[:i], q.queue[i+1:]...)
q.workAvailable <- true
q.active[job.queue] = job.id
if isCloneJob(job.id) {
q.activeClones++
}
return job, true
}
return queueJob{}, false
}
67 changes: 67 additions & 0 deletions internal/jobscheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,70 @@ func FuzzJobScheduler(f *testing.F) {
}
})
}

// TestJobSchedulerCloneConcurrencyLimit verifies that with Concurrency=8 and
// MaxCloneConcurrency=2, at most two clone jobs run simultaneously while
// fetch jobs on other queues still complete, and that all jobs eventually run.
func TestJobSchedulerCloneConcurrencyLimit(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := newTestScheduler(ctx, t, jobscheduler.Config{
Concurrency: 8,
MaxCloneConcurrency: 2,
})

var (
cloneRunning atomic.Int32
maxCloneConcurrent atomic.Int32
fetchCompleted atomic.Int32
cloneCompleted atomic.Int32
// allClonesBlocking is used as a latch: each clone job Waits on it and
// the test releases them all at once with a single Done.
allClonesBlocking sync.WaitGroup
)

// Submit 4 clone jobs that block until released.
allClonesBlocking.Add(1)
for i := range 4 {
queue := fmt.Sprintf("repo%d", i)
scheduler.Submit(queue, "git-clone", func(_ context.Context) error {
current := cloneRunning.Add(1)
defer cloneRunning.Add(-1)
// Lock-free running-maximum: retry the CAS until maxCloneConcurrent
// is at least `current` (another racer may raise it first).
for {
maxVal := maxCloneConcurrent.Load()
if current <= maxVal {
break
}
if maxCloneConcurrent.CompareAndSwap(maxVal, current) {
break
}
}
allClonesBlocking.Wait()
cloneCompleted.Add(1)
return nil
})
}

// Submit fetch jobs on different queues — these should NOT be blocked by clone limit.
for i := range 4 {
queue := fmt.Sprintf("fetch-repo%d", i)
scheduler.Submit(queue, "git-fetch", func(_ context.Context) error {
fetchCompleted.Add(1)
return nil
})
}

// Fetch jobs should complete even while clone jobs are blocking workers.
eventually(t, 2*time.Second, func() bool {
return fetchCompleted.Load() == 4
}, "fetch jobs should complete without being blocked by clone limit")

// Clone concurrency should be capped at 2.
assert.True(t, maxCloneConcurrent.Load() <= 2,
"max concurrent clones (%d) should not exceed MaxCloneConcurrency (2)",
maxCloneConcurrent.Load())

// Release clone jobs.
allClonesBlocking.Done()

eventually(t, 2*time.Second, func() bool {
return cloneCompleted.Load() == 4
}, "all clone jobs should eventually complete")
}