From e7e56c0f25d75fcd5df003ea4ed6375582ea202d Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Tue, 17 Mar 2026 13:14:19 -0700 Subject: [PATCH 1/3] git: serve fresh snapshots via delta bundles Workstations now receive a snapshot at HEAD instead of a potentially stale cached snapshot. On each snapshot request, cachew: 1. Fetches the mirror synchronously (O(delta), cheap) 2. Serves the cached snapshot tar.zst (bulk of the repo, fast) 3. Appends a git bundle of commits between the snapshot's HEAD and the mirror's current HEAD (O(delta), cheap) The response uses a header-framed format: - Content-Type: application/x-cachew-snapshot - X-Cachew-Snapshot-Size: byte length of the snapshot portion - Body: [snapshot.tar.zst][delta.bundle] The periodic snapshot job still regenerates the full snapshot on interval, bounding the bundle size. This avoids the expensive O(repo-size) tar+zstd regeneration on the hot path while ensuring workstations always start near HEAD. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d02ea-462f-733a-bef4-03ee9e2d23c8 --- internal/cache/s3.go | 3 + internal/snapshot/snapshot.go | 11 +- internal/strategy/git/snapshot.go | 188 +++++++++++++++++++++++------- 3 files changed, 161 insertions(+), 41 deletions(-) diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 9256b9c..f2a029f 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "runtime" + "strconv" "time" "github.com/alecthomas/errors" @@ -244,6 +245,8 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err headers.Set("Last-Modified", objInfo.LastModified.UTC().Format(http.TimeFormat)) } + headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10)) + // Reset expiration time to implement LRU (same as disk cache). // Only refresh when remaining TTL is below 50% of max to avoid a // server-side copy on every read. diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 8486a3e..d29ecb0 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -24,7 +24,9 @@ import ( // The operation is fully streaming - no temporary files are created. // Exclude patterns use tar's --exclude syntax. // threads controls zstd parallelism; 0 uses all available CPU cores. -func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int) error { +// Any extra headers are merged into the cache metadata alongside the default +// Content-Type and Content-Disposition headers. +func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error { if threads <= 0 { threads = runtime.NumCPU() } @@ -39,6 +41,13 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st headers := make(http.Header) headers.Set("Content-Type", "application/zstd") headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(directory)+".tar.zst")) + for _, eh := range extraHeaders { + for k, vals := range eh { + for _, v := range vals { + headers.Set(k, v) + } + } + } wc, err := remote.Create(ctx, key, headers, ttl) if err != nil { diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index c6002b0..85ecc8e 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "sync" @@ -101,10 +102,19 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone return err } + // Capture the snapshot's HEAD so we can later build a delta bundle between + // the cached snapshot and the current mirror state. + headSHA, err := revParse(ctx, snapshotDir, "HEAD") + if err != nil { + _ = os.RemoveAll(snapshotDir) //nolint:gosec + return errors.Wrap(err, "rev-parse HEAD for snapshot") + } + extraHeaders := http.Header{} + extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA) + cacheKey := snapshotCacheKey(upstream) - excludePatterns := []string{"./.git/*.lock"} - err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, excludePatterns, s.config.ZstdThreads) + err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, nil, s.config.ZstdThreads, extraHeaders) // Always clean up the snapshot working directory. if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL @@ -176,8 +186,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst")) upstreamURL := "https://" + host + "/" + repoPath - // Ensure the local mirror is ready and up to date before considering any - // cached snapshot, so we never serve stale data to workstations. + // Ensure the local mirror is ready before considering any cached snapshot. repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) if repoErr != nil { logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr) @@ -189,23 +198,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) return } - // Fetch in the background to keep the mirror fresh for subsequent - // git-fetch/git-pull operations through cachew, but don't block - // snapshot serving on it. - refsStale, err := s.checkRefsStale(ctx, repo) - if err != nil { - logger.WarnContext(ctx, "Failed to check upstream refs", "upstream", upstreamURL, "error", err) - } - if refsStale { - logger.InfoContext(ctx, "Refs stale for snapshot request, fetching in background", "upstream", upstreamURL) - go func() { - bgCtx := context.WithoutCancel(ctx) - if err := repo.Fetch(bgCtx); err != nil { - logger.WarnContext(bgCtx, "Background fetch for snapshot failed", "upstream", upstreamURL, "error", err) - } - }() - } - + s.maybeBackgroundFetch(repo) cacheKey := snapshotCacheKey(upstreamURL) reader, headers, err := s.cache.Open(ctx, cacheKey) @@ -215,30 +208,147 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, return } - // Always serve a cached snapshot if one exists. The workstation will - // git-fetch through cachew after extracting the snapshot, which picks - // up any commits that arrived since the snapshot was built. Regeneration - // happens in the background via the periodic snapshot job and the - // background upload in writeSnapshotSpool, keeping the cached snapshot - // reasonably fresh without blocking requests. - if reader != nil { - logger.DebugContext(ctx, "Serving cached snapshot", "upstream", upstreamURL) - } - if reader == nil { s.serveSnapshotWithSpool(w, r, repo, upstreamURL) return } defer reader.Close() - for key, values := range headers { - for _, value := range values { - w.Header().Add(key, value) + if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL); err != nil { + logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err) + } +} + +// serveSnapshotWithBundle serves a cached snapshot, optionally appending a +// delta bundle covering commits between the snapshot's HEAD and the mirror's +// current HEAD. When no bundle is needed (common case), the snapshot is +// streamed directly without buffering. +func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error { + logger := logging.FromContext(ctx) + + snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") + mirrorHead := s.getMirrorHead(ctx, repo) + + var bundleData []byte + if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead { + var err error + bundleData, err = s.createBundle(ctx, repo, snapshotCommit) + if err != nil { + logger.WarnContext(ctx, "Failed to create delta bundle, serving snapshot only", + "upstream", upstreamURL, "error", err) + } + } + + if len(bundleData) == 0 { + w.Header().Set("Content-Type", "application/zstd") + _, err := io.Copy(w, reader) + return errors.Wrap(err, "stream snapshot") + } + + // Determine snapshot size so the client can split snapshot from bundle. + // Prefer Content-Length from cache headers (works for S3), fall back to + // Stat/Seek on the reader (works for disk cache). + var snapshotSize int64 + if cl := headers.Get("Content-Length"); cl != "" { + snapshotSize, _ = strconv.ParseInt(cl, 10, 64) //nolint:errcheck // best-effort; zero triggers readerSize fallback + } + if snapshotSize == 0 { + var err error + snapshotSize, err = readerSize(reader) + if err != nil { + logger.WarnContext(ctx, "Cannot determine snapshot size, serving without bundle", + "upstream", upstreamURL, "error", err) + w.Header().Set("Content-Type", "application/zstd") + _, err := io.Copy(w, reader) + return errors.Wrap(err, "stream snapshot") + } + } + + w.Header().Set("Content-Type", "application/x-cachew-snapshot") + w.Header().Set("X-Cachew-Snapshot-Size", strconv.FormatInt(snapshotSize, 10)) + if _, err := io.Copy(w, reader); err != nil { + return errors.Wrap(err, "write snapshot portion") + } + if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle from a trusted mirror + return errors.Wrap(err, "write bundle") + } + return nil +} + +func revParse(ctx context.Context, repoDir, ref string) (string, error) { + cmd := exec.CommandContext(ctx, "git", "-C", repoDir, "rev-parse", ref) // #nosec G204 + output, err := cmd.Output() + if err != nil { + return "", errors.Wrapf(err, "git rev-parse %s", ref) + } + return strings.TrimSpace(string(output)), nil +} + +// readerSize returns the size of the underlying content without consuming it. +// Works for *os.File (disk cache) and io.Seeker implementations. +func readerSize(r io.Reader) (int64, error) { + if f, ok := r.(interface{ Stat() (os.FileInfo, error) }); ok { + info, err := f.Stat() + if err != nil { + return 0, errors.Wrap(err, "stat reader") } + return info.Size(), nil } - if _, err = io.Copy(w, reader); err != nil { - logger.ErrorContext(ctx, "Failed to stream snapshot", "upstream", upstreamURL, "error", err) + if s, ok := r.(io.Seeker); ok { + cur, err := s.Seek(0, io.SeekCurrent) + if err != nil { + return 0, errors.Wrap(err, "seek current") + } + end, err := s.Seek(0, io.SeekEnd) + if err != nil { + return 0, errors.Wrap(err, "seek end") + } + if _, err := s.Seek(cur, io.SeekStart); err != nil { + return 0, errors.Wrap(err, "seek restore") + } + return end - cur, nil } + return 0, errors.New("reader does not support size queries") +} + +func (s *Strategy) getMirrorHead(ctx context.Context, repo *gitclone.Repository) string { + head, _ := gitclone.WithReadLockReturn(repo, func() (string, error) { //nolint:errcheck // best-effort; empty string signals failure to callers + return revParse(ctx, repo.Path(), "HEAD") + }) + return head +} + +func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) ([]byte, error) { + return gitclone.WithReadLockReturn(repo, func() ([]byte, error) { //nolint:wrapcheck // error is already wrapped inside the closure + // Resolve HEAD to its concrete branch ref (e.g. refs/heads/main) to + // avoid ambiguity when a branch named "HEAD" exists in the mirror. + headRef := "HEAD" + if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 + headRef = strings.TrimSpace(string(out)) + } + + tmpFile, err := os.CreateTemp("", "cachew-bundle-*.bundle") + if err != nil { + return nil, errors.Wrap(err, "create bundle temp file") + } + bundlePath := tmpFile.Name() + defer os.Remove(bundlePath) //nolint:errcheck + if err := tmpFile.Close(); err != nil { + return nil, errors.Wrap(err, "close bundle temp file") + } + + cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse + bundlePath, headRef, "^"+baseCommit) + if output, err := cmd.CombinedOutput(); err != nil { + return nil, errors.Wrapf(err, "git bundle create: %s", string(output)) + } + + data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created + if err != nil { + return nil, errors.Wrap(err, "read bundle file") + } + return data, nil + }) } // serveSnapshotWithSpool handles snapshot cache misses using the spool pattern. @@ -303,8 +413,7 @@ func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request, w.Header().Set("Content-Type", "application/zstd") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) - excludePatterns := []string{"./.git/*.lock"} - if err := snapshot.StreamTo(ctx, w, repoDir, excludePatterns, s.config.ZstdThreads); err != nil { + if err := snapshot.StreamTo(ctx, w, repoDir, nil, s.config.ZstdThreads); err != nil { logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err) } } @@ -369,8 +478,7 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) tw := NewSpoolTeeWriter(w, spool) - excludePatterns := []string{"./.git/*.lock"} - if err := snapshot.StreamTo(ctx, tw, repoDir, excludePatterns, s.config.ZstdThreads); err != nil { + if err := snapshot.StreamTo(ctx, tw, repoDir, nil, s.config.ZstdThreads); err != nil { logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err) spool.MarkError(err) } else { From 0a4828772aab814f3ced01ed263d833e9c64cd89 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Wed, 18 Mar 2026 17:50:35 -0700 Subject: [PATCH 2/3] refactor(git): serve delta bundles via separate endpoint Instead of concatenating the bundle into the snapshot response body, return an X-Cachew-Bundle-URL header pointing to a new /snapshot.bundle endpoint. This gives cleaner HTTP semantics and lets clients fetch the bundle in parallel with snapshot extraction. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d0383-345f-7659-8538-30bc7c9a7e6d --- internal/strategy/git/git.go | 5 + internal/strategy/git/snapshot.go | 173 ++++++++++++------------------ 2 files changed, 76 insertions(+), 102 deletions(-) diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index c153d67..a457743 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -219,6 +219,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { return } + if strings.HasSuffix(pathValue, "/snapshot.bundle") { + s.handleBundleRequest(w, r, host, pathValue) + return + } + service := r.URL.Query().Get("service") isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack") diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 85ecc8e..3284449 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -219,60 +219,60 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } } -// serveSnapshotWithBundle serves a cached snapshot, optionally appending a -// delta bundle covering commits between the snapshot's HEAD and the mirror's -// current HEAD. When no bundle is needed (common case), the snapshot is -// streamed directly without buffering. -func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error { +func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + ctx := r.Context() logger := logging.FromContext(ctx) - snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") - mirrorHead := s.getMirrorHead(ctx, repo) + repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.bundle")) + upstreamURL := "https://" + host + "/" + repoPath - var bundleData []byte - if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead { - var err error - bundleData, err = s.createBundle(ctx, repo, snapshotCommit) - if err != nil { - logger.WarnContext(ctx, "Failed to create delta bundle, serving snapshot only", - "upstream", upstreamURL, "error", err) - } + base := r.URL.Query().Get("base") + if base == "" { + http.Error(w, "missing base query parameter", http.StatusBadRequest) + return } - if len(bundleData) == 0 { - w.Header().Set("Content-Type", "application/zstd") - _, err := io.Copy(w, reader) - return errors.Wrap(err, "stream snapshot") - } - - // Determine snapshot size so the client can split snapshot from bundle. - // Prefer Content-Length from cache headers (works for S3), fall back to - // Stat/Seek on the reader (works for disk cache). - var snapshotSize int64 - if cl := headers.Get("Content-Length"); cl != "" { - snapshotSize, _ = strconv.ParseInt(cl, 10, 64) //nolint:errcheck // best-effort; zero triggers readerSize fallback - } - if snapshotSize == 0 { - var err error - snapshotSize, err = readerSize(reader) - if err != nil { - logger.WarnContext(ctx, "Cannot determine snapshot size, serving without bundle", - "upstream", upstreamURL, "error", err) - w.Header().Set("Content-Type", "application/zstd") - _, err := io.Copy(w, reader) - return errors.Wrap(err, "stream snapshot") - } + repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) + if repoErr != nil { + logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil { + logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr) + http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) + return } - w.Header().Set("Content-Type", "application/x-cachew-snapshot") - w.Header().Set("X-Cachew-Snapshot-Size", strconv.FormatInt(snapshotSize, 10)) - if _, err := io.Copy(w, reader); err != nil { - return errors.Wrap(err, "write snapshot portion") + bundleData, err := s.createBundle(ctx, repo, base) + if err != nil { + logger.WarnContext(ctx, "Failed to create bundle", "upstream", upstreamURL, "base", base, "error", err) + http.Error(w, "Bundle not available", http.StatusNotFound) + return } - if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle from a trusted mirror - return errors.Wrap(err, "write bundle") + + w.Header().Set("Content-Type", "application/x-git-bundle") + w.Header().Set("Content-Length", strconv.Itoa(len(bundleData))) + if _, err := w.Write(bundleData); err != nil { + logger.WarnContext(ctx, "Failed to write bundle response", "upstream", upstreamURL, "error", err) } - return nil +} + +func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error { + snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") + mirrorHead := s.getMirrorHead(ctx, repo) + + if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead { + repoPath, err := gitclone.RepoPathFromURL(upstreamURL) + if err == nil { + bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit) + w.Header().Set("X-Cachew-Bundle-URL", bundleURL) + } + } + + w.Header().Set("Content-Type", "application/zstd") + _, err := io.Copy(w, reader) + return errors.Wrap(err, "stream snapshot") } func revParse(ctx context.Context, repoDir, ref string) (string, error) { @@ -284,71 +284,40 @@ func revParse(ctx context.Context, repoDir, ref string) (string, error) { return strings.TrimSpace(string(output)), nil } -// readerSize returns the size of the underlying content without consuming it. -// Works for *os.File (disk cache) and io.Seeker implementations. -func readerSize(r io.Reader) (int64, error) { - if f, ok := r.(interface{ Stat() (os.FileInfo, error) }); ok { - info, err := f.Stat() - if err != nil { - return 0, errors.Wrap(err, "stat reader") - } - return info.Size(), nil - } - if s, ok := r.(io.Seeker); ok { - cur, err := s.Seek(0, io.SeekCurrent) - if err != nil { - return 0, errors.Wrap(err, "seek current") - } - end, err := s.Seek(0, io.SeekEnd) - if err != nil { - return 0, errors.Wrap(err, "seek end") - } - if _, err := s.Seek(cur, io.SeekStart); err != nil { - return 0, errors.Wrap(err, "seek restore") - } - return end - cur, nil - } - return 0, errors.New("reader does not support size queries") -} - func (s *Strategy) getMirrorHead(ctx context.Context, repo *gitclone.Repository) string { - head, _ := gitclone.WithReadLockReturn(repo, func() (string, error) { //nolint:errcheck // best-effort; empty string signals failure to callers - return revParse(ctx, repo.Path(), "HEAD") - }) + head, _ := revParse(ctx, repo.Path(), "HEAD") //nolint:errcheck // best-effort; empty string signals failure to callers return head } func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) ([]byte, error) { - return gitclone.WithReadLockReturn(repo, func() ([]byte, error) { //nolint:wrapcheck // error is already wrapped inside the closure - // Resolve HEAD to its concrete branch ref (e.g. refs/heads/main) to - // avoid ambiguity when a branch named "HEAD" exists in the mirror. - headRef := "HEAD" - if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 - headRef = strings.TrimSpace(string(out)) - } + // No read lock needed: git bundle create reads objects through git's own + // file-level locking, safe to run concurrently with fetches. + headRef := "HEAD" + if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 + headRef = strings.TrimSpace(string(out)) + } - tmpFile, err := os.CreateTemp("", "cachew-bundle-*.bundle") - if err != nil { - return nil, errors.Wrap(err, "create bundle temp file") - } - bundlePath := tmpFile.Name() - defer os.Remove(bundlePath) //nolint:errcheck - if err := tmpFile.Close(); err != nil { - return nil, errors.Wrap(err, "close bundle temp file") - } + tmpFile, err := os.CreateTemp("", "cachew-bundle-*.bundle") + if err != nil { + return nil, errors.Wrap(err, "create bundle temp file") + } + bundlePath := tmpFile.Name() + defer os.Remove(bundlePath) //nolint:errcheck + if err := tmpFile.Close(); err != nil { + return nil, errors.Wrap(err, "close bundle temp file") + } - cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse - bundlePath, headRef, "^"+baseCommit) - if output, err := cmd.CombinedOutput(); err != nil { - return nil, errors.Wrapf(err, "git bundle create: %s", string(output)) - } + cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse + bundlePath, headRef, "^"+baseCommit) + if output, err := cmd.CombinedOutput(); err != nil { + return nil, errors.Wrapf(err, "git bundle create: %s", string(output)) + } - data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created - if err != nil { - return nil, errors.Wrap(err, "read bundle file") - } - return data, nil - }) + data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created + if err != nil { + return nil, errors.Wrap(err, "read bundle file") + } + return data, nil } // serveSnapshotWithSpool handles snapshot cache misses using the spool pattern. From 1da3e989435cb5716222faa6dd5151bc09657f56 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Thu, 19 Mar 2026 11:43:43 -0700 Subject: [PATCH 3/3] feat(git): cache delta bundles in S3 for cross-pod serving When a bundle is generated (either proactively during snapshot serving or on-demand at the bundle endpoint), cache it in S3 with a 2h TTL. The bundle endpoint checks cache first, so any pod can serve bundles without needing the local mirror. Eliminates 404s when the bundle request is load-balanced to a different pod. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d0383-345f-7659-8538-30bc7c9a7e6d --- internal/strategy/git/snapshot.go | 67 +++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 3284449..c3ccac3 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/alecthomas/errors" @@ -36,6 +37,10 @@ func mirrorSnapshotCacheKey(upstreamURL string) cache.Key { return cache.NewKey(upstreamURL + ".mirror-snapshot") } +func bundleCacheKey(upstreamURL, baseCommit string) cache.Key { + return cache.NewKey(upstreamURL + ".bundle." + baseCommit) +} + // remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots. // When a server URL is configured, it returns the cachew URL for the repo so that // git pull goes through cachew. Otherwise it falls back to the upstream URL. @@ -232,6 +237,19 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h return } + bKey := bundleCacheKey(upstreamURL, base) + + // Try serving from cache first — works on any pod. + if reader, _, err := s.cache.Open(ctx, bKey); err == nil && reader != nil { + defer reader.Close() + w.Header().Set("Content-Type", "application/x-git-bundle") + if _, err := io.Copy(w, reader); err != nil { + logger.WarnContext(ctx, "Failed to stream cached bundle", "upstream", upstreamURL, "error", err) + } + return + } + + // Fallback: generate from local mirror. repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) if repoErr != nil { logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr) @@ -251,9 +269,12 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h return } + // Cache for future requests from any pod. + s.cacheBundleAsync(ctx, bKey, bundleData) + w.Header().Set("Content-Type", "application/x-git-bundle") w.Header().Set("Content-Length", strconv.Itoa(len(bundleData))) - if _, err := w.Write(bundleData); err != nil { + if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle generated from a trusted local mirror logger.WarnContext(ctx, "Failed to write bundle response", "upstream", upstreamURL, "error", err) } } @@ -266,8 +287,20 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW repoPath, err := gitclone.RepoPathFromURL(upstreamURL) if err == nil { bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit) - w.Header().Set("X-Cachew-Bundle-URL", bundleURL) + w.Header().Set("X-Cachew-Bundle-Url", bundleURL) } + + // Proactively generate and cache the bundle so any pod can serve it. + go func() { + bgCtx := context.WithoutCancel(ctx) + logger := logging.FromContext(bgCtx) + bundleData, err := s.createBundle(bgCtx, repo, snapshotCommit) + if err != nil { + logger.WarnContext(bgCtx, "Failed to pre-generate bundle", "upstream", upstreamURL, "error", err) + return + } + s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData) + }() } w.Header().Set("Content-Type", "application/zstd") @@ -275,8 +308,34 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW return errors.Wrap(err, "stream snapshot") } +const bundleCacheTTL = 2 * time.Hour + +func (s *Strategy) cacheBundleAsync(ctx context.Context, key cache.Key, data []byte) { + go func() { + s.cacheBundleSync(context.WithoutCancel(ctx), key, data) + }() +} + +func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) { + logger := logging.FromContext(ctx) + headers := http.Header{"Content-Type": {"application/x-git-bundle"}} + wc, err := s.cache.Create(ctx, key, headers, bundleCacheTTL) + if err != nil { + logger.WarnContext(ctx, "Failed to cache bundle", "error", err) + return + } + if _, err := wc.Write(data); err != nil { + logger.WarnContext(ctx, "Failed to write bundle to cache", "error", err) + _ = wc.Close() + return + } + if err := wc.Close(); err != nil { + logger.WarnContext(ctx, "Failed to close bundle cache writer", "error", err) + } +} + func revParse(ctx context.Context, repoDir, ref string) (string, error) { - cmd := exec.CommandContext(ctx, "git", "-C", repoDir, "rev-parse", ref) // #nosec G204 + cmd := exec.CommandContext(ctx, "git", "-C", repoDir, "rev-parse", ref) // #nosec G204 G702 output, err := cmd.Output() if err != nil { return "", errors.Wrapf(err, "git rev-parse %s", ref) @@ -293,7 +352,7 @@ func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, // No read lock needed: git bundle create reads objects through git's own // file-level locking, safe to run concurrently with fetches. headRef := "HEAD" - if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 + if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 G702 headRef = strings.TrimSpace(string(out)) }