Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Git causes a number of problems for us, but the most obvious are:
To solve this we apply two different strategies on the server:

1. Periodic full `.tar.zst` snapshots of the repository. These snapshots restore 4-5x faster than `git clone`.
Shallow snapshots are also supported via `?depth=N` (e.g., `/git/{host}/{repo}/snapshot.tar.zst?depth=100`),
which produces much smaller snapshots for large repositories. Shallow snapshots are generated on demand
on the first request and, when a snapshot interval is configured, refreshed periodically afterwards. Note: shallow
snapshots do not include the working tree (they are cloned with `--no-checkout`), so clients must run `git checkout` after extracting.
2. Passthrough caching of the packs returned by `POST /repo.git/git-upload-pack` to support incremental pulls.

On the client we redirect git to the proxy:
Expand Down
6 changes: 5 additions & 1 deletion internal/strategy/git/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,9 @@ import (
)

// GenerateAndUploadSnapshot exposes the unexported generateAndUploadSnapshot
// method to tests. It always requests a full snapshot (depth 0).
func (s *Strategy) GenerateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error {
return s.generateAndUploadSnapshot(ctx, repo)
return s.generateAndUploadSnapshot(ctx, repo, 0)
}

// GenerateAndUploadShallowSnapshot exposes the unexported
// generateAndUploadSnapshot method to tests, forwarding depth unchanged so a
// test can request a shallow snapshot of the given depth.
func (s *Strategy) GenerateAndUploadShallowSnapshot(ctx context.Context, repo *gitclone.Repository, depth int) error {
return s.generateAndUploadSnapshot(ctx, repo, depth)
}
23 changes: 12 additions & 11 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,18 @@ type Config struct {
}

// Strategy bundles the configuration, cache, clone manager, HTTP client and
// reverse proxy, job scheduler, and the per-repository synchronisation state
// used by the git strategy.
type Strategy struct {
config Config
cache cache.Cache
cloneManager *gitclone.Manager
httpClient *http.Client
proxy *httputil.ReverseProxy
ctx context.Context
scheduler jobscheduler.Scheduler
spoolsMu sync.Mutex
spools map[string]*RepoSpools
tokenManager *githubapp.TokenManager
snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex
config Config
cache cache.Cache
cloneManager *gitclone.Manager
httpClient *http.Client
proxy *httputil.ReverseProxy
ctx context.Context
scheduler jobscheduler.Scheduler
spoolsMu sync.Mutex
spools map[string]*RepoSpools
tokenManager *githubapp.TokenManager
snapshotMu sync.Map // keyed by upstream URL (plus a "-depth-N" suffix for shallow snapshots), values are *sync.Mutex
scheduledSnapshots sync.Map // keyed by "<upstream URL>:<job ID>"; tracks scheduled snapshot jobs to avoid duplicates
}

