diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 9256b9c..f2a029f 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "runtime" + "strconv" "time" "github.com/alecthomas/errors" @@ -244,6 +245,8 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err headers.Set("Last-Modified", objInfo.LastModified.UTC().Format(http.TimeFormat)) } + headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10)) + // Reset expiration time to implement LRU (same as disk cache). // Only refresh when remaining TTL is below 50% of max to avoid a // server-side copy on every read. diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 8486a3e..d29ecb0 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -24,7 +24,9 @@ import ( // The operation is fully streaming - no temporary files are created. // Exclude patterns use tar's --exclude syntax. // threads controls zstd parallelism; 0 uses all available CPU cores. -func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int) error { +// Any extra headers are merged into the cache metadata alongside the default +// Content-Type and Content-Disposition headers. +func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error { if threads <= 0 { threads = runtime.NumCPU() } @@ -39,6 +41,13 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st headers := make(http.Header) headers.Set("Content-Type", "application/zstd") headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(directory)+".tar.zst")) + for _, eh := range extraHeaders { + for k, vals := range eh { + for _, v := range vals { + headers.Set(k, v) + } + } + } wc, err := remote.Create(ctx, key, headers, ttl) if err != nil { diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index c153d67..a457743 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -219,6 +219,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { return } + if strings.HasSuffix(pathValue, "/snapshot.bundle") { + s.handleBundleRequest(w, r, host, pathValue) + return + } + service := r.URL.Query().Get("service") isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack") diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index c6002b0..c3ccac3 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -8,8 +8,10 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "sync" + "time" "github.com/alecthomas/errors" @@ -35,6 +37,10 @@ func mirrorSnapshotCacheKey(upstreamURL string) cache.Key { return cache.NewKey(upstreamURL + ".mirror-snapshot") } +func bundleCacheKey(upstreamURL, baseCommit string) cache.Key { + return cache.NewKey(upstreamURL + ".bundle." + baseCommit) +} + // remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots. // When a server URL is configured, it returns the cachew URL for the repo so that // git pull goes through cachew. Otherwise it falls back to the upstream URL. @@ -101,10 +107,19 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone return err } + // Capture the snapshot's HEAD so we can later build a delta bundle between + // the cached snapshot and the current mirror state. + headSHA, err := revParse(ctx, snapshotDir, "HEAD") + if err != nil { + _ = os.RemoveAll(snapshotDir) //nolint:gosec + return errors.Wrap(err, "rev-parse HEAD for snapshot") + } + extraHeaders := http.Header{} + extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA) + cacheKey := snapshotCacheKey(upstream) - excludePatterns := []string{"./.git/*.lock"} - err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, excludePatterns, s.config.ZstdThreads) + err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, nil, s.config.ZstdThreads, extraHeaders) // Always clean up the snapshot working directory. if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL @@ -176,8 +191,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst")) upstreamURL := "https://" + host + "/" + repoPath - // Ensure the local mirror is ready and up to date before considering any - // cached snapshot, so we never serve stale data to workstations. + // Ensure the local mirror is ready before considering any cached snapshot. repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) if repoErr != nil { logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr) @@ -189,23 +203,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) return } - // Fetch in the background to keep the mirror fresh for subsequent - // git-fetch/git-pull operations through cachew, but don't block - // snapshot serving on it. - refsStale, err := s.checkRefsStale(ctx, repo) - if err != nil { - logger.WarnContext(ctx, "Failed to check upstream refs", "upstream", upstreamURL, "error", err) - } - if refsStale { - logger.InfoContext(ctx, "Refs stale for snapshot request, fetching in background", "upstream", upstreamURL) - go func() { - bgCtx := context.WithoutCancel(ctx) - if err := repo.Fetch(bgCtx); err != nil { - logger.WarnContext(bgCtx, "Background fetch for snapshot failed", "upstream", upstreamURL, "error", err) - } - }() - } - + s.maybeBackgroundFetch(repo) cacheKey := snapshotCacheKey(upstreamURL) reader, headers, err := s.cache.Open(ctx, cacheKey) @@ -215,30 +213,170 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, return } - // Always serve a cached snapshot if one exists. The workstation will - // git-fetch through cachew after extracting the snapshot, which picks - // up any commits that arrived since the snapshot was built. Regeneration - // happens in the background via the periodic snapshot job and the - // background upload in writeSnapshotSpool, keeping the cached snapshot - // reasonably fresh without blocking requests. - if reader != nil { - logger.DebugContext(ctx, "Serving cached snapshot", "upstream", upstreamURL) - } - if reader == nil { s.serveSnapshotWithSpool(w, r, repo, upstreamURL) return } defer reader.Close() - for key, values := range headers { - for _, value := range values { - w.Header().Add(key, value) + if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL); err != nil { + logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err) + } +} + +func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + ctx := r.Context() + logger := logging.FromContext(ctx) + + repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.bundle")) + upstreamURL := "https://" + host + "/" + repoPath + + base := r.URL.Query().Get("base") + if base == "" { + http.Error(w, "missing base query parameter", http.StatusBadRequest) + return + } + + bKey := bundleCacheKey(upstreamURL, base) + + // Try serving from cache first — works on any pod. + if reader, _, err := s.cache.Open(ctx, bKey); err == nil && reader != nil { + defer reader.Close() + w.Header().Set("Content-Type", "application/x-git-bundle") + if _, err := io.Copy(w, reader); err != nil { + logger.WarnContext(ctx, "Failed to stream cached bundle", "upstream", upstreamURL, "error", err) } + return + } + + // Fallback: generate from local mirror. + repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) + if repoErr != nil { + logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil { + logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr) + http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) + return } - if _, err = io.Copy(w, reader); err != nil { - logger.ErrorContext(ctx, "Failed to stream snapshot", "upstream", upstreamURL, "error", err) + + bundleData, err := s.createBundle(ctx, repo, base) + if err != nil { + logger.WarnContext(ctx, "Failed to create bundle", "upstream", upstreamURL, "base", base, "error", err) + http.Error(w, "Bundle not available", http.StatusNotFound) + return + } + + // Cache for future requests from any pod. + s.cacheBundleAsync(ctx, bKey, bundleData) + + w.Header().Set("Content-Type", "application/x-git-bundle") + w.Header().Set("Content-Length", strconv.Itoa(len(bundleData))) + if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle generated from a trusted local mirror + logger.WarnContext(ctx, "Failed to write bundle response", "upstream", upstreamURL, "error", err) + } +} + +func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error { + snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") + mirrorHead := s.getMirrorHead(ctx, repo) + + if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead { + repoPath, err := gitclone.RepoPathFromURL(upstreamURL) + if err == nil { + bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit) + w.Header().Set("X-Cachew-Bundle-Url", bundleURL) + } + + // Proactively generate and cache the bundle so any pod can serve it. + go func() { + bgCtx := context.WithoutCancel(ctx) + logger := logging.FromContext(bgCtx) + bundleData, err := s.createBundle(bgCtx, repo, snapshotCommit) + if err != nil { + logger.WarnContext(bgCtx, "Failed to pre-generate bundle", "upstream", upstreamURL, "error", err) + return + } + s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData) + }() + } + + w.Header().Set("Content-Type", "application/zstd") + _, err := io.Copy(w, reader) + return errors.Wrap(err, "stream snapshot") +} + +const bundleCacheTTL = 2 * time.Hour + +func (s *Strategy) cacheBundleAsync(ctx context.Context, key cache.Key, data []byte) { + go func() { + s.cacheBundleSync(context.WithoutCancel(ctx), key, data) + }() +} + +func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) { + logger := logging.FromContext(ctx) + headers := http.Header{"Content-Type": {"application/x-git-bundle"}} + wc, err := s.cache.Create(ctx, key, headers, bundleCacheTTL) + if err != nil { + logger.WarnContext(ctx, "Failed to cache bundle", "error", err) + return + } + if _, err := wc.Write(data); err != nil { + logger.WarnContext(ctx, "Failed to write bundle to cache", "error", err) + _ = wc.Close() + return + } + if err := wc.Close(); err != nil { + logger.WarnContext(ctx, "Failed to close bundle cache writer", "error", err) + } +} + +func revParse(ctx context.Context, repoDir, ref string) (string, error) { + cmd := exec.CommandContext(ctx, "git", "-C", repoDir, "rev-parse", ref) // #nosec G204 G702 + output, err := cmd.Output() + if err != nil { + return "", errors.Wrapf(err, "git rev-parse %s", ref) + } + return strings.TrimSpace(string(output)), nil +} + +func (s *Strategy) getMirrorHead(ctx context.Context, repo *gitclone.Repository) string { + head, _ := revParse(ctx, repo.Path(), "HEAD") //nolint:errcheck // best-effort; empty string signals failure to callers + return head +} + +func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) ([]byte, error) { + // No read lock needed: git bundle create reads objects through git's own + // file-level locking, safe to run concurrently with fetches. + headRef := "HEAD" + if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 G702 + headRef = strings.TrimSpace(string(out)) + } + + tmpFile, err := os.CreateTemp("", "cachew-bundle-*.bundle") + if err != nil { + return nil, errors.Wrap(err, "create bundle temp file") + } + bundlePath := tmpFile.Name() + defer os.Remove(bundlePath) //nolint:errcheck + if err := tmpFile.Close(); err != nil { + return nil, errors.Wrap(err, "close bundle temp file") + } + + cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse + bundlePath, headRef, "^"+baseCommit) + if output, err := cmd.CombinedOutput(); err != nil { + return nil, errors.Wrapf(err, "git bundle create: %s", string(output)) + } + + data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created + if err != nil { + return nil, errors.Wrap(err, "read bundle file") } + return data, nil } // serveSnapshotWithSpool handles snapshot cache misses using the spool pattern. @@ -303,8 +441,7 @@ func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request, w.Header().Set("Content-Type", "application/zstd") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) - excludePatterns := []string{"./.git/*.lock"} - if err := snapshot.StreamTo(ctx, w, repoDir, excludePatterns, s.config.ZstdThreads); err != nil { + if err := snapshot.StreamTo(ctx, w, repoDir, nil, s.config.ZstdThreads); err != nil { logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err) } } @@ -369,8 +506,7 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) tw := NewSpoolTeeWriter(w, spool) - excludePatterns := []string{"./.git/*.lock"} - if err := snapshot.StreamTo(ctx, tw, repoDir, excludePatterns, s.config.ZstdThreads); err != nil { + if err := snapshot.StreamTo(ctx, tw, repoDir, nil, s.config.ZstdThreads); err != nil { logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err) spool.MarkError(err) } else {