Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"runtime"
"strconv"
"time"

"github.com/alecthomas/errors"
Expand Down Expand Up @@ -244,6 +245,8 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err
headers.Set("Last-Modified", objInfo.LastModified.UTC().Format(http.TimeFormat))
}

headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10))

// Reset expiration time to implement LRU (same as disk cache).
// Only refresh when remaining TTL is below 50% of max to avoid a
// server-side copy on every read.
Expand Down
11 changes: 10 additions & 1 deletion internal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
// The operation is fully streaming - no temporary files are created.
// Exclude patterns use tar's --exclude syntax.
// threads controls zstd parallelism; 0 uses all available CPU cores.
func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int) error {
// Any extra headers are merged into the cache metadata alongside the default
// Content-Type and Content-Disposition headers.
func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error {
if threads <= 0 {
threads = runtime.NumCPU()
}
Expand All @@ -39,6 +41,13 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st
headers := make(http.Header)
headers.Set("Content-Type", "application/zstd")
headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(directory)+".tar.zst"))
for _, eh := range extraHeaders {
for k, vals := range eh {
for _, v := range vals {
headers.Set(k, v)
}
}
}

wc, err := remote.Create(ctx, key, headers, ttl)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
return
}

if strings.HasSuffix(pathValue, "/snapshot.bundle") {
s.handleBundleRequest(w, r, host, pathValue)
return
}

service := r.URL.Query().Get("service")
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")

Expand Down
216 changes: 176 additions & 40 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/alecthomas/errors"

Expand All @@ -35,6 +37,10 @@ func mirrorSnapshotCacheKey(upstreamURL string) cache.Key {
return cache.NewKey(upstreamURL + ".mirror-snapshot")
}

func bundleCacheKey(upstreamURL, baseCommit string) cache.Key {
return cache.NewKey(upstreamURL + ".bundle." + baseCommit)
}

// remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots.
// When a server URL is configured, it returns the cachew URL for the repo so that
// git pull goes through cachew. Otherwise it falls back to the upstream URL.
Expand Down Expand Up @@ -101,10 +107,19 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
return err
}

// Capture the snapshot's HEAD so we can later build a delta bundle between
// the cached snapshot and the current mirror state.
headSHA, err := revParse(ctx, snapshotDir, "HEAD")
if err != nil {
_ = os.RemoveAll(snapshotDir) //nolint:gosec
return errors.Wrap(err, "rev-parse HEAD for snapshot")
}
extraHeaders := http.Header{}
extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA)

cacheKey := snapshotCacheKey(upstream)
excludePatterns := []string{"./.git/*.lock"}

err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, excludePatterns, s.config.ZstdThreads)
err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, nil, s.config.ZstdThreads, extraHeaders)

// Always clean up the snapshot working directory.
if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL
Expand Down Expand Up @@ -176,8 +191,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath

// Ensure the local mirror is ready and up to date before considering any
// cached snapshot, so we never serve stale data to workstations.
// Ensure the local mirror is ready before considering any cached snapshot.
repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
if repoErr != nil {
logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr)
Expand All @@ -189,23 +203,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
return
}
// Fetch in the background to keep the mirror fresh for subsequent
// git-fetch/git-pull operations through cachew, but don't block
// snapshot serving on it.
refsStale, err := s.checkRefsStale(ctx, repo)
if err != nil {
logger.WarnContext(ctx, "Failed to check upstream refs", "upstream", upstreamURL, "error", err)
}
if refsStale {
logger.InfoContext(ctx, "Refs stale for snapshot request, fetching in background", "upstream", upstreamURL)
go func() {
bgCtx := context.WithoutCancel(ctx)
if err := repo.Fetch(bgCtx); err != nil {
logger.WarnContext(bgCtx, "Background fetch for snapshot failed", "upstream", upstreamURL, "error", err)
}
}()
}

s.maybeBackgroundFetch(repo)
cacheKey := snapshotCacheKey(upstreamURL)

reader, headers, err := s.cache.Open(ctx, cacheKey)
Expand All @@ -215,30 +213,170 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
return
}

// Always serve a cached snapshot if one exists. The workstation will
// git-fetch through cachew after extracting the snapshot, which picks
// up any commits that arrived since the snapshot was built. Regeneration
// happens in the background via the periodic snapshot job and the
// background upload in writeSnapshotSpool, keeping the cached snapshot
// reasonably fresh without blocking requests.
if reader != nil {
logger.DebugContext(ctx, "Serving cached snapshot", "upstream", upstreamURL)
}

if reader == nil {
s.serveSnapshotWithSpool(w, r, repo, upstreamURL)
return
}
defer reader.Close()

for key, values := range headers {
for _, value := range values {
w.Header().Add(key, value)
if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL); err != nil {
logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err)
}
}

func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
ctx := r.Context()
logger := logging.FromContext(ctx)

repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.bundle"))
upstreamURL := "https://" + host + "/" + repoPath

base := r.URL.Query().Get("base")
if base == "" {
http.Error(w, "missing base query parameter", http.StatusBadRequest)
return
}

bKey := bundleCacheKey(upstreamURL, base)

// Try serving from cache first — works on any pod.
if reader, _, err := s.cache.Open(ctx, bKey); err == nil && reader != nil {
defer reader.Close()
w.Header().Set("Content-Type", "application/x-git-bundle")
if _, err := io.Copy(w, reader); err != nil {
logger.WarnContext(ctx, "Failed to stream cached bundle", "upstream", upstreamURL, "error", err)
}
return
}