func New(
Expand Down
107 changes: 89 additions & 18 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package git

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -20,12 +22,24 @@ import (
"github.com/block/cachew/internal/snapshot"
)

// snapshotDirForURL returns the working directory in which a snapshot of
// upstreamURL is assembled: <mirrorRoot>/.snapshots/<repoPath>, where repoPath
// is derived from the URL by gitclone.RepoPathFromURL. When depth > 0 a
// "-depth-N" suffix is appended to the directory name, giving each shallow
// depth a working directory distinct from the full snapshot's.
// An error is returned if the repository path cannot be resolved from the URL.
func snapshotDirForURL(mirrorRoot, upstreamURL string) (string, error) {
func snapshotDirForURL(mirrorRoot, upstreamURL string, depth int) (string, error) {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err != nil {
return "", errors.Wrap(err, "resolve snapshot directory")
}
return filepath.Join(mirrorRoot, ".snapshots", repoPath), nil
dir := filepath.Join(mirrorRoot, ".snapshots", repoPath)
if depth > 0 {
// Suffix the final path element rather than adding a subdirectory.
dir += fmt.Sprintf("-depth-%d", depth)
}
return dir, nil
}

// snapshotCacheKey derives the cache key under which the snapshot of
// upstreamURL is stored. Full snapshots (depth <= 0) use the ".snapshot"
// suffix; shallow snapshots use ".snapshot-depth-N", so every depth is cached
// under its own key.
func snapshotCacheKey(upstreamURL string, depth int) cache.Key {
	if depth <= 0 {
		return cache.NewKey(upstreamURL + ".snapshot")
	}
	return cache.NewKey(upstreamURL + fmt.Sprintf(".snapshot-depth-%d", depth))
}

// remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots.
Expand All @@ -42,34 +56,51 @@ func (s *Strategy) remoteURLForSnapshot(upstream string) string {
return s.config.ServerURL + "/git/" + repoPath
}

func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error {
func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository, depth int) error {
logger := logging.FromContext(ctx)
upstream := repo.UpstreamURL()

logger.InfoContext(ctx, "Snapshot generation started", slog.String("upstream", upstream))
logger.InfoContext(ctx, "Snapshot generation started",
slog.String("upstream", upstream),
slog.Int("depth", depth))

mu := s.snapshotMutexFor(upstream)
mu := s.snapshotMutexFor(upstream, depth)
mu.Lock()
defer mu.Unlock()

mirrorRoot := s.cloneManager.Config().MirrorRoot
snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream)
snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream, depth)
if err != nil {
return err
}

// Clean any previous snapshot working directory.
// #nosec G703 - snapshotDir is constructed from mirrorRoot + sanitised repo path
if err := os.RemoveAll(snapshotDir); err != nil {
return errors.Wrap(err, "remove previous snapshot dir")
}
// #nosec G703 - snapshotDir is constructed from mirrorRoot + sanitised repo path
if err := os.MkdirAll(filepath.Dir(snapshotDir), 0o750); err != nil {
return errors.Wrap(err, "create snapshot parent dir")
}

// Hold a read lock to exclude concurrent fetches while cloning.
if err := repo.WithReadLock(func() error {
// #nosec G204 - repo.Path() and snapshotDir are controlled by us
cmd := exec.CommandContext(ctx, "git", "clone", repo.Path(), snapshotDir)
// #nosec G204 - repo.Path(), snapshotDir, and depth are controlled by us
args := []string{"clone"}
source := repo.Path()
if depth > 0 {
args = append(args, "--depth", strconv.Itoa(depth), "--no-checkout")
// git ignores --depth for local path clones (uses hardlinks instead).
// Use file:// protocol to force pack transfer with depth support.
absPath, err := filepath.Abs(source)
if err != nil {
return errors.Wrap(err, "resolve absolute path for file:// URL")
}
source = "file://" + absPath
}
args = append(args, source, snapshotDir)
cmd := exec.CommandContext(ctx, "git", args...)
if output, err := cmd.CombinedOutput(); err != nil {
return errors.Wrapf(err, "git clone for snapshot: %s", string(output))
}
Expand All @@ -82,37 +113,60 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
}
return nil
}); err != nil {
// #nosec G703 - snapshotDir is constructed from mirrorRoot + sanitised repo path
_ = os.RemoveAll(snapshotDir)
return errors.WithStack(err)
}

cacheKey := cache.NewKey(upstream + ".snapshot")
cacheKey := snapshotCacheKey(upstream, depth)
ttl := 7 * 24 * time.Hour
excludePatterns := []string{"*.lock"}

err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, ttl, excludePatterns, s.config.ZstdThreads)

// Always clean up the snapshot working directory.
// #nosec G703 - snapshotDir is constructed from mirrorRoot + sanitised repo path
if rmErr := os.RemoveAll(snapshotDir); rmErr != nil {
logger.WarnContext(ctx, "Failed to clean up snapshot dir", slog.String("error", rmErr.Error()))
}
if err != nil {
logger.ErrorContext(ctx, "Snapshot generation failed", slog.String("upstream", upstream), slog.String("error", err.Error()))
logger.ErrorContext(ctx, "Snapshot generation failed",
slog.String("upstream", upstream),
slog.Int("depth", depth),
slog.String("error", err.Error()))
return errors.Wrap(err, "create snapshot")
}

logger.InfoContext(ctx, "Snapshot generation completed", slog.String("upstream", upstream))
logger.InfoContext(ctx, "Snapshot generation completed",
slog.String("upstream", upstream),
slog.Int("depth", depth))
return nil
}

// scheduleSnapshotJobs schedules the periodic refresh of the full snapshot for
// repo by delegating to scheduleSnapshotJobsWithDepth with a depth of 0.
func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error {
return s.generateAndUploadSnapshot(ctx, repo)
s.scheduleSnapshotJobsWithDepth(repo, 0)
}

