From c5d9b70d7fb6632ab9c9f4729c0b4a0acea887c1 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Mon, 23 Feb 2026 10:20:54 +1100 Subject: [PATCH] refactor: construct ScheduleStore internally in Scheduler Move store construction from external injection into the Scheduler constructor. Add SchedulerDB config field with default ${CACHEW_STATE}/scheduler.db. Extract jobKey() helper to unify key format across logging and store operations. Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 1 + cmd/cachewd/main.go | 4 +- internal/jobscheduler/jobs.go | 78 +++++++-- internal/jobscheduler/jobs_test.go | 30 ++-- internal/jobscheduler/store.go | 68 ++++++++ internal/jobscheduler/store_test.go | 188 ++++++++++++++++++++++ internal/strategy/git/git_test.go | 14 +- internal/strategy/git/integration_test.go | 9 +- internal/strategy/git/repack_test.go | 5 +- internal/strategy/git/snapshot_test.go | 5 +- 10 files changed, 362 insertions(+), 40 deletions(-) create mode 120000 CLAUDE.md create mode 100644 internal/jobscheduler/store.go create mode 100644 internal/jobscheduler/store_test.go diff --git a/CLAUDE.md b/CLAUDE.md new file mode 120000 index 0000000..47dc3e3 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +AGENTS.md \ No newline at end of file diff --git a/cmd/cachewd/main.go b/cmd/cachewd/main.go index 8bab8d6..6f3762b 100644 --- a/cmd/cachewd/main.go +++ b/cmd/cachewd/main.go @@ -69,7 +69,9 @@ func main() { return tokenManagerProvider() }) - scheduler := jobscheduler.New(ctx, globalConfig.SchedulerConfig) + scheduler, err := jobscheduler.New(ctx, globalConfig.SchedulerConfig) + kctx.FatalIfErrorf(err, "failed to create scheduler") + defer scheduler.Close() cr, sr := newRegistries(scheduler, managerProvider, tokenManagerProvider) diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index ae821df..46e10fc 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -3,7 +3,6 @@ package jobscheduler import ( "context" - "fmt" "runtime" "sync" "time" @@ -14,7 +13,8 @@ import ( ) type Config struct { - Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"0"` + Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"0"` + SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"` } type queueJob struct { @@ -23,7 +23,9 @@ type queueJob struct { run func(ctx context.Context) error } -func (j *queueJob) String() string { return fmt.Sprintf("job-%s-%s", j.id, j.queue) } +func jobKey(queue, id string) string { return queue + ":" + id } + +func (j *queueJob) String() string { return jobKey(j.queue, j.id) } func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.run(ctx)) } // Scheduler runs background jobs concurrently across multiple serialised queues. @@ -73,25 +75,42 @@ type RootScheduler struct { queue []queueJob active map[string]bool cancel context.CancelFunc + store ScheduleStore } var _ Scheduler = &RootScheduler{} // New creates a new JobScheduler. -func New(ctx context.Context, config Config) Scheduler { +func New(ctx context.Context, config Config) (*RootScheduler, error) { if config.Concurrency == 0 { config.Concurrency = runtime.NumCPU() } + var store ScheduleStore + if config.SchedulerDB != "" { + var err error + store, err = NewScheduleStore(config.SchedulerDB) + if err != nil { + return nil, errors.Wrap(err, "create schedule store") + } + } q := &RootScheduler{ workAvailable: make(chan bool, 1024), active: make(map[string]bool), + store: store, } ctx, cancel := context.WithCancel(ctx) q.cancel = cancel for id := range config.Concurrency { go q.worker(ctx, id) } - return q + return q, nil +} + +func (q *RootScheduler) Close() error { + if q.store != nil { + return errors.WithStack(q.store.Close()) + } + return nil } func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler { @@ -108,15 +127,46 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e q.workAvailable <- true } -func (q *RootScheduler) SubmitPeriodicJob(queue, description string, interval time.Duration, run func(ctx context.Context) error) { - q.Submit(queue, description, func(ctx context.Context) error { - err := run(ctx) - go func() { - time.Sleep(interval) - q.SubmitPeriodicJob(queue, description, interval, run) - }() - return errors.WithStack(err) - }) +func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { + key := jobKey(queue, id) + delay := q.periodicDelay(key, interval) + submit := func() { + q.Submit(queue, id, func(ctx context.Context) error { + err := run(ctx) + if q.store != nil { + if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil { + logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr) + } + } + go func() { + time.Sleep(interval) + q.SubmitPeriodicJob(queue, id, interval, run) + }() + return errors.WithStack(err) + }) + } + if delay <= 0 { + submit() + return + } + go func() { + time.Sleep(delay) + submit() + }() +} + +func (q *RootScheduler) periodicDelay(key string, interval time.Duration) time.Duration { + if q.store == nil { + return 0 + } + lastRun, ok, err := q.store.GetLastRun(key) + if err != nil || !ok { + return 0 + } + if remaining := time.Until(lastRun.Add(interval)); remaining > 0 { + return remaining + } + return 0 } func (q *RootScheduler) worker(ctx context.Context, id int) { diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 771b6d0..12df2dd 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -17,6 +17,14 @@ import ( "github.com/block/cachew/internal/logging" ) +func newTestScheduler(ctx context.Context, t *testing.T, config jobscheduler.Config) jobscheduler.Scheduler { + t.Helper() + s, err := jobscheduler.New(ctx, config) + assert.NoError(t, err) + t.Cleanup(func() { s.Close() }) + return s +} + func eventually(t *testing.T, timeout time.Duration, condition func() bool, msgAndArgs ...any) { t.Helper() deadline := time.Now().Add(timeout) @@ -40,7 +48,7 @@ func TestJobSchedulerBasic(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) var executed atomic.Bool scheduler.Submit("queue1", "job1", func(_ context.Context) error { @@ -57,7 +65,7 @@ func TestJobSchedulerConcurrency(t *testing.T) { defer cancel() concurrency := 4 - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: concurrency}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: concurrency}) var ( running atomic.Int32 @@ -103,7 +111,7 @@ func TestJobSchedulerQueueIsolation(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 4}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 4}) var ( queue1Running atomic.Int32 @@ -155,7 +163,7 @@ func TestJobSchedulerJobOrdering(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 4}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 4}) var ( mu sync.Mutex @@ -193,7 +201,7 @@ func TestJobSchedulerErrorHandling(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) var ( failingJobExecuted atomic.Bool @@ -219,7 +227,7 @@ func TestJobSchedulerContextCancellation(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx) - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) var jobStarted atomic.Bool @@ -241,7 +249,7 @@ func TestJobSchedulerPeriodicJob(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) var executionCount atomic.Int32 @@ -260,7 +268,7 @@ func TestJobSchedulerPeriodicJobWithError(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) var executionCount atomic.Int32 @@ -279,7 +287,7 @@ func TestJobSchedulerMultipleQueues(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 3}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 3}) queues := []string{"queue1", "queue2", "queue3", "queue4", "queue5"} totalJobs := len(queues) @@ -319,7 +327,7 @@ func TestJobSchedulerHighConcurrency(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 50}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 50}) jobCount := 100 var completed atomic.Int32 @@ -371,7 +379,7 @@ func FuzzJobScheduler(f *testing.F) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: int(concurrency)}) + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: int(concurrency)}) var ( completed atomic.Int32 diff --git a/internal/jobscheduler/store.go b/internal/jobscheduler/store.go new file mode 100644 index 0000000..d81dbdb --- /dev/null +++ b/internal/jobscheduler/store.go @@ -0,0 +1,68 @@ +package jobscheduler + +import ( + "time" + + "github.com/alecthomas/errors" + "go.etcd.io/bbolt" +) + +//nolint:gochecknoglobals +var scheduleBucketName = []byte("schedule") + +// ScheduleStore persists the last execution time of periodic jobs. +type ScheduleStore interface { + GetLastRun(key string) (time.Time, bool, error) + SetLastRun(key string, t time.Time) error + Close() error +} + +type boltScheduleStore struct { + db *bbolt.DB +} + +// NewScheduleStore creates a bbolt-backed schedule store at the given database path. +func NewScheduleStore(dbPath string) (ScheduleStore, error) { + db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{ + Timeout: 5 * time.Second, + }) + if err != nil { + return nil, errors.Wrap(err, "open scheduler database") + } + if err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(scheduleBucketName) + return errors.WithStack(err) + }); err != nil { + return nil, errors.Join(errors.Wrap(err, "create schedule bucket"), db.Close()) + } + return &boltScheduleStore{db: db}, nil +} + +func (s *boltScheduleStore) GetLastRun(key string) (time.Time, bool, error) { + var t time.Time + var found bool + err := s.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(scheduleBucketName) + data := bucket.Get([]byte(key)) + if data == nil { + return nil + } + found = true + return errors.WithStack(t.UnmarshalBinary(data)) + }) + return t, found, errors.WithStack(err) +} + +func (s *boltScheduleStore) SetLastRun(key string, t time.Time) error { + data, err := t.MarshalBinary() + if err != nil { + return errors.Wrap(err, "marshal time") + } + return errors.WithStack(s.db.Update(func(tx *bbolt.Tx) error { + return errors.WithStack(tx.Bucket(scheduleBucketName).Put([]byte(key), data)) + })) +} + +func (s *boltScheduleStore) Close() error { + return errors.WithStack(s.db.Close()) +} diff --git a/internal/jobscheduler/store_test.go b/internal/jobscheduler/store_test.go new file mode 100644 index 0000000..13bacd9 --- /dev/null +++ b/internal/jobscheduler/store_test.go @@ -0,0 +1,188 @@ +package jobscheduler_test + +import ( + "context" + "log/slog" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/jobscheduler" + "github.com/block/cachew/internal/logging" +) + +func TestScheduleStoreRoundTrip(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + store, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + defer store.Close() + + _, found, err := store.GetLastRun("key1") + assert.NoError(t, err) + assert.False(t, found) + + now := time.Now().Truncate(time.Second) + assert.NoError(t, store.SetLastRun("key1", now)) + + got, found, err := store.GetLastRun("key1") + assert.NoError(t, err) + assert.True(t, found) + assert.Equal(t, now, got.Truncate(time.Second)) +} + +func TestScheduleStoreMultipleKeys(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + store, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + defer store.Close() + + t1 := time.Now().Add(-time.Hour) + t2 := time.Now().Add(-2 * time.Hour) + assert.NoError(t, store.SetLastRun("a", t1)) + assert.NoError(t, store.SetLastRun("b", t2)) + + gotA, found, err := store.GetLastRun("a") + assert.NoError(t, err) + assert.True(t, found) + assert.Equal(t, t1.Truncate(time.Nanosecond), gotA.Truncate(time.Nanosecond)) + + gotB, found, err := store.GetLastRun("b") + assert.NoError(t, err) + assert.True(t, found) + assert.Equal(t, t2.Truncate(time.Nanosecond), gotB.Truncate(time.Nanosecond)) +} + +func TestScheduleStorePersistence(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + + store, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + + now := time.Now() + assert.NoError(t, store.SetLastRun("key1", now)) + assert.NoError(t, store.Close()) + + store2, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + defer store2.Close() + + got, found, err := store2.GetLastRun("key1") + assert.NoError(t, err) + assert.True(t, found) + assert.Equal(t, now.Truncate(time.Nanosecond), got.Truncate(time.Nanosecond)) +} + +func TestScheduleStoreInvalidPath(t *testing.T) { + _, err := jobscheduler.NewScheduleStore(filepath.Join(t.TempDir(), "nonexistent", "deep", "path", "scheduler.db")) + assert.Error(t, err) +} + +func TestPeriodicJobDelaysWhenRecentlyRun(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + + // Seed the store with a recent run time, then close it so the scheduler can open it. + store, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + assert.NoError(t, store.SetLastRun("queue1:periodic", time.Now())) + assert.NoError(t, store.Close()) + + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath}) + assert.NoError(t, err) + defer scheduler.Close() + + var executed atomic.Bool + scheduler.SubmitPeriodicJob("queue1", "periodic", 5*time.Second, func(_ context.Context) error { + executed.Store(true) + return nil + }) + + // The job should NOT have run within 200ms because interval is 5s and it "just ran". + time.Sleep(200 * time.Millisecond) + assert.False(t, executed.Load(), "job should be delayed because it ran recently") +} + +func TestPeriodicJobRunsImmediatelyWhenNeverRun(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath}) + assert.NoError(t, err) + defer scheduler.Close() + + var executed atomic.Bool + scheduler.SubmitPeriodicJob("queue1", "periodic", 5*time.Second, func(_ context.Context) error { + executed.Store(true) + return nil + }) + + eventually(t, time.Second, executed.Load, "job should run immediately when no prior run recorded") +} + +func TestPeriodicJobRunsImmediatelyWhenIntervalElapsed(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + + // Seed the store with a run time long ago, then close it. + store, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + assert.NoError(t, store.SetLastRun("queue1:periodic", time.Now().Add(-10*time.Second))) + assert.NoError(t, store.Close()) + + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath}) + assert.NoError(t, err) + defer scheduler.Close() + + var executed atomic.Bool + scheduler.SubmitPeriodicJob("queue1", "periodic", 5*time.Second, func(_ context.Context) error { + executed.Store(true) + return nil + }) + + eventually(t, time.Second, executed.Load, "job should run immediately when interval has elapsed") +} + +func TestPeriodicJobRecordsLastRun(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + dbPath := filepath.Join(t.TempDir(), "scheduler.db") + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath}) + assert.NoError(t, err) + + var executed atomic.Bool + before := time.Now() + scheduler.SubmitPeriodicJob("queue1", "periodic", 5*time.Second, func(_ context.Context) error { + executed.Store(true) + return nil + }) + + eventually(t, time.Second, executed.Load, "job should execute") + + // Give a moment for the store write to complete. + time.Sleep(50 * time.Millisecond) + + // Close scheduler to release the DB, then open a new store to check. + assert.NoError(t, scheduler.Close()) + + store, err := jobscheduler.NewScheduleStore(dbPath) + assert.NoError(t, err) + defer store.Close() + + lastRun, found, err := store.GetLastRun("queue1:periodic") + assert.NoError(t, err) + assert.True(t, found, "last run should be recorded") + assert.True(t, !lastRun.Before(before), "last run should be at or after test start") +} diff --git a/internal/strategy/git/git_test.go b/internal/strategy/git/git_test.go index 553295d..02f6587 100644 --- a/internal/strategy/git/git_test.go +++ b/internal/strategy/git/git_test.go @@ -33,6 +33,14 @@ func (m *testMux) HandleFunc(pattern string, handler func(http.ResponseWriter, * m.handlers[pattern] = http.HandlerFunc(handler) } +func newTestScheduler(ctx context.Context, t *testing.T) jobscheduler.Scheduler { + t.Helper() + s, err := jobscheduler.New(ctx, jobscheduler.Config{}) + assert.NoError(t, err) + t.Cleanup(func() { s.Close() }) + return s +} + func TestNew(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{}) tmpDir := t.TempDir() @@ -68,7 +76,7 @@ func TestNew(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mux := newTestMux() cm := gitclone.NewManagerProvider(ctx, tt.config, nil) - s, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil if tt.wantError != "" { assert.Error(t, err) assert.Contains(t, err.Error(), tt.wantError) @@ -150,7 +158,7 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) { MirrorRoot: tmpDir, FetchInterval: 15, }, nil) - s, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) assert.NotZero(t, s) } @@ -174,7 +182,7 @@ func TestIntegrationWithMockUpstream(t *testing.T) { MirrorRoot: tmpDir, FetchInterval: 15, }, nil) - _, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) // Verify handlers exist diff --git a/internal/strategy/git/integration_test.go b/internal/strategy/git/integration_test.go index b3151b3..02507aa 100644 --- a/internal/strategy/git/integration_test.go +++ b/internal/strategy/git/integration_test.go @@ -21,7 +21,6 @@ import ( "github.com/block/cachew/internal/gitclone" "github.com/block/cachew/internal/githubapp" - "github.com/block/cachew/internal/jobscheduler" "github.com/block/cachew/internal/logging" "github.com/block/cachew/internal/strategy/git" ) @@ -63,7 +62,7 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) { FetchInterval: 15, }, nil) mux := http.NewServeMux() - strategy, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) assert.NotZero(t, strategy) @@ -142,7 +141,7 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) { }, nil) mux := http.NewServeMux() - _, err = git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) server := testServerWithLogging(ctx, mux) @@ -222,7 +221,7 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) { MirrorRoot: clonesDir, FetchInterval: 15, }, nil) - _, err = git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) server := testServerWithLogging(ctx, mux) @@ -316,7 +315,7 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) { MirrorRoot: clonesDir, FetchInterval: 15, }, nil) - strategy, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) strategy.SetHTTPTransport(&countingTransport{ diff --git a/internal/strategy/git/repack_test.go b/internal/strategy/git/repack_test.go index 72336c2..f77e1d6 100644 --- a/internal/strategy/git/repack_test.go +++ b/internal/strategy/git/repack_test.go @@ -11,7 +11,6 @@ import ( "github.com/block/cachew/internal/gitclone" "github.com/block/cachew/internal/githubapp" - "github.com/block/cachew/internal/jobscheduler" "github.com/block/cachew/internal/logging" "github.com/block/cachew/internal/strategy/git" ) @@ -42,7 +41,7 @@ func TestRepackInterval(t *testing.T) { }, nil) s, err := git.New(ctx, git.Config{ RepackInterval: tt.repackInterval, - }, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + }, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) assert.NotZero(t, s) }) @@ -66,7 +65,7 @@ func TestRepackScheduledForExistingRepos(t *testing.T) { }, nil) s, err := git.New(ctx, git.Config{ RepackInterval: 24 * time.Hour, - }, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + }, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) assert.NotZero(t, s) } diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 29e2104..d59566e 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -15,7 +15,6 @@ import ( "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/gitclone" "github.com/block/cachew/internal/githubapp" - "github.com/block/cachew/internal/jobscheduler" "github.com/block/cachew/internal/logging" "github.com/block/cachew/internal/snapshot" "github.com/block/cachew/internal/strategy/git" @@ -34,7 +33,7 @@ func TestSnapshotHTTPEndpoint(t *testing.T) { }, nil) _, err = git.New(ctx, git.Config{ SnapshotInterval: 24 * time.Hour, - }, jobscheduler.New(ctx, jobscheduler.Config{}), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + }, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) // Create a fake snapshot in the cache @@ -126,7 +125,7 @@ func TestSnapshotGenerationViaLocalClone(t *testing.T) { mux := newTestMux() cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) - s, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) // GetOrCreate so the strategy knows about the repo.