Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ The codebase uses Hermit to manage toolchains. It is written in Go, and uses Jus
Only add comments for relatively large blocks of code (20 lines or more), and ONLY if it is not obvious what the code is
doing. ALWAYS add Go-style documentation comments for public variables/types/functions. If you do add comments, the
comments should explain WHY something is happening, not WHAT is happening.

Functions should return errors, not log them internally. Logging belongs at the call site so callers retain control over
how failures are reported and handled.
30 changes: 30 additions & 0 deletions internal/gitclone/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,36 @@ func WithReadLockReturn[T any](repo *Repository, fn func() (T, error)) (T, error
return fn()
}

// MarkRestored moves a repository from StateEmpty to StateReady once its
// contents have been restored externally (e.g. from an S3 snapshot). It runs
// configureMirror and, when maintenance is enabled in the repository config,
// registerMaintenance. It is a no-op returning nil when the repository is not
// in StateEmpty. If configuration fails the state is reset to StateEmpty and
// the wrapped error is returned.
func (r *Repository) MarkRestored(ctx context.Context) error {
	r.mu.Lock()
	claimed := r.state == StateEmpty
	if claimed {
		// The lock is released while git configuration runs; presumably the
		// intermediate StateCloning keeps other callers that require
		// StateEmpty from starting concurrently.
		r.state = StateCloning
	}
	r.mu.Unlock()

	if !claimed {
		return nil
	}

	setupErr := configureMirror(ctx, r.path, r.config.PackThreads)
	if setupErr == nil && r.config.Maintenance {
		setupErr = registerMaintenance(ctx, r.path)
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if setupErr != nil {
		r.state = StateEmpty
		return errors.Wrap(setupErr, "configure mirror after restore")
	}

	r.state = StateReady
	r.lastFetch = time.Now()
	return nil
}

func (r *Repository) Clone(ctx context.Context) error {
r.mu.Lock()
if r.state != StateEmpty {
Expand Down
56 changes: 52 additions & 4 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/snapshot"
"github.com/block/cachew/internal/strategy"
)

Expand Down Expand Up @@ -434,26 +435,51 @@ func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Reposito

func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
logger := logging.FromContext(ctx)
upstream := repo.UpstreamURL()

if err := s.tryRestoreSnapshot(ctx, repo); err != nil {
logger.InfoContext(ctx, "Snapshot restore failed, falling back to clone",
slog.String("upstream", upstream),
slog.String("error", err.Error()))
} else {
s.cleanupSpools(upstream)

logger.InfoContext(ctx, "Snapshot restore completed, scheduling catch-up fetch",
slog.String("upstream", upstream))

s.scheduler.Submit(upstream, "fetch", func(ctx context.Context) error {
s.backgroundFetch(ctx, repo)
return nil
})

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
}
return
}

logger.InfoContext(ctx, "Starting clone",
slog.String("upstream", repo.UpstreamURL()),
slog.String("upstream", upstream),
slog.String("path", repo.Path()))

err := repo.Clone(ctx)

// Clean up spools regardless of clone success or failure, so that subsequent
// requests either serve from the local backend or go directly to upstream.
s.cleanupSpools(repo.UpstreamURL())
s.cleanupSpools(upstream)

if err != nil {
logger.ErrorContext(ctx, "Clone failed",
slog.String("upstream", repo.UpstreamURL()),
slog.String("upstream", upstream),
slog.String("error", err.Error()))
return
}

logger.InfoContext(ctx, "Clone completed",
slog.String("upstream", repo.UpstreamURL()),
slog.String("upstream", upstream),
slog.String("path", repo.Path()))

if s.config.SnapshotInterval > 0 {
Expand All @@ -464,6 +490,28 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
}
}

// tryRestoreSnapshot attempts to restore a mirror from an S3 snapshot.
// On failure the repo path is cleaned up so the caller can fall back to clone.
func (s *Strategy) tryRestoreSnapshot(ctx context.Context, repo *gitclone.Repository) error {
cacheKey := snapshotCacheKey(repo.UpstreamURL())

if err := os.MkdirAll(filepath.Dir(repo.Path()), 0o750); err != nil {
return errors.Wrap(err, "create parent directory for restore")
}
Comment thread
worstell marked this conversation as resolved.

if err := snapshot.Restore(ctx, s.cache, cacheKey, repo.Path(), s.config.ZstdThreads); err != nil {
_ = os.RemoveAll(repo.Path())
return errors.Wrap(err, "restore snapshot")
}

if err := repo.MarkRestored(ctx); err != nil {
_ = os.RemoveAll(repo.Path())
return errors.Wrap(err, "mark restored")
}

return nil
}

