Patch summary (added in review; text before the first "diff --git" line is
ignored by patch tools).

What the patch does:
  * gitclone: adds exported RepoPathFromURL, which returns "host/path" with a
    trailing ".git" removed, or a wrapped parse error. clonePathForURL,
    spoolDirForURL and snapshotDirForURL now return (string, error) instead of
    falling back to an "unknown" directory.
  * Callers propagate the error; serveWithSpool logs a warning and forwards
    the request to upstream when the spool directory cannot be resolved.
  * jobscheduler: jobKey becomes id + ":" + queue (was queue + ":" + id), and
    prefixedScheduler prepends its prefix to the job id instead of the queue.

NOTE(review): the updated store tests seed and read "periodic:queue1", so the
  persisted last-run keys appear to follow jobKey. Entries written as
  "queue:id" by an earlier build would presumably no longer be found; confirm
  whether a migration is needed.
NOTE(review): WithQueuePrefix keeps its name although the prefix is now
  applied to the job id, not the queue.
NOTE(review): RepoPathFromURL hands parsed.Path to filepath.Join unvalidated;
  Join cleans ".." segments, so a crafted path could resolve outside the host
  directory. The removed code did the same; confirm upstream URLs are
  validated by callers.
NOTE(review): line breaks and tab indentation below were reconstructed from
  the hunk line counts. Column-alignment padding inside context lines and the
  position of a few blank context lines could not be recovered, so check it
  against the target tree before applying.

diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go
index ed9633b..49d0ba7 100644
--- a/internal/gitclone/manager.go
+++ b/internal/gitclone/manager.go
@@ -160,7 +160,10 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor
 		return repo, nil
 	}
 
-	clonePath := m.clonePathForURL(upstreamURL)
+	clonePath, err := m.clonePathForURL(upstreamURL)
+	if err != nil {
+		return nil, err
+	}
 
 	repo = &Repository{
 		state: StateEmpty,
@@ -259,14 +262,22 @@ func (m *Manager) DiscoverExisting(ctx context.Context) ([]*Repository, error) {
 	return discovered, nil
 }
 
-func (m *Manager) clonePathForURL(upstreamURL string) string {
+// RepoPathFromURL extracts a normalised "host/path" from an upstream Git URL,
+// stripping any ".git" suffix.
+func RepoPathFromURL(upstreamURL string) (string, error) {
 	parsed, err := url.Parse(upstreamURL)
 	if err != nil {
-		return filepath.Join(m.config.MirrorRoot, "unknown")
+		return "", errors.Wrap(err, "parse upstream URL")
 	}
+	return filepath.Join(parsed.Host, strings.TrimSuffix(parsed.Path, ".git")), nil
+}
 
-	repoPath := strings.TrimSuffix(parsed.Path, ".git")
-	return filepath.Join(m.config.MirrorRoot, parsed.Host, repoPath)
+func (m *Manager) clonePathForURL(upstreamURL string) (string, error) {
+	repoPath, err := RepoPathFromURL(upstreamURL)
+	if err != nil {
+		return "", err
+	}
+	return filepath.Join(m.config.MirrorRoot, repoPath), nil
 }
 
 func (r *Repository) State() State {
diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go
index 46e10fc..e85dd9e 100644
--- a/internal/jobscheduler/jobs.go
+++ b/internal/jobscheduler/jobs.go
@@ -23,7 +23,7 @@ type queueJob struct {
 	run func(ctx context.Context) error
 }
 
-func jobKey(queue, id string) string { return queue + ":" + id }
+func jobKey(queue, id string) string { return id + ":" + queue }
 
 func (j *queueJob) String() string { return jobKey(j.queue, j.id) }
 func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.run(ctx)) }
@@ -55,11 +55,11 @@ type prefixedScheduler struct {
 }
 
 func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
-	p.scheduler.Submit(p.prefix+queue, id, run)
+	p.scheduler.Submit(queue, p.prefix+id, run)
 }
 
 func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
-	p.scheduler.SubmitPeriodicJob(p.prefix+queue, id, interval, run)
+	p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run)
 }
 
 func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
diff --git a/internal/jobscheduler/store_test.go b/internal/jobscheduler/store_test.go
index 13bacd9..c3ed72b 100644
--- a/internal/jobscheduler/store_test.go
+++ b/internal/jobscheduler/store_test.go
@@ -90,7 +90,7 @@ func TestPeriodicJobDelaysWhenRecentlyRun(t *testing.T) {
 	// Seed the store with a recent run time, then close it so the scheduler can open it.
 	store, err := jobscheduler.NewScheduleStore(dbPath)
 	assert.NoError(t, err)
-	assert.NoError(t, store.SetLastRun("queue1:periodic", time.Now()))
+	assert.NoError(t, store.SetLastRun("periodic:queue1", time.Now()))
 	assert.NoError(t, store.Close())
 
 	scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath})
@@ -137,7 +137,7 @@ func TestPeriodicJobRunsImmediatelyWhenIntervalElapsed(t *testing.T) {
 	// Seed the store with a run time long ago, then close it.
 	store, err := jobscheduler.NewScheduleStore(dbPath)
 	assert.NoError(t, err)
-	assert.NoError(t, store.SetLastRun("queue1:periodic", time.Now().Add(-10*time.Second)))
+	assert.NoError(t, store.SetLastRun("periodic:queue1", time.Now().Add(-10*time.Second)))
 	assert.NoError(t, store.Close())
 
 	scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath})
