diff --git a/AGENTS.md b/AGENTS.md index fdacc57..ef0ca45 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,3 +19,6 @@ The codebase uses Hermit to manage toolchains. It is written in Go, and uses Jus Only add comments for relatively large blocks of code, 20+ lines or more, and ONLY if it is not obvious what the code is doing. ALWAYS add Go-style documentation comments for public variables/types/functions. If you do add comments, the comments should explain WHY something is happening, not WHAT is happening. + +Functions should return errors, not log them internally. Logging belongs at the call site so callers retain control over +how failures are reported and handled. diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index e03bd0f..d8cd0ae 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -321,6 +321,36 @@ func WithReadLockReturn[T any](repo *Repository, fn func() (T, error)) (T, error return fn() } +// MarkRestored transitions a repository from StateEmpty to StateReady after an +// external restore (e.g. from an S3 snapshot). It applies the same mirror +// configuration that Clone would, so the repo is ready to serve upload-pack. +func (r *Repository) MarkRestored(ctx context.Context) error { + r.mu.Lock() + if r.state != StateEmpty { + r.mu.Unlock() + return nil + } + r.state = StateCloning + r.mu.Unlock() + + err := configureMirror(ctx, r.path, r.config.PackThreads) + if err == nil && r.config.Maintenance { + err = registerMaintenance(ctx, r.path) + } + + r.mu.Lock() + if err != nil { + r.state = StateEmpty + r.mu.Unlock() + return errors.Wrap(err, "configure mirror after restore") + } + + r.state = StateReady + r.lastFetch = time.Now() + r.mu.Unlock() + return nil +} + func (r *Repository) Clone(ctx context.Context) error { r.mu.Lock() if r.state != StateEmpty { diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 613751a..0a69513 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -24,6 +24,7 @@ import ( "github.com/block/cachew/internal/githubapp" "github.com/block/cachew/internal/jobscheduler" "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/snapshot" "github.com/block/cachew/internal/strategy" ) @@ -434,26 +435,51 @@ func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Reposito func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { logger := logging.FromContext(ctx) + upstream := repo.UpstreamURL() + + if err := s.tryRestoreSnapshot(ctx, repo); err != nil { + logger.InfoContext(ctx, "Snapshot restore failed, falling back to clone", + slog.String("upstream", upstream), + slog.String("error", err.Error())) + } else { + s.cleanupSpools(upstream) + + logger.InfoContext(ctx, "Snapshot restore completed, scheduling catch-up fetch", + slog.String("upstream", upstream)) + + s.scheduler.Submit(upstream, "fetch", func(ctx context.Context) error { + s.backgroundFetch(ctx, repo) + return nil + }) + + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } + if s.config.RepackInterval > 0 { + s.scheduleRepackJobs(repo) + } + return + } logger.InfoContext(ctx, "Starting clone", - slog.String("upstream", repo.UpstreamURL()), + slog.String("upstream", upstream), slog.String("path", repo.Path())) err := repo.Clone(ctx) // Clean up spools regardless of clone success or failure, so that subsequent // requests either serve from the local backend or go directly to upstream. - s.cleanupSpools(repo.UpstreamURL()) + s.cleanupSpools(upstream) if err != nil { logger.ErrorContext(ctx, "Clone failed", - slog.String("upstream", repo.UpstreamURL()), + slog.String("upstream", upstream), slog.String("error", err.Error())) return } logger.InfoContext(ctx, "Clone completed", - slog.String("upstream", repo.UpstreamURL()), + slog.String("upstream", upstream), slog.String("path", repo.Path())) if s.config.SnapshotInterval > 0 { @@ -464,6 +490,28 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { } } +// tryRestoreSnapshot attempts to restore a mirror from an S3 snapshot. +// On failure the repo path is cleaned up so the caller can fall back to clone. +func (s *Strategy) tryRestoreSnapshot(ctx context.Context, repo *gitclone.Repository) error { + cacheKey := snapshotCacheKey(repo.UpstreamURL()) + + if err := os.MkdirAll(filepath.Dir(repo.Path()), 0o750); err != nil { + return errors.Wrap(err, "create parent directory for restore") + } + + if err := snapshot.Restore(ctx, s.cache, cacheKey, repo.Path(), s.config.ZstdThreads); err != nil { + _ = os.RemoveAll(repo.Path()) + return errors.Wrap(err, "restore snapshot") + } + + if err := repo.MarkRestored(ctx); err != nil { + _ = os.RemoveAll(repo.Path()) + return errors.Wrap(err, "mark restored") + } + + return nil +} + func (s *Strategy) maybeBackgroundFetch(repo *gitclone.Repository) { if !repo.NeedsFetch(s.cloneManager.Config().FetchInterval) { return diff --git a/internal/strategy/git/integration_test.go b/internal/strategy/git/integration_test.go index 7633ae8..0a3e172 100644 --- a/internal/strategy/git/integration_test.go +++ b/internal/strategy/git/integration_test.go @@ -19,6 +19,7 @@ import ( "github.com/alecthomas/assert/v2" + "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/gitclone" "github.com/block/cachew/internal/githubapp" "github.com/block/cachew/internal/logging" @@ -114,7 +115,9 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) { FetchInterval: 15, }, nil) mux := http.NewServeMux() - strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) assert.NotZero(t, strategy) @@ -193,7 +196,9 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) { }, nil) mux := http.NewServeMux() - _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) server := testServerWithLogging(ctx, mux) @@ -273,7 +278,9 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) { MirrorRoot: clonesDir, FetchInterval: 15, }, nil) - _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) server := testServerWithLogging(ctx, mux) @@ -367,7 +374,9 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) { MirrorRoot: clonesDir, FetchInterval: 15, }, nil) - strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) strategy.SetHTTPTransport(&countingTransport{ @@ -511,7 +520,9 @@ func TestIntegrationNotOurRefFallsBackToUpstream(t *testing.T) { MirrorRoot: clonesDir, FetchInterval: 24 * time.Hour, // prevent auto-fetch during the test }, nil) - strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{}) + assert.NoError(t, err) + strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 5c2e8b5..ab1ac24 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -28,6 +28,10 @@ func snapshotDirForURL(mirrorRoot, upstreamURL string) (string, error) { return filepath.Join(mirrorRoot, ".snapshots", repoPath), nil } +func snapshotCacheKey(upstreamURL string) cache.Key { + return cache.NewKey(upstreamURL + ".snapshot") +} + // remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots. // When a server URL is configured, it returns the cachew URL for the repo so that // git pull goes through cachew. Otherwise it falls back to the upstream URL. @@ -46,7 +50,8 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone logger := logging.FromContext(ctx) upstream := repo.UpstreamURL() - logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream)) + logger.InfoContext(ctx, "Snapshot generation started", + slog.String("upstream", upstream)) mu := s.snapshotMutexFor(upstream) mu.Lock() @@ -59,10 +64,10 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone } // Clean any previous snapshot working directory. - if err := os.RemoveAll(snapshotDir); err != nil { + if err := os.RemoveAll(snapshotDir); err != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL return errors.Wrap(err, "remove previous snapshot dir") } - if err := os.MkdirAll(filepath.Dir(snapshotDir), 0o750); err != nil { + if err := os.MkdirAll(filepath.Dir(snapshotDir), 0o750); err != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL return errors.Wrap(err, "create snapshot parent dir") } @@ -82,26 +87,29 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone } return nil }); err != nil { - _ = os.RemoveAll(snapshotDir) + _ = os.RemoveAll(snapshotDir) //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL return errors.WithStack(err) } - cacheKey := cache.NewKey(upstream + ".snapshot") + cacheKey := snapshotCacheKey(upstream) ttl := 7 * 24 * time.Hour excludePatterns := []string{"*.lock"} err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns, s.config.ZstdThreads) // Always clean up the snapshot working directory. - if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { + if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL logger.WarnContext(ctx, "Failed to clean up snapshot dir", slog.String("error", rmErr.Error())) } if err != nil { - logger.ErrorContext(ctx, "Snapshot generation failed", slog.String("upstream", upstream), slog.String("error", err.Error())) + logger.ErrorContext(ctx, "Snapshot generation failed", + slog.String("upstream", upstream), + slog.String("error", err.Error())) return errors.Wrap(err, "create snapshot") } - logger.InfoContext(ctx, "Snapshot generation completed", slog.String("upstream", upstream)) + logger.InfoContext(ctx, "Snapshot generation completed", + slog.String("upstream", upstream)) return nil } @@ -122,7 +130,8 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst")) upstreamURL := "https://" + host + "/" + repoPath - cacheKey := cache.NewKey(upstreamURL + ".snapshot") + + cacheKey := snapshotCacheKey(upstreamURL) reader, headers, err := s.cache.Open(ctx, cacheKey) if errors.Is(err, os.ErrNotExist) { @@ -148,11 +157,15 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, http.Error(w, "Internal server error", http.StatusInternalServerError) return } + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } reader, headers, err = s.cache.Open(ctx, cacheKey) } if err != nil { if errors.Is(err, os.ErrNotExist) { - logger.DebugContext(ctx, "snapshot not found in cache", slog.String("upstream", upstreamURL)) + logger.DebugContext(ctx, "Snapshot not found in cache", + slog.String("upstream", upstreamURL)) http.NotFound(w, r) return }