// scheduleSnapshotJobsWithDepth registers a periodic job that regenerates the
// snapshot of repo at the given depth every s.config.SnapshotInterval.
//
// The job ID is "snapshot-periodic" for full snapshots and
// "snapshot-depth-N-periodic" for shallow ones. Each (upstream URL, job ID)
// pair is recorded in s.scheduledSnapshots, so repeated calls for the same
// pair submit the job only once.
func (s *Strategy) scheduleSnapshotJobsWithDepth(repo *gitclone.Repository, depth int) {
	upstream := repo.UpstreamURL()

	var jobID string
	switch {
	case depth > 0:
		jobID = fmt.Sprintf("snapshot-depth-%d-periodic", depth)
	default:
		jobID = "snapshot-periodic"
	}

	// LoadOrStore both checks and claims the slot, so concurrent callers
	// cannot both submit the same job.
	_, seen := s.scheduledSnapshots.LoadOrStore(upstream+":"+jobID, true)
	if seen {
		return
	}

	refresh := func(ctx context.Context) error {
		return s.generateAndUploadSnapshot(ctx, repo, depth)
	}
	s.scheduler.SubmitPeriodicJob(upstream, jobID, s.config.SnapshotInterval, refresh)
}

// snapshotMutexFor returns the mutex stored in s.snapshotMu for the given
// upstream URL and depth, creating and storing one on first use. Full
// snapshots (depth <= 0) are keyed by the upstream URL alone; shallow
// snapshots are keyed by "<upstreamURL>-depth-N", so each depth gets its own
// mutex.
func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex {
mu, _ := s.snapshotMu.LoadOrStore(upstreamURL, &sync.Mutex{})
func (s *Strategy) snapshotMutexFor(upstreamURL string, depth int) *sync.Mutex {
key := upstreamURL
if depth > 0 {
key = fmt.Sprintf("%s-depth-%d", upstreamURL, depth)
}
// LoadOrStore returns the existing mutex if another caller stored one first.
mu, _ := s.snapshotMu.LoadOrStore(key, &sync.Mutex{})
return mu.(*sync.Mutex)
}

Expand All @@ -122,7 +176,18 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,

repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath
cacheKey := cache.NewKey(upstreamURL + ".snapshot")

var depth int
if d := r.URL.Query().Get("depth"); d != "" {
var err error
depth, err = strconv.Atoi(d)
if err != nil || depth < 0 {
http.Error(w, "invalid depth parameter", http.StatusBadRequest)
return
}
}

cacheKey := snapshotCacheKey(upstreamURL, depth)

reader, headers, err := s.cache.Open(ctx, cacheKey)
if errors.Is(err, os.ErrNotExist) {
Expand All @@ -141,18 +206,24 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
return
}
if genErr := s.generateAndUploadSnapshot(ctx, repo); genErr != nil {
if genErr := s.generateAndUploadSnapshot(ctx, repo, depth); genErr != nil {
logger.ErrorContext(ctx, "On-demand snapshot generation failed",
slog.String("upstream", upstreamURL),
slog.String("error", genErr.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
// Schedule periodic refresh so this snapshot stays fresh.
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobsWithDepth(repo, depth)
}
reader, headers, err = s.cache.Open(ctx, cacheKey)
}
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.DebugContext(ctx, "snapshot not found in cache", slog.String("upstream", upstreamURL))
logger.DebugContext(ctx, "Snapshot not found in cache",
slog.String("upstream", upstreamURL),
slog.Int("depth", depth))
http.NotFound(w, r)
return
}
Expand Down
123 changes: 123 additions & 0 deletions internal/strategy/git/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,129 @@ func TestSnapshotGenerationViaLocalClone(t *testing.T) {
assert.True(t, os.IsNotExist(err))
}

// TestShallowSnapshotGeneration generates a depth-1 snapshot from a local test
// mirror and checks that it is stored under the depth-specific cache key, that
// no full snapshot is created as a side effect, and that the restored
// repository can be checked out and has exactly one commit reachable from HEAD.
// The test is skipped when git is not available in PATH.
func TestShallowSnapshotGeneration(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found in PATH")
}

_, ctx := logging.Configure(context.Background(), logging.Config{})
tmpDir := t.TempDir()
mirrorRoot := filepath.Join(tmpDir, "mirrors")
upstreamURL := "https://github.com/org/repo"

// Place the mirror at <mirrorRoot>/github.com/org/repo.
// NOTE(review): presumably this is the path the clone manager resolves for upstreamURL — confirm against gitclone.
mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo")
createTestMirrorRepo(t, mirrorPath)

memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
mux := newTestMux()

cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil)
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

manager, err := cm()
assert.NoError(t, err)
repo, err := manager.GetOrCreate(ctx, upstreamURL)
assert.NoError(t, err)

// Generate a shallow snapshot with depth=1.
err = s.GenerateAndUploadShallowSnapshot(ctx, repo, 1)
assert.NoError(t, err)

// Shallow snapshot uses a different cache key than full snapshot.
shallowKey := cache.NewKey(upstreamURL + ".snapshot-depth-1")
_, headers, err := memCache.Open(ctx, shallowKey)
assert.NoError(t, err)
assert.Equal(t, "application/zstd", headers.Get("Content-Type"))

// Full snapshot should not exist.
fullKey := cache.NewKey(upstreamURL + ".snapshot")
_, _, err = memCache.Open(ctx, fullKey)
assert.True(t, os.IsNotExist(err))

// Restore and verify.
restoreDir := filepath.Join(tmpDir, "restored-shallow")
err = snapshot.Restore(ctx, memCache, shallowKey, restoreDir, 0)
assert.NoError(t, err)

// Shallow snapshots use --no-checkout, so we need to checkout after restoring.
cmd := exec.Command("git", "-C", restoreDir, "checkout")
output, err := cmd.CombinedOutput()
assert.NoError(t, err, string(output))

// NOTE(review): assumes createTestMirrorRepo commits hello.txt containing "hello\n" — confirm against the helper.
data, err := os.ReadFile(filepath.Join(restoreDir, "hello.txt"))
assert.NoError(t, err)
assert.Equal(t, "hello\n", string(data))

// Verify it's actually shallow.
cmd = exec.Command("git", "-C", restoreDir, "rev-list", "--count", "HEAD")
output, err = cmd.CombinedOutput()
assert.NoError(t, err, string(output))
assert.Equal(t, "1\n", string(output))
}

