diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index f7e8773..f911a73 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -146,43 +146,101 @@ func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser, logger.WarnContext(ctx, "Tier backfill: failed to create writer, skipping", "error", err) return src } - return &backfillReadCloser{src: src, dst: w, ctx: ctx, cancel: cancel} + return newBackfillReadCloser(ctx, src, w, cancel) } -// backfillReadCloser tees reads from src into dst. If the full stream is +// backfillReadCloser tees reads from src into dst asynchronously. Chunks are +// sent to a background goroutine via a buffered channel so the Read path is +// never blocked by disk I/O (up to ~32 MB of buffer). If the full stream is // consumed and Close completes without error, dst is closed normally // (committing the cached entry). On any write failure the backfill is // abandoned but reads continue unaffected. type backfillReadCloser struct { - src io.ReadCloser - dst io.WriteCloser - ctx context.Context - cancel context.CancelFunc - failed bool + src io.ReadCloser + ch chan []byte + ctx context.Context + cancel context.CancelFunc + done chan error + closed bool + closeMu sync.Mutex +} + +const backfillBufSize = 128 // number of chunks buffered (~32 MB at 256 KB each) + +func newBackfillReadCloser(ctx context.Context, src io.ReadCloser, dst io.WriteCloser, cancel context.CancelFunc) *backfillReadCloser { + ch := make(chan []byte, backfillBufSize) + done := make(chan error, 1) + b := &backfillReadCloser{src: src, ch: ch, ctx: ctx, cancel: cancel, done: done} + go func() { + var err error + for chunk := range ch { + if err == nil { + if _, wErr := dst.Write(chunk); wErr != nil { + logging.FromContext(ctx).WarnContext(ctx, "Tier backfill: write failed, abandoning", "error", wErr) + err = wErr + cancel() + // Keep draining the channel so the producer isn't blocked. + } + } + } + closeErr := dst.Close() + switch { + case err != nil: + done <- err + case closeErr != nil: + cancel() + done <- closeErr + default: + done <- nil + } + }() + return b +} + +func (b *backfillReadCloser) closeChan() { + b.closeMu.Lock() + defer b.closeMu.Unlock() + if !b.closed { + b.closed = true + close(b.ch) + } } func (b *backfillReadCloser) Read(p []byte) (int, error) { n, err := b.src.Read(p) - if n > 0 && !b.failed { - if _, wErr := b.dst.Write(p[:n]); wErr != nil { - logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: write failed, abandoning", "error", wErr) - b.failed = true - b.cancel() + if n > 0 { + b.closeMu.Lock() + if !b.closed { + // Copy the data — p is reused by the caller. + chunk := make([]byte, n) + copy(chunk, p[:n]) + select { + case b.ch <- chunk: + default: + // Buffer full — abandon backfill. + b.closed = true + close(b.ch) + b.cancel() + } } + b.closeMu.Unlock() + } + if err != nil { + b.closeChan() } return n, err //nolint:wrapcheck // must return unwrapped io.EOF per io.Reader contract } func (b *backfillReadCloser) Close() error { srcErr := b.src.Close() - if b.failed || srcErr != nil { + b.closeChan() + // Wait for the background writer to finish. + bgErr := <-b.done + if srcErr != nil || bgErr != nil { b.cancel() - _ = b.dst.Close() return errors.WithStack(srcErr) } - dstErr := b.dst.Close() - b.cancel() - return errors.WithStack(dstErr) + return nil } func (t Tiered) String() string { diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index c17372f..80776b6 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -44,18 +44,20 @@ type Config struct { } type Strategy struct { - config Config - cache cache.Cache - cloneManager *gitclone.Manager - httpClient *http.Client - proxy *httputil.ReverseProxy - ctx context.Context - scheduler jobscheduler.Scheduler - spoolsMu sync.Mutex - spools map[string]*RepoSpools - tokenManager *githubapp.TokenManager - snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex - snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry + config Config + cache cache.Cache + cloneManager *gitclone.Manager + httpClient *http.Client + proxy *httputil.ReverseProxy + ctx context.Context + scheduler jobscheduler.Scheduler + spoolsMu sync.Mutex + spools map[string]*RepoSpools + tokenManager *githubapp.TokenManager + snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex + snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry + coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry + deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo } func New( @@ -522,6 +524,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { if err != nil { logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err) + repo.ResetToEmpty() return } diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 7d8acf0..3e3013b 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -191,20 +191,72 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst")) upstreamURL := "https://" + host + "/" + repoPath - // Ensure the local mirror is ready before considering any cached snapshot. repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL) if repoErr != nil { logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr) http.Error(w, "Internal server error", http.StatusInternalServerError) return } + + cacheKey := snapshotCacheKey(upstreamURL) + + // On cold start the local mirror may not be ready yet. Check the S3 cache + // first so we can stream a cached snapshot to the client immediately while + // the mirror restores in the background. This avoids blocking the client + // behind the full S3-download → extract → git-fetch pipeline. + if repo.State() != gitclone.StateReady { + entry := &coldSnapshotEntry{done: make(chan struct{})} + if existing, loaded := s.coldSnapshotMu.LoadOrStore(upstreamURL, entry); loaded { + winner := existing.(*coldSnapshotEntry) + <-winner.done + reader, _, openErr := s.cache.Open(ctx, cacheKey) + if openErr == nil && reader != nil { + winner.serving.Add(1) + defer func() { + _ = reader.Close() + winner.serving.Done() + }() + logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL) + w.Header().Set("Content-Type", "application/zstd") + if _, err := io.Copy(w, reader); err != nil { + logger.WarnContext(ctx, "Failed to stream locally cached snapshot", "upstream", upstreamURL, "error", err) + } + return + } + } else { + defer func() { + close(entry.done) + s.coldSnapshotMu.Delete(upstreamURL) + }() + reader, _, openErr := s.cache.Open(ctx, cacheKey) + if openErr == nil && reader != nil { + logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL) + w.Header().Set("Content-Type", "application/zstd") + if _, err := io.Copy(w, reader); err != nil { + logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err) + } + _ = reader.Close() + // Schedule a deferred mirror restore so the mirror eventually + // becomes hot and cachew can generate fresh bundle deltas. + // Without this, repos that only ever serve cached snapshots + // would never restore their mirror. + s.scheduleDeferredMirrorRestore(ctx, repo, entry) + return + } + if reader != nil { + _ = reader.Close() + } + } + } + + // Either the mirror is already ready or no cached snapshot exists — fall + // through to the original path which blocks until the mirror is available. if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil { logger.ErrorContext(ctx, "Clone unavailable for snapshot", "upstream", upstreamURL, "error", cloneErr) http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) return } s.maybeBackgroundFetch(repo) - cacheKey := snapshotCacheKey(upstreamURL) reader, headers, err := s.cache.Open(ctx, cacheKey) if err != nil && !errors.Is(err, os.ErrNotExist) { @@ -283,6 +335,12 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") mirrorHead := s.getMirrorHead(ctx, repo) + // Forward the snapshot commit to the client so it knows whether the + // snapshot is fresh (no bundle URL = already at HEAD, skip freshen). + if snapshotCommit != "" { + w.Header().Set("X-Cachew-Snapshot-Commit", snapshotCommit) + } + if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead { repoPath, err := gitclone.RepoPathFromURL(upstreamURL) if err == nil { @@ -539,6 +597,69 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re } } +// scheduleDeferredMirrorRestore schedules a one-shot background mirror restore +// for a repo that was served from a cached S3 snapshot on cold start. Without +// this, repos that only serve cached snapshots would never warm their mirror, +// preventing cachew from generating fresh bundle deltas. +// +// Submitted to the scheduler immediately after the first S3 snapshot stream +// completes. By this point the client snapshot is backfilled to local disk, so +// subsequent snapshot serves read from NVMe and don't compete for S3 bandwidth. +// The scheduler's concurrency limit naturally throttles the restore against +// other background work. Only one restore is scheduled per upstream URL. +func (s *Strategy) scheduleDeferredMirrorRestore(ctx context.Context, repo *gitclone.Repository, coldEntry *coldSnapshotEntry) { + upstream := repo.UpstreamURL() + if _, loaded := s.deferredRestoreOnce.LoadOrStore(upstream, true); loaded { + return + } + + logger := logging.FromContext(ctx) + logger.InfoContext(ctx, "Scheduling deferred mirror restore", "upstream", upstream) + + s.scheduler.Submit(upstream, "deferred-mirror-restore", func(ctx context.Context) error { + logger := logging.FromContext(ctx) + if repo.State() == gitclone.StateReady { + logger.InfoContext(ctx, "Mirror already ready, skipping deferred restore", "upstream", upstream) + return nil + } + if !repo.TryStartCloning() { + logger.InfoContext(ctx, "Mirror restore already in progress, skipping", "upstream", upstream) + return nil + } + // Wait for all in-flight cold snapshot serves to finish so the + // restore's disk writes don't compete with local cache reads. + coldEntry.serving.Wait() + + logger.InfoContext(ctx, "Starting deferred mirror restore", "upstream", upstream) + + if err := s.tryRestoreSnapshot(ctx, repo); err != nil { + logger.WarnContext(ctx, "Deferred mirror snapshot restore failed", "upstream", upstream, "error", err) + repo.ResetToEmpty() + return nil + } + + if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil { + logger.WarnContext(ctx, "Deferred mirror post-restore fetch failed", "upstream", upstream, "error", err) + repo.ResetToEmpty() + if rmErr := os.RemoveAll(repo.Path()); rmErr != nil { + logger.WarnContext(ctx, "Failed to remove mirror after failed fetch", "upstream", upstream, "error", rmErr) + } + return nil + } + + repo.MarkReady() + logger.InfoContext(ctx, "Deferred mirror restore completed", "upstream", upstream) + + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } + if s.config.RepackInterval > 0 { + s.scheduleRepackJobs(repo) + } + return nil + }) +} + // snapshotSpoolEntry holds a spool and a ready channel used to coordinate // writer election. The first goroutine stores the entry via LoadOrStore and // becomes the writer. It closes ready once the spool is created (or on @@ -548,6 +669,11 @@ type snapshotSpoolEntry struct { ready chan struct{} } +type coldSnapshotEntry struct { + done chan struct{} + serving sync.WaitGroup // tracks all in-flight snapshot serves (winner + followers) +} + func snapshotSpoolDirForURL(mirrorRoot, upstreamURL string) (string, error) { repoPath, err := gitclone.RepoPathFromURL(upstreamURL) if err != nil { diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 405ba64..98f607f 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -450,6 +450,238 @@ func createTestMirrorRepoWithBranches(t *testing.T, mirrorPath string, branches assert.NoError(t, err, string(output)) } +func TestSnapshotServesFreshSnapshotWithCommitHeader(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + upstreamURL := "https://github.com/org/repo" + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") + createTestMirrorRepo(t, mirrorPath) + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + manager, err := cm() + assert.NoError(t, err) + repo, err := manager.GetOrCreate(ctx, upstreamURL) + assert.NoError(t, err) + + // Generate a snapshot — it will embed the mirror's HEAD as X-Cachew-Snapshot-Commit. + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + // Serve the snapshot via HTTP. Since the mirror's HEAD matches the snapshot + // commit, no bundle URL should be set, but X-Cachew-Snapshot-Commit must + // be forwarded so the client knows the snapshot is fresh. + handler := mux.handlers["GET /git/{host}/{path...}"] + assert.NotZero(t, handler) + + req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst", nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/repo/snapshot.tar.zst") + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 200, w.Code) + assert.NotEqual(t, "", w.Header().Get("X-Cachew-Snapshot-Commit"), + "X-Cachew-Snapshot-Commit should be set so client knows snapshot is fresh") + assert.Equal(t, "", w.Header().Get("X-Cachew-Bundle-Url"), + "no bundle URL when snapshot is already at mirror HEAD") +} + +func TestSnapshotServesBundleURLWhenStale(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + upstreamURL := "https://github.com/org/repo" + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") + createTestMirrorRepo(t, mirrorPath) + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + manager, err := cm() + assert.NoError(t, err) + repo, err := manager.GetOrCreate(ctx, upstreamURL) + assert.NoError(t, err) + + // Generate a snapshot at the current HEAD. + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + // Add a new commit to the mirror so the snapshot becomes stale. + tmpWork := t.TempDir() + for _, args := range [][]string{ + {"clone", mirrorPath, tmpWork}, + {"-C", tmpWork, "config", "user.email", "test@test.com"}, + {"-C", tmpWork, "config", "user.name", "Test"}, + } { + cmd := exec.Command("git", args...) + output, err := cmd.CombinedOutput() + assert.NoError(t, err, string(output)) + } + assert.NoError(t, os.WriteFile(filepath.Join(tmpWork, "new.txt"), []byte("new\n"), 0o644)) + for _, args := range [][]string{ + {"-C", tmpWork, "add", "."}, + {"-C", tmpWork, "commit", "-m", "new commit"}, + {"-C", tmpWork, "push", "origin", "HEAD"}, + } { + cmd := exec.Command("git", args...) + output, err := cmd.CombinedOutput() + assert.NoError(t, err, string(output)) + } + + handler := mux.handlers["GET /git/{host}/{path...}"] + req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst", nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/repo/snapshot.tar.zst") + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 200, w.Code) + assert.NotEqual(t, "", w.Header().Get("X-Cachew-Snapshot-Commit"), + "X-Cachew-Snapshot-Commit should be set") + assert.NotEqual(t, "", w.Header().Get("X-Cachew-Bundle-Url"), + "bundle URL should be set when snapshot is stale") + assert.Contains(t, w.Header().Get("X-Cachew-Bundle-Url"), "snapshot.bundle?base=", + "bundle URL should include base parameter") + + // Allow background bundle generation goroutine to finish. + time.Sleep(2 * time.Second) +} + +func TestColdSnapshotServesWithoutCommitHeader(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + // Pre-populate the cache with a fake snapshot that has NO X-Cachew-Snapshot-Commit + // header, simulating a cold-start scenario where the snapshot was uploaded to S3 + // without mirror metadata. + upstreamURL := "https://github.com/org/coldrepo" + cacheKey := cache.NewKey(upstreamURL + ".snapshot") + headers := make(map[string][]string) + headers["Content-Type"] = []string{"application/zstd"} + headers["Last-Modified"] = []string{time.Now().Add(time.Hour).UTC().Format(http.TimeFormat)} + writer, err := memCache.Create(ctx, cacheKey, headers, 24*time.Hour) + assert.NoError(t, err) + _, err = writer.Write([]byte("fake cold snapshot")) + assert.NoError(t, err) + assert.NoError(t, writer.Close()) + + handler := mux.handlers["GET /git/{host}/{path...}"] + + // Use a cancelled context so ensureCloneReady fails quickly and the cold + // path returns the cached snapshot. + cancelCtx, cancel := context.WithCancel(ctx) + + req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/coldrepo/snapshot.tar.zst", nil) + req = req.WithContext(cancelCtx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/coldrepo/snapshot.tar.zst") + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + cancel() + + // Cold path should serve the snapshot but without X-Cachew-Snapshot-Commit, + // signaling to the client that it needs to freshen. + assert.Equal(t, 200, w.Code) + assert.Equal(t, "", w.Header().Get("X-Cachew-Snapshot-Commit"), + "cold path should not set X-Cachew-Snapshot-Commit") + assert.Equal(t, "", w.Header().Get("X-Cachew-Bundle-Url"), + "cold path should not set bundle URL") +} + +func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + // Pre-populate cache with a fake snapshot. + upstreamURL := "https://github.com/org/deferred-test" + cacheKey := cache.NewKey(upstreamURL + ".snapshot") + headers := make(map[string][]string) + headers["Content-Type"] = []string{"application/zstd"} + headers["Last-Modified"] = []string{time.Now().Add(time.Hour).UTC().Format(http.TimeFormat)} + writer, err := memCache.Create(ctx, cacheKey, headers, 24*time.Hour) + assert.NoError(t, err) + _, err = writer.Write([]byte("fake snapshot")) + assert.NoError(t, err) + assert.NoError(t, writer.Close()) + + handler := mux.handlers["GET /git/{host}/{path...}"] + + // First request: cold path serves snapshot and schedules deferred restore. + req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/deferred-test/snapshot.tar.zst", nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", "org/deferred-test/snapshot.tar.zst") + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + + // Second request: should not panic or fail — deferred restore should only + // be scheduled once (idempotent via deferredRestoreOnce). + req2 := httptest.NewRequest(http.MethodGet, "/git/github.com/org/deferred-test/snapshot.tar.zst", nil) + req2 = req2.WithContext(ctx) + req2.SetPathValue("host", "github.com") + req2.SetPathValue("path", "org/deferred-test/snapshot.tar.zst") + w2 := httptest.NewRecorder() + handler.ServeHTTP(w2, req2) + // Second request may be 200 (from local cache) or 503 (clone not ready). + // The key assertion is that it doesn't panic from double-scheduling. + + // Allow background goroutines to settle. + time.Sleep(time.Second) +} + func TestSnapshotRemoteURLUsesServerURL(t *testing.T) { if _, err := exec.LookPath("git"); err != nil { t.Skip("git not found in PATH")