diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go
index 2838cc0..86945d1 100644
--- a/internal/jobscheduler/jobs.go
+++ b/internal/jobscheduler/jobs.go
@@ -4,6 +4,7 @@ package jobscheduler
 import (
 	"context"
 	"runtime"
+	"strings"
 	"sync"
 	"time"
 
@@ -13,8 +14,9 @@ import (
 )
 
 type Config struct {
-	Concurrency int    `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
-	SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
+	Concurrency         int    `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
+	MaxCloneConcurrency int    `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"`
+	SchedulerDB         string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
 }
 
 type queueJob struct {
@@ -70,12 +72,14 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
 }
 
 type RootScheduler struct {
-	workAvailable chan bool
-	lock          sync.Mutex
-	queue         []queueJob
-	active        map[string]bool
-	cancel        context.CancelFunc
-	store         ScheduleStore
+	workAvailable       chan bool
+	lock                sync.Mutex
+	queue               []queueJob
+	active              map[string]string // queue -> job id
+	activeClones        int
+	maxCloneConcurrency int
+	cancel              context.CancelFunc
+	store               ScheduleStore
 }
 
 var _ Scheduler = &RootScheduler{}
@@ -102,10 +106,16 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
 			return nil, errors.Wrap(err, "create schedule store")
 		}
 	}
+	maxClones := config.MaxCloneConcurrency
+	if maxClones == 0 && config.Concurrency > 1 {
+		// Default: reserve at least half the workers for non-clone jobs.
+		maxClones = max(1, config.Concurrency/2)
+	}
 	q := &RootScheduler{
-		workAvailable: make(chan bool, 1024),
-		active:        make(map[string]bool),
-		store:         store,
+		workAvailable:       make(chan bool, 1024),
+		active:              make(map[string]string),
+		maxCloneConcurrency: maxClones,
+		store:               store,
 	}
 	ctx, cancel := context.WithCancel(ctx)
 	q.cancel = cancel
@@ -207,20 +217,36 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
 func (q *RootScheduler) markQueueInactive(queue string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
+	if isCloneJob(q.active[queue]) {
+		q.activeClones--
+	}
 	delete(q.active, queue)
 }
 
+// isCloneJob returns true for job IDs that represent long-running clone operations
+// which should be subject to concurrency limits.
+func isCloneJob(id string) bool {
+	return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
+}
+
 // Take the next job for any queue that is not already running a job.
 func (q *RootScheduler) takeNextJob() (queueJob, bool) {
	q.lock.Lock()
 	defer q.lock.Unlock()
 	for i, job := range q.queue {
-		if !q.active[job.queue] {
-			q.queue = append(q.queue[:i], q.queue[i+1:]...)
-			q.workAvailable <- true
-			q.active[job.queue] = true
-			return job, true
+		if _, active := q.active[job.queue]; active {
+			continue
+		}
+		if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency {
+			continue
+		}
+		q.queue = append(q.queue[:i], q.queue[i+1:]...)
+		q.workAvailable <- true
+		q.active[job.queue] = job.id
+		if isCloneJob(job.id) {
+			q.activeClones++
 		}
+		return job, true
 	}
 	return queueJob{}, false
 }
diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go
index 12df2dd..6db5034 100644
--- a/internal/jobscheduler/jobs_test.go
+++ b/internal/jobscheduler/jobs_test.go
@@ -459,3 +459,71 @@ func FuzzJobScheduler(f *testing.F) {
 		}
 	})
 }
+
+func TestJobSchedulerCloneConcurrencyLimit(t *testing.T) {
+	_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	scheduler := newTestScheduler(ctx, t, jobscheduler.Config{
+		Concurrency:         8,
+		MaxCloneConcurrency: 2,
+	})
+
+	var (
+		cloneRunning       atomic.Int32
+		maxCloneConcurrent atomic.Int32
+		fetchCompleted     atomic.Int32
+		cloneCompleted     atomic.Int32
+		allClonesBlocking  sync.WaitGroup
+	)
+
+	// Submit 4 clone jobs that block until released.
+	allClonesBlocking.Add(1)
+	for i := range 4 {
+		queue := fmt.Sprintf("repo%d", i)
+		scheduler.Submit(queue, "git-clone", func(_ context.Context) error {
+			current := cloneRunning.Add(1)
+			defer cloneRunning.Add(-1)
+			// Record the high-water mark of concurrent clones via compare-and-swap.
+			for {
+				maxVal := maxCloneConcurrent.Load()
+				if current <= maxVal {
+					break
+				}
+				if maxCloneConcurrent.CompareAndSwap(maxVal, current) {
+					break
+				}
+			}
+			allClonesBlocking.Wait()
+			cloneCompleted.Add(1)
+			return nil
+		})
+	}
+
+	// Submit fetch jobs on different queues; these should NOT be blocked by the clone limit.
+	for i := range 4 {
+		queue := fmt.Sprintf("fetch-repo%d", i)
+		scheduler.Submit(queue, "git-fetch", func(_ context.Context) error {
+			fetchCompleted.Add(1)
+			return nil
+		})
+	}
+
+	// Fetch jobs should complete even while clone jobs are blocking workers.
+	eventually(t, 2*time.Second, func() bool {
+		return fetchCompleted.Load() == 4
+	}, "fetch jobs should complete without being blocked by clone limit")
+
+	// Clone concurrency should be capped at 2.
+	assert.True(t, maxCloneConcurrent.Load() <= 2,
+		"max concurrent clones (%d) should not exceed MaxCloneConcurrency (2)",
+		maxCloneConcurrent.Load())
+
+	// Release the clone jobs.
+	allClonesBlocking.Done()
+
+	eventually(t, 2*time.Second, func() bool {
+		return cloneCompleted.Load() == 4
+	}, "all clone jobs should eventually complete")
+}
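Reviewer note, not part of the patch: a minimal standalone sketch of how the two new pieces compose. The isCloneJob body and the half-the-workers default are copied from the diff above; the defaultMaxClones helper, the main function, and the sample job IDs are illustrative assumptions only.

package main

import (
	"fmt"
	"strings"
)

// Same heuristic as the patch: job IDs ending in "clone" or
// "deferred-mirror-restore" count against the clone cap.
func isCloneJob(id string) bool {
	return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
}

// Same derivation as New: when max-clone-concurrency is left at 0,
// reserve at least half the workers for non-clone jobs.
func defaultMaxClones(concurrency int) int {
	if concurrency > 1 {
		return max(1, concurrency/2)
	}
	return 0 // 0 means no limit
}

func main() {
	for _, id := range []string{"git-clone", "git-fetch", "deferred-mirror-restore", "repack"} {
		fmt.Printf("%-24s counts against cap: %v\n", id, isCloneJob(id))
	}
	for _, workers := range []int{1, 2, 8} {
		fmt.Printf("concurrency=%d -> default clone cap=%d\n", workers, defaultMaxClones(workers))
	}
}

One consequence worth noting: with Concurrency set to 1 the derived cap stays at 0 (unlimited), since a cap of 1 on a single worker could never change scheduling behavior anyway.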