// TestShallowSnapshotHTTPEndpoint seeds the cache with a fake snapshot under
// the depth-100 key and checks that the snapshot endpoint serves it for
// ?depth=100, while a request without a depth parameter does not succeed
// because no full snapshot exists.
func TestShallowSnapshotHTTPEndpoint(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{})
tmpDir := t.TempDir()

memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{})
assert.NoError(t, err)
mux := newTestMux()

cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: tmpDir}, nil)
_, err = git.New(ctx, git.Config{
SnapshotInterval: 24 * time.Hour,
}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

// Create a shallow snapshot in the cache.
upstreamURL := "https://github.com/org/repo"
cacheKey := cache.NewKey(upstreamURL + ".snapshot-depth-100")
snapshotData := []byte("shallow snapshot data")

headers := make(map[string][]string)
headers["Content-Type"] = []string{"application/zstd"}
writer, err := memCache.Create(ctx, cacheKey, headers, 24*time.Hour)
assert.NoError(t, err)
_, err = writer.Write(snapshotData)
assert.NoError(t, err)
err = writer.Close()
assert.NoError(t, err)

// The handler is looked up by pattern and invoked directly, so the path
// values the router would normally fill in are set by hand on each request.
handler := mux.handlers["GET /git/{host}/{path...}"]
assert.NotZero(t, handler)

// Request with ?depth=100 should return the shallow snapshot.
req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst?depth=100", nil)
req = req.WithContext(ctx)
req.SetPathValue("host", "github.com")
req.SetPathValue("path", "org/repo/snapshot.tar.zst")
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)

assert.Equal(t, 200, w.Code)
assert.Equal(t, snapshotData, w.Body.Bytes())

// Request without depth should NOT return the shallow snapshot.
req = httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst", nil)
// NOTE(review): this context is replaced by the cancelled one attached below, so this call looks redundant.
req = req.WithContext(ctx)
req.SetPathValue("host", "github.com")
req.SetPathValue("path", "org/repo/snapshot.tar.zst")
w = httptest.NewRecorder()

// Cancel context so on-demand generation doesn't block.
cancelCtx, cancel := context.WithCancel(ctx)
cancel()
req = req.WithContext(cancelCtx)

handler.ServeHTTP(w, req)

// Should fail (no full snapshot exists, and on-demand gen fails with cancelled context).
assert.NotEqual(t, 200, w.Code)
}

func TestSnapshotRemoteURLUsesServerURL(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found in PATH")
Expand Down