// Fallback: generate from local mirror.
repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
if repoErr != nil {
logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil {
logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr)
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
return
}
if _, err = io.Copy(w, reader); err != nil {
logger.ErrorContext(ctx, "Failed to stream snapshot", "upstream", upstreamURL, "error", err)

bundleData, err := s.createBundle(ctx, repo, base)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand why this wouldn't just return an io.Reader rather than buffering?

if err != nil {
logger.WarnContext(ctx, "Failed to create bundle", "upstream", upstreamURL, "base", base, "error", err)
http.Error(w, "Bundle not available", http.StatusNotFound)
return
}

// Cache for future requests from any pod.
s.cacheBundleAsync(ctx, bKey, bundleData)

w.Header().Set("Content-Type", "application/x-git-bundle")
w.Header().Set("Content-Length", strconv.Itoa(len(bundleData)))
if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle generated from a trusted local mirror
logger.WarnContext(ctx, "Failed to write bundle response", "upstream", upstreamURL, "error", err)
}
}

func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error {
snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit")
mirrorHead := s.getMirrorHead(ctx, repo)

if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err == nil {
bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit)
w.Header().Set("X-Cachew-Bundle-Url", bundleURL)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

}

// Proactively generate and cache the bundle so any pod can serve it.
go func() {
bgCtx := context.WithoutCancel(ctx)
logger := logging.FromContext(bgCtx)
bundleData, err := s.createBundle(bgCtx, repo, snapshotCommit)
if err != nil {
logger.WarnContext(bgCtx, "Failed to pre-generate bundle", "upstream", upstreamURL, "error", err)
return
}
s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData)
}()
}

w.Header().Set("Content-Type", "application/zstd")
_, err := io.Copy(w, reader)
return errors.Wrap(err, "stream snapshot")
}

const bundleCacheTTL = 2 * time.Hour

func (s *Strategy) cacheBundleAsync(ctx context.Context, key cache.Key, data []byte) {
go func() {
s.cacheBundleSync(context.WithoutCancel(ctx), key, data)
}()
}

func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) {
logger := logging.FromContext(ctx)
headers := http.Header{"Content-Type": {"application/x-git-bundle"}}
wc, err := s.cache.Create(ctx, key, headers, bundleCacheTTL)
if err != nil {
logger.WarnContext(ctx, "Failed to cache bundle", "error", err)
return
}
if _, err := wc.Write(data); err != nil {
logger.WarnContext(ctx, "Failed to write bundle to cache", "error", err)
_ = wc.Close()
return
}
if err := wc.Close(); err != nil {
logger.WarnContext(ctx, "Failed to close bundle cache writer", "error", err)
}
}

func revParse(ctx context.Context, repoDir, ref string) (string, error) {
cmd := exec.CommandContext(ctx, "git", "-C", repoDir, "rev-parse", ref) // #nosec G204 G702
output, err := cmd.Output()
if err != nil {
return "", errors.Wrapf(err, "git rev-parse %s", ref)
}
return strings.TrimSpace(string(output)), nil
}

func (s *Strategy) getMirrorHead(ctx context.Context, repo *gitclone.Repository) string {
head, _ := revParse(ctx, repo.Path(), "HEAD") //nolint:errcheck // best-effort; empty string signals failure to callers
return head
}

func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) ([]byte, error) {
// No read lock needed: git bundle create reads objects through git's own
// file-level locking, safe to run concurrently with fetches.
headRef := "HEAD"
if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 G702
headRef = strings.TrimSpace(string(out))
}

tmpFile, err := os.CreateTemp("", "cachew-bundle-*.bundle")
if err != nil {
return nil, errors.Wrap(err, "create bundle temp file")
}
bundlePath := tmpFile.Name()
defer os.Remove(bundlePath) //nolint:errcheck
if err := tmpFile.Close(); err != nil {
return nil, errors.Wrap(err, "close bundle temp file")
}

cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse
bundlePath, headRef, "^"+baseCommit)
if output, err := cmd.CombinedOutput(); err != nil {
return nil, errors.Wrapf(err, "git bundle create: %s", string(output))
}

data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to buffer this. The file can be deleted while open, and the reader will continue.

if err != nil {
return nil, errors.Wrap(err, "read bundle file")
}
return data, nil
}

// serveSnapshotWithSpool handles snapshot cache misses using the spool pattern.
Expand Down Expand Up @@ -303,8 +441,7 @@ func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request,
w.Header().Set("Content-Type", "application/zstd")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst"))

excludePatterns := []string{"./.git/*.lock"}
if err := snapshot.StreamTo(ctx, w, repoDir, excludePatterns, s.config.ZstdThreads); err != nil {
if err := snapshot.StreamTo(ctx, w, repoDir, nil, s.config.ZstdThreads); err != nil {
logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err)
}
}
Expand Down Expand Up @@ -369,8 +506,7 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst"))

tw := NewSpoolTeeWriter(w, spool)
excludePatterns := []string{"./.git/*.lock"}
if err := snapshot.StreamTo(ctx, tw, repoDir, excludePatterns, s.config.ZstdThreads); err != nil {
if err := snapshot.StreamTo(ctx, tw, repoDir, nil, s.config.ZstdThreads); err != nil {
logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err)
spool.MarkError(err)
} else {
Expand Down