From 5236bfd56f2a0ca88609183b42eb332fc9de3ba0 Mon Sep 17 00:00:00 2001 From: Josh Friend Date: Tue, 17 Mar 2026 14:30:31 -0400 Subject: [PATCH] feat: add LFS snapshot support Generate, cache, and serve LFS object snapshots for repos that declare filter=lfs in any .gitattributes file. Snapshots are produced by the periodic job and served cache-first on cold start. The LFS batch API is proxied to GitHub so insteadOf rewrites work transparently. Snapshots now embed the upstream GitHub URL instead of the cachew instance URL, letting clients control routing via insteadOf rules. Requires git-lfs at startup. --- docker/Dockerfile | 2 +- internal/gitclone/command.go | 4 +- internal/gitclone/command_test.go | 6 +- internal/gitclone/manager.go | 7 +- internal/snapshot/snapshot.go | 59 +++++-- internal/strategy/git/git.go | 35 +++- internal/strategy/git/snapshot.go | 227 +++++++++++++++++++------ internal/strategy/git/snapshot_test.go | 9 +- 8 files changed, 261 insertions(+), 88 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index dbc405b5..7cdc931a 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -6,7 +6,7 @@ ARG TARGETARCH SHELL ["/bin/sh", "-o", "pipefail", "-c"] # Install runtime dependencies for git operations and TLS -RUN apk add --no-cache ca-certificates curl git git-daemon tzdata zstd && \ +RUN apk add --no-cache ca-certificates curl git git-daemon git-lfs tzdata zstd && \ addgroup -g 1000 cachew && \ adduser -D -u 1000 -G cachew cachew diff --git a/internal/gitclone/command.go b/internal/gitclone/command.go index d4728c08..891b2314 100644 --- a/internal/gitclone/command.go +++ b/internal/gitclone/command.go @@ -11,7 +11,9 @@ import ( "github.com/alecthomas/errors" ) -func (r *Repository) gitCommand(ctx context.Context, args ...string) (*exec.Cmd, error) { +// GitCommand returns a git subprocess configured with repository-scoped +// authentication and any per-URL git config overrides disabled. +func (r *Repository) GitCommand(ctx context.Context, args ...string) (*exec.Cmd, error) { repoURL := r.upstreamURL var token string if r.credentialProvider != nil && strings.Contains(repoURL, "github.com") { diff --git a/internal/gitclone/command_test.go b/internal/gitclone/command_test.go index 4f9465a2..6a4f21b6 100644 --- a/internal/gitclone/command_test.go +++ b/internal/gitclone/command_test.go @@ -51,7 +51,7 @@ func TestGitCommand(t *testing.T) { credentialProvider: nil, } - cmd, err := repo.gitCommand(ctx, "version") + cmd, err := repo.GitCommand(ctx, "version") assert.NoError(t, err) assert.NotZero(t, cmd) @@ -70,7 +70,7 @@ func TestGitCommandWithEmptyURL(t *testing.T) { credentialProvider: nil, } - cmd, err := repo.gitCommand(ctx, "version") + cmd, err := repo.GitCommand(ctx, "version") assert.NoError(t, err) assert.NotZero(t, cmd) @@ -124,7 +124,7 @@ func TestGitCommandWithCredentialProvider(t *testing.T) { }, } - cmd, err := repo.gitCommand(ctx, "version") + cmd, err := repo.GitCommand(ctx, "version") assert.NoError(t, err) assert.NotZero(t, cmd) diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index ab44a025..2228eaa2 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -504,7 +504,7 @@ func (r *Repository) executeClone(ctx context.Context) error { r.upstreamURL, cloneDest, } - cmd, err := r.gitCommand(cloneCtx, args...) + cmd, err := r.GitCommand(cloneCtx, args...) if err != nil { return errors.Wrap(err, "create git command") } @@ -582,8 +582,7 @@ func (r *Repository) fetchInternal(ctx context.Context, timeout time.Duration, e } args = append(args, "fetch", "--prune", "--prune-tags") - // #nosec G204 - r.path is controlled by us - cmd, err := r.gitCommand(fetchCtx, args...) + cmd, err := r.GitCommand(fetchCtx, args...) if err != nil { return errors.Wrap(err, "create git command") } @@ -681,7 +680,7 @@ func (r *Repository) GetLocalRefs(ctx context.Context) (map[string]string, error func (r *Repository) GetUpstreamRefs(ctx context.Context) (map[string]string, error) { // #nosec G204 - r.upstreamURL is controlled by us - cmd, err := r.gitCommand(ctx, "ls-remote", r.upstreamURL) + cmd, err := r.GitCommand(ctx, "ls-remote", r.upstreamURL) if err != nil { return nil, errors.Wrap(err, "create git command") } diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 80523b1a..20e50a19 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -27,20 +27,42 @@ import ( // Any extra headers are merged into the cache metadata alongside the default // Content-Type and Content-Disposition headers. func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error { + return CreatePaths(ctx, remote, key, directory, filepath.Base(directory), []string{"."}, ttl, excludePatterns, threads, extraHeaders...) +} + +// CreatePaths archives named paths within baseDir using tar with zstd compression, +// then uploads the resulting archive to the cache. +// +// The archive preserves all file permissions, ownership, and symlinks. +// Each entry in includePaths is archived relative to baseDir and must exist. +// This allows callers to archive either an entire directory with "." or a +// specific subtree such as "lfs" while preserving that relative path prefix. +// Exclude patterns use tar's --exclude syntax. +// threads controls zstd parallelism; 0 uses all available CPU cores. +func CreatePaths(ctx context.Context, remote cache.Cache, key cache.Key, baseDir, archiveName string, includePaths []string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error { if threads <= 0 { threads = runtime.NumCPU() } - // Verify directory exists - if info, err := os.Stat(directory); err != nil { - return errors.Wrap(err, "failed to stat directory") + if len(includePaths) == 0 { + return errors.New("includePaths must not be empty") + } + + if info, err := os.Stat(baseDir); err != nil { + return errors.Wrap(err, "failed to stat base directory") } else if !info.IsDir() { - return errors.Errorf("not a directory: %s", directory) + return errors.Errorf("not a directory: %s", baseDir) + } + for _, path := range includePaths { + targetPath := filepath.Join(baseDir, path) + if _, err := os.Stat(targetPath); err != nil { + return errors.Wrapf(err, "failed to stat include path %q", path) + } } headers := make(http.Header) headers.Set("Content-Type", "application/zstd") - headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(directory)+".tar.zst")) + headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", archiveName+".tar.zst")) for _, eh := range extraHeaders { for k, vals := range eh { for _, v := range vals { @@ -54,12 +76,22 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st return errors.Wrap(err, "failed to create object") } - tarArgs := []string{"-cpf", "-", "-C", directory} + tarArgs := []string{"-cpf", "-", "-C", baseDir} for _, pattern := range excludePatterns { tarArgs = append(tarArgs, "--exclude", pattern) } - tarArgs = append(tarArgs, ".") + tarArgs = append(tarArgs, "--") + tarArgs = append(tarArgs, includePaths...) + + if err := runTarZstdPipeline(ctx, tarArgs, threads, wc); err != nil { + return errors.Join(err, wc.Close()) + } + return errors.Wrap(wc.Close(), "failed to close writer") +} +// runTarZstdPipeline runs tar piped through zstd, writing compressed output to w. +// The caller is responsible for closing w after this returns. +func runTarZstdPipeline(ctx context.Context, tarArgs []string, threads int, w io.Writer) error { tarCmd := exec.CommandContext(ctx, "tar", tarArgs...) zstdCmd := exec.CommandContext(ctx, "zstd", "-c", fmt.Sprintf("-T%d", threads)) //nolint:gosec // threads is a validated integer, not user input @@ -68,7 +100,7 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st // closing the read end ensures tar receives SIGPIPE instead of blocking. pr, pw, err := os.Pipe() if err != nil { - return errors.Join(errors.Wrap(err, "failed to create pipe"), wc.Close()) + return errors.Wrap(err, "failed to create pipe") } var tarStderr, zstdStderr bytes.Buffer @@ -76,25 +108,24 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st tarCmd.Stderr = &tarStderr zstdCmd.Stdin = pr - zstdCmd.Stdout = wc + zstdCmd.Stdout = w zstdCmd.Stderr = &zstdStderr if err := tarCmd.Start(); err != nil { pw.Close() //nolint:errcheck,gosec // best-effort cleanup pr.Close() //nolint:errcheck,gosec // best-effort cleanup - return errors.Join(errors.Wrap(err, "failed to start tar"), wc.Close()) + return errors.Wrap(err, "failed to start tar") } pw.Close() //nolint:errcheck,gosec // parent no longer needs write end; tar holds its own copy if err := zstdCmd.Start(); err != nil { pr.Close() //nolint:errcheck,gosec // let tar receive SIGPIPE so it exits - return errors.Join(errors.Wrap(err, "failed to start zstd"), tarCmd.Wait(), wc.Close()) + return errors.Join(errors.Wrap(err, "failed to start zstd"), tarCmd.Wait()) } pr.Close() //nolint:errcheck,gosec // parent no longer needs read end; if zstd dies, tar gets SIGPIPE tarErr := tarCmd.Wait() zstdErr := zstdCmd.Wait() - closeErr := wc.Close() var errs []error if tarErr != nil { @@ -103,10 +134,6 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st if zstdErr != nil { errs = append(errs, errors.Errorf("zstd failed: %w: %s", zstdErr, zstdStderr.String())) } - if closeErr != nil { - errs = append(errs, errors.Wrap(closeErr, "failed to close writer")) - } - return errors.Join(errs...) } diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 7f9962ca..4862bd59 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -38,9 +38,7 @@ type Config struct { SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd workstation snapshots. 0 disables snapshots." default:"0"` MirrorSnapshotInterval time.Duration `hcl:"mirror-snapshot-interval,optional" help:"How often to generate mirror snapshots for pod bootstrap. 0 uses snapshot-interval. Defaults to 2h." default:"2h"` RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"` - // ServerURL is embedded as remote.origin.url in snapshots so git pull goes through cachew. - ServerURL string `hcl:"server-url,optional" help:"Base URL of this cachew instance, embedded in snapshot remote URLs." default:"${CACHEW_URL}"` - ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression (0 = all CPU cores)." default:"0"` + ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression (0 = all CPU cores)." default:"0"` } type Strategy struct { @@ -83,6 +81,10 @@ func New( logger := logging.FromContext(ctx) + if _, err := exec.LookPath("git-lfs"); err != nil { + return nil, errors.New("git-lfs is required but not found in PATH") + } + // Get GitHub App token manager if configured tokenManager, err := tokenManagerProvider() if err != nil { @@ -122,8 +124,6 @@ func New( tokenManager: tokenManager, metrics: m, } - s.config.ServerURL = strings.TrimRight(config.ServerURL, "/") - if err := s.warmExistingRepos(ctx); err != nil { logger.WarnContext(ctx, "Failed to warm existing repos", "error", err) } @@ -199,6 +199,7 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) error { if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) + s.scheduleLFSSnapshotJobs(repo) } if s.config.RepackInterval > 0 { s.scheduleRepackJobs(repo) @@ -237,6 +238,22 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { return } + if strings.HasSuffix(pathValue, "/lfs-snapshot.tar.zst") { + s.metrics.recordRequest(ctx, "lfs-snapshot") + s.handleLFSSnapshotRequest(w, r, host, pathValue) + return + } + + // Proxy LFS Batch API requests directly to GitHub. Cachew doesn't cache + // individual LFS objects, but it needs to handle these requests because + // git's url.*.insteadOf rewrites the LFS endpoint URL to point at cachew. + if strings.Contains(pathValue, "/info/lfs/") { + s.metrics.recordRequest(ctx, "lfs-api") + logger.DebugContext(ctx, "Proxying LFS API request to upstream", "uri", pathValue) + s.forwardToUpstream(w, r, host, pathValue) + return + } + service := r.URL.Query().Get("service") isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack") @@ -248,6 +265,12 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { } s.metrics.recordRequest(ctx, "upload-pack") + s.handleGitRequest(w, r, host, pathValue) +} + +func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + ctx := r.Context() + logger := logging.FromContext(ctx) repoPath := ExtractRepoPath(pathValue) upstreamURL := "https://" + host + "/" + repoPath @@ -515,6 +538,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) er if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) + s.scheduleLFSSnapshotJobs(repo) } if s.config.RepackInterval > 0 { s.scheduleRepackJobs(repo) @@ -545,6 +569,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) er if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) + s.scheduleLFSSnapshotJobs(repo) } if s.config.RepackInterval > 0 { s.scheduleRepackJobs(repo) diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index e50bf531..30d5df86 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -41,18 +41,8 @@ func bundleCacheKey(upstreamURL, baseCommit string) cache.Key { return cache.NewKey(upstreamURL + ".bundle." + baseCommit) } -// remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots. -// When a server URL is configured, it returns the cachew URL for the repo so that -// git pull goes through cachew. Otherwise it falls back to the upstream URL. -func (s *Strategy) remoteURLForSnapshot(upstream string) string { - if s.config.ServerURL == "" { - return upstream - } - repoPath, err := gitclone.RepoPathFromURL(upstream) - if err != nil { - return upstream - } - return s.config.ServerURL + "/git/" + repoPath +func lfsSnapshotCacheKey(upstreamURL string) cache.Key { + return cache.NewKey(upstreamURL + ".lfs-snapshot") } // cloneForSnapshot clones the mirror into destDir under repo's read lock, @@ -61,13 +51,16 @@ func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Reposito if err := repo.WithReadLock(func() error { // #nosec G204 - repo.Path() and destDir are controlled by us cmd := exec.CommandContext(ctx, "git", "clone", repo.Path(), destDir) + cmd.Env = append(os.Environ(), "GIT_LFS_SKIP_SMUDGE=1") if output, err := cmd.CombinedOutput(); err != nil { return errors.Wrapf(err, "git clone for snapshot: %s", string(output)) } - // git clone from a local path sets remote.origin.url to that path; restore it. - // #nosec G204 - remoteURL is derived from controlled inputs - cmd = exec.CommandContext(ctx, "git", "-C", destDir, "remote", "set-url", "origin", s.remoteURLForSnapshot(repo.UpstreamURL())) + // git clone from a local path sets remote.origin.url to that path; restore + // it to the upstream URL. Clients use insteadOf to route through cachew, so + // embedding the cachew URL here would couple snapshots to a specific instance. + // #nosec G204 - upstreamURL is derived from controlled inputs + cmd = exec.CommandContext(ctx, "git", "-C", destDir, "remote", "set-url", "origin", repo.UpstreamURL()) if output, err := cmd.CombinedOutput(); err != nil { return errors.Wrapf(err, "fix snapshot remote URL: %s", string(output)) } @@ -78,56 +71,62 @@ func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Reposito return nil } -func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error { +func (s *Strategy) withSnapshotClone(ctx context.Context, repo *gitclone.Repository, suffix string, fn func(workDir string) error) error { logger := logging.FromContext(ctx) - upstream := repo.UpstreamURL() - start := time.Now() - - logger.InfoContext(ctx, "Snapshot generation started", "upstream", upstream) - - mu := s.snapshotMutexFor(upstream) - mu.Lock() - defer mu.Unlock() - mirrorRoot := s.cloneManager.Config().MirrorRoot - snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream) + workDir, err := snapshotDirForURL(mirrorRoot, repo.UpstreamURL()) if err != nil { return err } + workDir = filepath.Join(workDir, suffix) // Clean any previous snapshot working directory. - if err := os.RemoveAll(snapshotDir); err != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL - return errors.Wrap(err, "remove previous snapshot dir") + if err := os.RemoveAll(workDir); err != nil { + return errors.Wrap(err, "remove previous snapshot work dir") } - if err := os.MkdirAll(filepath.Dir(snapshotDir), 0o750); err != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL - return errors.Wrap(err, "create snapshot parent dir") + if err := os.MkdirAll(filepath.Dir(workDir), 0o750); err != nil { + return errors.Wrap(err, "create snapshot work dir parent") } - if err := s.cloneForSnapshot(ctx, repo, snapshotDir); err != nil { - _ = os.RemoveAll(snapshotDir) //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL + if err := s.cloneForSnapshot(ctx, repo, workDir); err != nil { + _ = os.RemoveAll(workDir) return err } - // Capture the snapshot's HEAD so we can later build a delta bundle between - // the cached snapshot and the current mirror state. - headSHA, err := revParse(ctx, snapshotDir, "HEAD") - if err != nil { - _ = os.RemoveAll(snapshotDir) //nolint:gosec - return errors.Wrap(err, "rev-parse HEAD for snapshot") - } - extraHeaders := http.Header{} - extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA) + // Always clean up the snapshot working directory. + defer func() { + if rmErr := os.RemoveAll(workDir); rmErr != nil { + logger.WarnContext(ctx, "Failed to clean up snapshot work dir", "work_dir", workDir, "error", rmErr) + } + }() - cacheKey := snapshotCacheKey(upstream) + return fn(workDir) +} - err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, nil, s.config.ZstdThreads, extraHeaders) +func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error { + logger := logging.FromContext(ctx) + upstream := repo.UpstreamURL() + start := time.Now() - // Always clean up the snapshot working directory. - if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL - logger.WarnContext(ctx, "Failed to clean up snapshot dir", "error", rmErr) - } - if err != nil { - s.metrics.recordOperation(ctx, "snapshot", "error", time.Since(start)) + logger.InfoContext(ctx, "Snapshot generation started", "upstream", upstream) + + mu := s.snapshotMutexFor(upstream) + mu.Lock() + defer mu.Unlock() + + cacheKey := snapshotCacheKey(upstream) + if err := s.withSnapshotClone(ctx, repo, "base", func(workDir string) error { + // Capture the snapshot's HEAD so we can later build a delta bundle between + // the cached snapshot and the current mirror state. + headSHA, err := revParse(ctx, workDir, "HEAD") + if err != nil { + return errors.Wrap(err, "rev-parse HEAD for snapshot") + } + extraHeaders := http.Header{} + extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA) + + return snapshot.Create(ctx, s.cache, cacheKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders) + }); err != nil { return errors.Wrap(err, "create snapshot") } @@ -239,10 +238,6 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err) } _ = reader.Close() - // Schedule a deferred mirror restore so the mirror eventually - // becomes hot and cachew can generate fresh bundle deltas. - // Without this, repos that only ever serve cached snapshots - // would never restore their mirror. s.scheduleDeferredMirrorRestore(ctx, repo, entry) return } @@ -281,6 +276,18 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } } +func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header) error { + for key, values := range headers { + for _, value := range values { + w.Header().Add(key, value) + } + } + if _, err := io.Copy(w, reader); err != nil { + return errors.Wrap(err, "streaming artifact") + } + return nil +} + func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { ctx := r.Context() logger := logging.FromContext(ctx) @@ -676,3 +683,117 @@ func snapshotSpoolDirForURL(mirrorRoot, upstreamURL string) (string, error) { } return filepath.Join(mirrorRoot, ".snapshot-spools", repoPath), nil } + +// generateAndUploadLFSSnapshot fetches only the LFS objects needed to check out +// the repository's default branch (HEAD) and archives them as a separate tar.zst +// served at /git/{repo}/lfs-snapshot.tar.zst. +// +// Only objects referenced by the current HEAD tree are included — historical +// versions of LFS-tracked files are excluded to keep the archive small. +// +// The archive stores paths relative to .git/ (e.g. ./lfs/objects/xx/yy/sha256) so that +// the client can extract it directly into the repo's .git/ directory. +func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitclone.Repository) error { + logger := logging.FromContext(ctx) + upstream := repo.UpstreamURL() + + // Check if any .gitattributes file at HEAD declares filter=lfs. This searches + // the root and all nested .gitattributes, avoiding false negatives for repos + // that only configure LFS in subdirectories. + repoPath := repo.Path() + grepCmd := exec.CommandContext(ctx, "git", "-C", repoPath, "grep", "-q", "filter=lfs", "HEAD", "--", "*.gitattributes") //nolint:gosec + if err := grepCmd.Run(); err != nil { + logger.DebugContext(ctx, "No LFS filter in any .gitattributes, skipping LFS snapshot", "upstream", upstream) + return nil + } + + start := time.Now() + logger.InfoContext(ctx, "LFS snapshot generation started", "upstream", upstream) + + mu := s.snapshotMutexFor(upstream) + mu.Lock() + defer mu.Unlock() + + cacheKey := lfsSnapshotCacheKey(upstream) + excludePatterns := []string{"*.lock"} + if err := s.withSnapshotClone(ctx, repo, "lfs", func(workDir string) error { + // Set up LFS in the snapshot clone. cloneForSnapshot already restores + // remote.origin.url to the upstream URL, so LFS will fetch from GitHub. + // #nosec G204 + if output, err := exec.CommandContext(ctx, "git", "-C", workDir, + "lfs", "install", "--local").CombinedOutput(); err != nil { + logger.WarnContext(ctx, "git lfs install --local failed (non-fatal)", "upstream", upstream, "error", err, + "output", string(output)) + } + + // Fetch only the LFS objects referenced by HEAD (the default branch). + fetchCmd, err := repo.GitCommand(ctx, "-C", workDir, "lfs", "fetch", "origin", "HEAD") + if err != nil { + return errors.Wrap(err, "create git lfs fetch command") + } + if output, err := fetchCmd.CombinedOutput(); err != nil { + return errors.Wrapf(err, "git lfs fetch: %s", string(output)) + } + + lfsDir := filepath.Join(workDir, ".git", "lfs") + if _, err := os.Stat(lfsDir); os.IsNotExist(err) { + logger.InfoContext(ctx, "No LFS objects in repository, skipping LFS snapshot", "upstream", upstream) + return nil + } + + gitDir := filepath.Join(workDir, ".git") + return snapshot.CreatePaths(ctx, s.cache, cacheKey, gitDir, "lfs", []string{"lfs"}, 0, excludePatterns, s.config.ZstdThreads) + }); err != nil { + s.metrics.recordOperation(ctx, "lfs-snapshot", "error", time.Since(start)) + return errors.Wrap(err, "create LFS snapshot") + } + + s.metrics.recordOperation(ctx, "lfs-snapshot", "success", time.Since(start)) + logger.InfoContext(ctx, "LFS snapshot generation completed", "upstream", upstream) + return nil +} + +func (s *Strategy) scheduleLFSSnapshotJobs(repo *gitclone.Repository) { + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error { + return s.generateAndUploadLFSSnapshot(ctx, repo) + }) +} + +func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + ctx := r.Context() + logger := logging.FromContext(ctx) + + repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/lfs-snapshot.tar.zst")) + upstreamURL := "https://" + host + "/" + repoPath + cacheKey := lfsSnapshotCacheKey(upstreamURL) + + // Try cache first so we can serve even when the mirror isn't ready (cold start). + reader, headers, err := s.cache.Open(ctx, cacheKey) + if err != nil && !errors.Is(err, os.ErrNotExist) { + logger.ErrorContext(ctx, "Failed to open LFS snapshot from cache", "upstream", upstreamURL, "error", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + if reader != nil { + defer reader.Close() + logger.DebugContext(ctx, "Serving cached LFS snapshot", "upstream", upstreamURL) + if err := s.streamSnapshotArtifact(ctx, w, reader, headers); err != nil { + logger.ErrorContext(ctx, "Failed to stream LFS snapshot", "upstream", upstreamURL, "error", err) + } + return + } + + // Cache miss — return 404 immediately rather than blocking on mirror + // restore + on-demand generation. Kick off a background mirror warm so + // the periodic LFS snapshot job can fire once the mirror is ready. + logger.InfoContext(ctx, "LFS snapshot cache miss, triggering background warm", "upstream", upstreamURL) + if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil && repo.State() != gitclone.StateReady { + s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error { + if err := s.startClone(ctx, repo); err != nil { + logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err) + } + return nil + }) + } + http.Error(w, "LFS snapshot not found", http.StatusNotFound) +} diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 98f607ff..7431f3fa 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -230,7 +230,7 @@ func TestSnapshotGenerationViaLocalClone(t *testing.T) { assert.Equal(t, upstreamURL+"\n", string(output)) // Snapshot working directory should have been cleaned up. - snapshotWorkDir := filepath.Join(mirrorRoot, ".snapshots", "github.com", "org", "repo") + snapshotWorkDir := filepath.Join(mirrorRoot, ".snapshots", "github.com", "org", "repo", "base") _, err = os.Stat(snapshotWorkDir) assert.True(t, os.IsNotExist(err)) } @@ -682,7 +682,7 @@ func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) { time.Sleep(time.Second) } -func TestSnapshotRemoteURLUsesServerURL(t *testing.T) { +func TestSnapshotRemoteURLUsesUpstreamURL(t *testing.T) { if _, err := exec.LookPath("git"); err != nil { t.Skip("git not found in PATH") } @@ -691,7 +691,6 @@ func TestSnapshotRemoteURLUsesServerURL(t *testing.T) { tmpDir := t.TempDir() mirrorRoot := filepath.Join(tmpDir, "mirrors") upstreamURL := "https://github.com/org/repo" - serverURL := "http://cachew.example.com" mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") createTestMirrorRepo(t, mirrorPath) @@ -701,7 +700,7 @@ func TestSnapshotRemoteURLUsesServerURL(t *testing.T) { mux := newTestMux() cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) - s, err := git.New(ctx, git.Config{ServerURL: serverURL}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) manager, err := cm() @@ -720,5 +719,5 @@ func TestSnapshotRemoteURLUsesServerURL(t *testing.T) { cmd := exec.Command("git", "-C", restoreDir, "remote", "get-url", "origin") output, err := cmd.CombinedOutput() assert.NoError(t, err, string(output)) - assert.Equal(t, serverURL+"/git/github.com/org/repo\n", string(output)) + assert.Equal(t, upstreamURL+"\n", string(output)) }