Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions internal/gitclone/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor
return repo, nil
}

clonePath := m.clonePathForURL(upstreamURL)
clonePath, err := m.clonePathForURL(upstreamURL)
if err != nil {
return nil, err
}

repo = &Repository{
state: StateEmpty,
Expand Down Expand Up @@ -259,14 +262,22 @@ func (m *Manager) DiscoverExisting(ctx context.Context) ([]*Repository, error) {
return discovered, nil
}

func (m *Manager) clonePathForURL(upstreamURL string) string {
// RepoPathFromURL extracts a normalised "host/path" from an upstream Git URL,
// stripping any ".git" suffix.
func RepoPathFromURL(upstreamURL string) (string, error) {
parsed, err := url.Parse(upstreamURL)
if err != nil {
return filepath.Join(m.config.MirrorRoot, "unknown")
return "", errors.Wrap(err, "parse upstream URL")
}
return filepath.Join(parsed.Host, strings.TrimSuffix(parsed.Path, ".git")), nil
}

repoPath := strings.TrimSuffix(parsed.Path, ".git")
return filepath.Join(m.config.MirrorRoot, parsed.Host, repoPath)
func (m *Manager) clonePathForURL(upstreamURL string) (string, error) {
repoPath, err := RepoPathFromURL(upstreamURL)
if err != nil {
return "", err
}
return filepath.Join(m.config.MirrorRoot, repoPath), nil
}

func (r *Repository) State() State {
Expand Down
6 changes: 3 additions & 3 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type queueJob struct {
run func(ctx context.Context) error
}

func jobKey(queue, id string) string { return queue + ":" + id }
func jobKey(queue, id string) string { return id + ":" + queue }

func (j *queueJob) String() string { return jobKey(j.queue, j.id) }
func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.run(ctx)) }
Expand Down Expand Up @@ -55,11 +55,11 @@ type prefixedScheduler struct {
}

func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
p.scheduler.Submit(p.prefix+queue, id, run)
p.scheduler.Submit(queue, p.prefix+id, run)
}

func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
p.scheduler.SubmitPeriodicJob(p.prefix+queue, id, interval, run)
p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run)
}

func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
Expand Down
6 changes: 3 additions & 3 deletions internal/jobscheduler/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestPeriodicJobDelaysWhenRecentlyRun(t *testing.T) {
// Seed the store with a recent run time, then close it so the scheduler can open it.
store, err := jobscheduler.NewScheduleStore(dbPath)
assert.NoError(t, err)
assert.NoError(t, store.SetLastRun("queue1:periodic", time.Now()))
assert.NoError(t, store.SetLastRun("periodic:queue1", time.Now()))
assert.NoError(t, store.Close())

scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath})
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestPeriodicJobRunsImmediatelyWhenIntervalElapsed(t *testing.T) {
// Seed the store with a run time long ago, then close it.
store, err := jobscheduler.NewScheduleStore(dbPath)
assert.NoError(t, err)
assert.NoError(t, store.SetLastRun("queue1:periodic", time.Now().Add(-10*time.Second)))
assert.NoError(t, store.SetLastRun("periodic:queue1", time.Now().Add(-10*time.Second)))
assert.NoError(t, store.Close())

scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2, SchedulerDB: dbPath})
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestPeriodicJobRecordsLastRun(t *testing.T) {
assert.NoError(t, err)
defer store.Close()

lastRun, found, err := store.GetLastRun("queue1:periodic")
lastRun, found, err := store.GetLastRun("periodic:queue1")
assert.NoError(t, err)
assert.True(t, found, "last run should be recorded")
assert.True(t, !lastRun.Before(before), "last run should be at or after test start")
Expand Down
34 changes: 21 additions & 13 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"log/slog"
"net/http"
"net/http/httputil"
"net/url"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -241,25 +240,28 @@ func SpoolKeyForRequest(pathValue string, r *http.Request) (string, error) {
return "upload-pack-" + hex.EncodeToString(h[:8]), nil
}

func spoolDirForURL(mirrorRoot, upstreamURL string) string {
parsed, err := url.Parse(upstreamURL)
func spoolDirForURL(mirrorRoot, upstreamURL string) (string, error) {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err != nil {
return filepath.Join(mirrorRoot, ".spools", "unknown")
return "", errors.Wrap(err, "resolve spool directory")
}
repoPath := strings.TrimSuffix(parsed.Path, ".git")
return filepath.Join(mirrorRoot, ".spools", parsed.Host, repoPath)
return filepath.Join(mirrorRoot, ".spools", repoPath), nil
}

func (s *Strategy) getOrCreateRepoSpools(upstreamURL string) *RepoSpools {
func (s *Strategy) getOrCreateRepoSpools(upstreamURL string) (*RepoSpools, error) {
s.spoolsMu.Lock()
defer s.spoolsMu.Unlock()
rp, exists := s.spools[upstreamURL]
if !exists {
dir := spoolDirForURL(s.cloneManager.Config().MirrorRoot, upstreamURL)
rp = NewRepoSpools(dir)
s.spools[upstreamURL] = rp
if exists {
return rp, nil
}
dir, err := spoolDirForURL(s.cloneManager.Config().MirrorRoot, upstreamURL)
if err != nil {
return nil, err
}
return rp
rp = NewRepoSpools(dir)
s.spools[upstreamURL] = rp
return rp, nil
}

func (s *Strategy) cleanupSpools(upstreamURL string) {
Expand Down Expand Up @@ -294,7 +296,13 @@ func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host,
return
}

rp := s.getOrCreateRepoSpools(upstreamURL)
rp, err := s.getOrCreateRepoSpools(upstreamURL)
if err != nil {
logger.WarnContext(ctx, "Failed to resolve spool directory, forwarding to upstream",
slog.String("error", err.Error()))
s.forwardToUpstream(w, r, host, pathValue)
return
}
spool, isWriter, err := rp.GetOrCreate(key)
if err != nil {
logger.WarnContext(ctx, "Failed to create spool, forwarding to upstream",
Expand Down
18 changes: 9 additions & 9 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"log/slog"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/alecthomas/errors"
Expand All @@ -19,13 +17,12 @@ import (
"github.com/block/cachew/internal/snapshot"
)

func snapshotDirForURL(mirrorRoot, upstreamURL string) string {
parsed, err := url.Parse(upstreamURL)
func snapshotDirForURL(mirrorRoot, upstreamURL string) (string, error) {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err != nil {
return filepath.Join(mirrorRoot, ".snapshots", "unknown")
return "", errors.Wrap(err, "resolve snapshot directory")
}
repoPath := strings.TrimSuffix(parsed.Path, ".git")
return filepath.Join(mirrorRoot, ".snapshots", parsed.Host, repoPath)
return filepath.Join(mirrorRoot, ".snapshots", repoPath), nil
}

func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error {
Expand All @@ -35,7 +32,10 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream))

mirrorRoot := s.cloneManager.Config().MirrorRoot
snapshotDir := snapshotDirForURL(mirrorRoot, upstream)
snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream)
if err != nil {
return err
}

// Clean any previous snapshot working directory.
if err := os.RemoveAll(snapshotDir); err != nil {
Expand All @@ -57,7 +57,7 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
ttl := 7 * 24 * time.Hour
excludePatterns := []string{"*.lock"}

err := snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns)
err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns)

// Always clean up the snapshot working directory.
if rmErr := os.RemoveAll(snapshotDir); rmErr != nil {
Expand Down