func (s *Strategy) maybeBackgroundFetch(repo *gitclone.Repository) {
if !repo.NeedsFetch(s.cloneManager.Config().FetchInterval) {
return
Expand Down
21 changes: 16 additions & 5 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/gitclone"
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/logging"
Expand Down Expand Up @@ -114,7 +115,9 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {
FetchInterval: 15,
}, nil)
mux := http.NewServeMux()
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)
assert.NotZero(t, strategy)

Expand Down Expand Up @@ -193,7 +196,9 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) {
}, nil)

mux := http.NewServeMux()
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -273,7 +278,9 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) {
MirrorRoot: clonesDir,
FetchInterval: 15,
}, nil)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -367,7 +374,9 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) {
MirrorRoot: clonesDir,
FetchInterval: 15,
}, nil)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

strategy.SetHTTPTransport(&countingTransport{
Expand Down Expand Up @@ -511,7 +520,9 @@ func TestIntegrationNotOurRefFallsBackToUpstream(t *testing.T) {
MirrorRoot: clonesDir,
FetchInterval: 24 * time.Hour, // prevent auto-fetch during the test
}, nil)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc,
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc,
func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

Expand Down
33 changes: 23 additions & 10 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func snapshotDirForURL(mirrorRoot, upstreamURL string) (string, error) {
return filepath.Join(mirrorRoot, ".snapshots", repoPath), nil
}

// snapshotCacheKey returns the cache key for the snapshot of the repository
// at upstreamURL.
func snapshotCacheKey(upstreamURL string) cache.Key {
	const suffix = ".snapshot"
	return cache.NewKey(upstreamURL + suffix)
}

// remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots.
// When a server URL is configured, it returns the cachew URL for the repo so that
// git pull goes through cachew. Otherwise it falls back to the upstream URL.
Expand All @@ -46,7 +50,8 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
logger := logging.FromContext(ctx)
upstream := repo.UpstreamURL()

logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream))
logger.InfoContext(ctx, "Snapshot generation started",
slog.String("upstream", upstream))

mu := s.snapshotMutexFor(upstream)
mu.Lock()
Expand All @@ -59,10 +64,10 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
}

// Clean any previous snapshot working directory.
if err := os.RemoveAll(snapshotDir); err != nil {
if err := os.RemoveAll(snapshotDir); err != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL
return errors.Wrap(err, "remove previous snapshot dir")
}
if err := os.MkdirAll(filepath.Dir(snapshotDir), 0o750); err != nil {
if err := os.MkdirAll(filepath.Dir(snapshotDir), 0o750); err != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL
return errors.Wrap(err, "create snapshot parent dir")
}

Expand All @@ -82,26 +87,29 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
}
return nil
}); err != nil {
_ = os.RemoveAll(snapshotDir)
_ = os.RemoveAll(snapshotDir) //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL
return errors.WithStack(err)
}

cacheKey := cache.NewKey(upstream + ".snapshot")
cacheKey := snapshotCacheKey(upstream)
ttl := 7 * 24 * time.Hour
excludePatterns := []string{"*.lock"}

err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns, s.config.ZstdThreads)

// Always clean up the snapshot working directory.
if rmErr := os.RemoveAll(snapshotDir); rmErr != nil {
if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL
logger.WarnContext(ctx, "Failed to clean up snapshot dir", slog.String("error", rmErr.Error()))
}
if err != nil {
logger.ErrorContext(ctx, "Snapshot generation failed", slog.String("upstream", upstream), slog.String("error", err.Error()))
logger.ErrorContext(ctx, "Snapshot generation failed",
slog.String("upstream", upstream),
slog.String("error", err.Error()))
return errors.Wrap(err, "create snapshot")
}

logger.InfoContext(ctx, "Snapshot generation completed", slog.String("upstream", upstream))
logger.InfoContext(ctx, "Snapshot generation completed",
slog.String("upstream", upstream))
return nil
}

Expand All @@ -122,7 +130,8 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,

repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath
cacheKey := cache.NewKey(upstreamURL + ".snapshot")

cacheKey := snapshotCacheKey(upstreamURL)

reader, headers, err := s.cache.Open(ctx, cacheKey)
if errors.Is(err, os.ErrNotExist) {
Expand All @@ -148,11 +157,15 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
reader, headers, err = s.cache.Open(ctx, cacheKey)
}
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.DebugContext(ctx, "snapshot not found in cache", slog.String("upstream", upstreamURL))
logger.DebugContext(ctx, "Snapshot not found in cache",
slog.String("upstream", upstreamURL))
http.NotFound(w, r)
return
}
Expand Down