@@ -181,7 +181,7 @@ func TestPeriodicJobRecordsLastRun(t *testing.T) {
 	assert.NoError(t, err)
 	defer store.Close()
 
-	lastRun, found, err := store.GetLastRun("queue1:periodic")
+	lastRun, found, err := store.GetLastRun("periodic:queue1")
 	assert.NoError(t, err)
 	assert.True(t, found, "last run should be recorded")
 	assert.True(t, !lastRun.Before(before), "last run should be at or after test start")
diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go
index fe1db1a..7bd38f5 100644
--- a/internal/strategy/git/git.go
+++ b/internal/strategy/git/git.go
@@ -10,7 +10,6 @@ import (
 	"log/slog"
 	"net/http"
 	"net/http/httputil"
-	"net/url"
 	"os"
 	"path/filepath"
 	"strings"
@@ -241,25 +240,28 @@ func SpoolKeyForRequest(pathValue string, r *http.Request) (string, error) {
 	return "upload-pack-" + hex.EncodeToString(h[:8]), nil
 }
 
-func spoolDirForURL(mirrorRoot, upstreamURL string) string {
-	parsed, err := url.Parse(upstreamURL)
+func spoolDirForURL(mirrorRoot, upstreamURL string) (string, error) {
+	repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
 	if err != nil {
-		return filepath.Join(mirrorRoot, ".spools", "unknown")
+		return "", errors.Wrap(err, "resolve spool directory")
 	}
-	repoPath := strings.TrimSuffix(parsed.Path, ".git")
-	return filepath.Join(mirrorRoot, ".spools", parsed.Host, repoPath)
+	return filepath.Join(mirrorRoot, ".spools", repoPath), nil
 }
 
-func (s *Strategy) getOrCreateRepoSpools(upstreamURL string) *RepoSpools {
+func (s *Strategy) getOrCreateRepoSpools(upstreamURL string) (*RepoSpools, error) {
 	s.spoolsMu.Lock()
 	defer s.spoolsMu.Unlock()
 	rp, exists := s.spools[upstreamURL]
-	if !exists {
-		dir := spoolDirForURL(s.cloneManager.Config().MirrorRoot, upstreamURL)
-		rp = NewRepoSpools(dir)
-		s.spools[upstreamURL] = rp
+	if exists {
+		return rp, nil
+	}
+	dir, err := spoolDirForURL(s.cloneManager.Config().MirrorRoot, upstreamURL)
+	if err != nil {
+		return nil, err
 	}
-	return rp
+	rp = NewRepoSpools(dir)
+	s.spools[upstreamURL] = rp
+	return rp, nil
 }
 
 func (s *Strategy) cleanupSpools(upstreamURL string) {
@@ -294,7 +296,13 @@ func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host,
 		return
 	}
 
-	rp := s.getOrCreateRepoSpools(upstreamURL)
+	rp, err := s.getOrCreateRepoSpools(upstreamURL)
+	if err != nil {
+		logger.WarnContext(ctx, "Failed to resolve spool directory, forwarding to upstream",
+			slog.String("error", err.Error()))
+		s.forwardToUpstream(w, r, host, pathValue)
+		return
+	}
 	spool, isWriter, err := rp.GetOrCreate(key)
 	if err != nil {
 		logger.WarnContext(ctx, "Failed to create spool, forwarding to upstream",
diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go
index 9cb6440..a027d2c 100644
--- a/internal/strategy/git/snapshot.go
+++ b/internal/strategy/git/snapshot.go
@@ -4,11 +4,9 @@ import (
 	"context"
 	"log/slog"
 	"net/http"
-	"net/url"
 	"os"
 	"os/exec"
 	"path/filepath"
-	"strings"
 	"time"
 
 	"github.com/alecthomas/errors"
@@ -19,13 +17,12 @@ import (
 	"github.com/block/cachew/internal/snapshot"
 )
 
-func snapshotDirForURL(mirrorRoot, upstreamURL string) string {
-	parsed, err := url.Parse(upstreamURL)
+func snapshotDirForURL(mirrorRoot, upstreamURL string) (string, error) {
+	repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
 	if err != nil {
-		return filepath.Join(mirrorRoot, ".snapshots", "unknown")
+		return "", errors.Wrap(err, "resolve snapshot directory")
 	}
-	repoPath := strings.TrimSuffix(parsed.Path, ".git")
-	return filepath.Join(mirrorRoot, ".snapshots", parsed.Host, repoPath)
+	return filepath.Join(mirrorRoot, ".snapshots", repoPath), nil
 }
 
 func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error {
@@ -35,7 +32,10 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
 	logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream))
 
 	mirrorRoot := s.cloneManager.Config().MirrorRoot
-	snapshotDir := snapshotDirForURL(mirrorRoot, upstream)
+	snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream)
+	if err != nil {
+		return err
+	}
 
 	// Clean any previous snapshot working directory.
 	if err := os.RemoveAll(snapshotDir); err != nil {
@@ -57,7 +57,7 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
 	ttl := 7 * 24 * time.Hour
 	excludePatterns := []string{"*.lock"}
 
-	err := snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns)
+	err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns)
 
 	// Always clean up the snapshot working directory.
 	if rmErr := os.RemoveAll(snapshotDir); rmErr != nil {