Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 75 additions & 17 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,43 +146,101 @@ func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser,
logger.WarnContext(ctx, "Tier backfill: failed to create writer, skipping", "error", err)
return src
}
return &backfillReadCloser{src: src, dst: w, ctx: ctx, cancel: cancel}
return newBackfillReadCloser(ctx, src, w, cancel)
}

// backfillReadCloser tees reads from src into dst. If the full stream is
// backfillReadCloser tees reads from src into dst asynchronously. Chunks are
// sent to a background goroutine via a buffered channel so the Read path is
// never blocked by disk I/O (up to ~32 MB of buffer). If the full stream is
// consumed and Close completes without error, dst is closed normally
// (committing the cached entry). On any write failure the backfill is
// abandoned but reads continue unaffected.
type backfillReadCloser struct {
	src     io.ReadCloser      // upstream source the client reads from
	ch      chan []byte        // chunks handed off to the background writer goroutine
	ctx     context.Context    // request context, used for logging
	cancel  context.CancelFunc // aborts the destination cache write on failure
	done    chan error         // background writer's final status (buffered, capacity 1)
	closed  bool               // true once ch has been closed; guarded by closeMu
	closeMu sync.Mutex         // serializes the single close of ch between Read and Close
}

const backfillBufSize = 128 // number of chunks buffered (~32 MB at 256 KB each)

// newBackfillReadCloser wraps src so that every chunk read is also queued to a
// background goroutine that writes it to dst. The goroutine drains the channel
// even after a write failure so the producing Read path never blocks, then
// closes dst and reports its final status on the done channel.
func newBackfillReadCloser(ctx context.Context, src io.ReadCloser, dst io.WriteCloser, cancel context.CancelFunc) *backfillReadCloser {
	b := &backfillReadCloser{
		src:    src,
		ch:     make(chan []byte, backfillBufSize),
		ctx:    ctx,
		cancel: cancel,
		done:   make(chan error, 1),
	}
	go func() {
		var writeErr error
		for chunk := range b.ch {
			if writeErr != nil {
				continue // keep draining so the producer isn't blocked
			}
			if _, wErr := dst.Write(chunk); wErr != nil {
				logging.FromContext(ctx).WarnContext(ctx, "Tier backfill: write failed, abandoning", "error", wErr)
				writeErr = wErr
				cancel()
			}
		}
		closeErr := dst.Close()
		if writeErr != nil {
			b.done <- writeErr
			return
		}
		if closeErr != nil {
			cancel()
			b.done <- closeErr
			return
		}
		b.done <- nil
	}()
	return b
}

// closeChan closes the chunk channel exactly once; safe to call from both
// Read (on stream end) and Close.
func (b *backfillReadCloser) closeChan() {
	b.closeMu.Lock()
	defer b.closeMu.Unlock()
	if b.closed {
		return
	}
	b.closed = true
	close(b.ch)
}

func (b *backfillReadCloser) Read(p []byte) (int, error) {
n, err := b.src.Read(p)
if n > 0 && !b.failed {
if _, wErr := b.dst.Write(p[:n]); wErr != nil {
logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: write failed, abandoning", "error", wErr)
b.failed = true
b.cancel()
if n > 0 {
b.closeMu.Lock()
if !b.closed {
// Copy the data — p is reused by the caller.
chunk := make([]byte, n)
copy(chunk, p[:n])
select {
case b.ch <- chunk:
default:
// Buffer full — abandon backfill.
b.closed = true
close(b.ch)
b.cancel()
}
}
b.closeMu.Unlock()
}
if err != nil {
b.closeChan()
}
return n, err //nolint:wrapcheck // must return unwrapped io.EOF per io.Reader contract
}

func (b *backfillReadCloser) Close() error {
srcErr := b.src.Close()
if b.failed || srcErr != nil {
b.closeChan()
// Wait for the background writer to finish.
bgErr := <-b.done
if srcErr != nil || bgErr != nil {
b.cancel()
_ = b.dst.Close()
return errors.WithStack(srcErr)
Comment on lines +239 to 241
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Return background close errors from backfill reader

When the async backfill goroutine reports an error (bgErr != nil) but src.Close() succeeds, Close() still returns errors.WithStack(srcErr), which is nil. This hides destination write/close failures that were previously surfaced, making backfill failures silent and preventing callers from observing that the cache write failed.

Useful? React with 👍 / 👎.

}
dstErr := b.dst.Close()
b.cancel()
return errors.WithStack(dstErr)
return nil
}

func (t Tiered) String() string {
Expand Down
27 changes: 15 additions & 12 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ type Config struct {
}

// Strategy serves git traffic for mirrored upstream repositories, coordinating
// clones, snapshot serving, and per-repo background jobs.
type Strategy struct {
	config       Config
	cache        cache.Cache
	cloneManager *gitclone.Manager
	httpClient   *http.Client
	proxy        *httputil.ReverseProxy
	// NOTE(review): storing a context in a struct is discouraged by Go
	// convention; kept as-is for compatibility with existing callers.
	ctx                 context.Context
	scheduler           jobscheduler.Scheduler
	spoolsMu            sync.Mutex
	spools              map[string]*RepoSpools
	tokenManager        *githubapp.TokenManager
	snapshotMu          sync.Map // keyed by upstream URL, values are *sync.Mutex
	snapshotSpools      sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry
	coldSnapshotMu      sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
	deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
}

func New(
Expand Down Expand Up @@ -522,6 +524,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {

if err != nil {
logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err)
repo.ResetToEmpty()
return
}

Expand Down
130 changes: 128 additions & 2 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,20 +191,72 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath

// Ensure the local mirror is ready before considering any cached snapshot.
repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
if repoErr != nil {
logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

cacheKey := snapshotCacheKey(upstreamURL)

// On cold start the local mirror may not be ready yet. Check the S3 cache
// first so we can stream a cached snapshot to the client immediately while
// the mirror restores in the background. This avoids blocking the client
// behind the full S3-download → extract → git-fetch pipeline.
if repo.State() != gitclone.StateReady {
entry := &coldSnapshotEntry{done: make(chan struct{})}
if existing, loaded := s.coldSnapshotMu.LoadOrStore(upstreamURL, entry); loaded {
winner := existing.(*coldSnapshotEntry)
<-winner.done
reader, _, openErr := s.cache.Open(ctx, cacheKey)
if openErr == nil && reader != nil {
winner.serving.Add(1)
defer func() {
_ = reader.Close()
winner.serving.Done()
}()
logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL)
w.Header().Set("Content-Type", "application/zstd")
if _, err := io.Copy(w, reader); err != nil {
logger.WarnContext(ctx, "Failed to stream locally cached snapshot", "upstream", upstreamURL, "error", err)
}
return
}
} else {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried a few iterations of using the spool system, but they were all quite slow. This version lets the first requester pull the snapshot from the backend while any requests arriving in the meantime block until that download finishes and is buffered to disk; those other requests then resume from the cached disk copy. Without this, simultaneous clients could easily bottleneck each other by all downloading the snapshot from the S3 backend at the same time.

defer func() {
close(entry.done)
s.coldSnapshotMu.Delete(upstreamURL)
}()
reader, _, openErr := s.cache.Open(ctx, cacheKey)
if openErr == nil && reader != nil {
logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL)
w.Header().Set("Content-Type", "application/zstd")
if _, err := io.Copy(w, reader); err != nil {
logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err)
}
_ = reader.Close()
// Schedule a deferred mirror restore so the mirror eventually
// becomes hot and cachew can generate fresh bundle deltas.
// Without this, repos that only ever serve cached snapshots
// would never restore their mirror.
s.scheduleDeferredMirrorRestore(ctx, repo, entry)
return
}
if reader != nil {
_ = reader.Close()
}
}
}

// Either the mirror is already ready or no cached snapshot exists — fall
// through to the original path which blocks until the mirror is available.
if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil {
logger.ErrorContext(ctx, "Clone unavailable for snapshot", "upstream", upstreamURL, "error", cloneErr)
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
return
}
s.maybeBackgroundFetch(repo)
cacheKey := snapshotCacheKey(upstreamURL)

reader, headers, err := s.cache.Open(ctx, cacheKey)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -283,6 +335,12 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW
snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit")
mirrorHead := s.getMirrorHead(ctx, repo)

// Forward the snapshot commit to the client so it knows whether the
// snapshot is fresh (no bundle URL = already at HEAD, skip freshen).
if snapshotCommit != "" {
w.Header().Set("X-Cachew-Snapshot-Commit", snapshotCommit)
}

if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err == nil {
Expand Down Expand Up @@ -539,6 +597,69 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re
}
}

// scheduleDeferredMirrorRestore schedules a one-shot background mirror restore
// for a repo that was served from a cached S3 snapshot on cold start. Without
// this, repos that only serve cached snapshots would never warm their mirror,
// preventing cachew from generating fresh bundle deltas.
//
// Submitted to the scheduler immediately after the first S3 snapshot stream
// completes. By this point the client snapshot is backfilled to local disk, so
// subsequent snapshot serves read from NVMe and don't compete for S3 bandwidth.
// The scheduler's concurrency limit naturally throttles the restore against
// other background work. Only one restore is scheduled per upstream URL.
func (s *Strategy) scheduleDeferredMirrorRestore(ctx context.Context, repo *gitclone.Repository, coldEntry *coldSnapshotEntry) {
upstream := repo.UpstreamURL()
if _, loaded := s.deferredRestoreOnce.LoadOrStore(upstream, true); loaded {
return
Comment on lines +612 to +613
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Retry deferred mirror restore after failed attempt

scheduleDeferredMirrorRestore permanently sets deferredRestoreOnce on the first cold snapshot serve, but the key is never cleared when the deferred restore fails (tryRestoreSnapshot or FetchLenient paths reset the repo to StateEmpty and return). For repos that are mostly accessed via /snapshot.tar.zst, a single transient failure means every later cold request will keep serving cached snapshots while skipping restore forever, so the mirror never becomes ready again on that process.

Useful? React with 👍 / 👎.

}

logger := logging.FromContext(ctx)
logger.InfoContext(ctx, "Scheduling deferred mirror restore", "upstream", upstream)

s.scheduler.Submit(upstream, "deferred-mirror-restore", func(ctx context.Context) error {
logger := logging.FromContext(ctx)
if repo.State() == gitclone.StateReady {
logger.InfoContext(ctx, "Mirror already ready, skipping deferred restore", "upstream", upstream)
return nil
}
if !repo.TryStartCloning() {
logger.InfoContext(ctx, "Mirror restore already in progress, skipping", "upstream", upstream)
return nil
}
// Wait for all in-flight cold snapshot serves to finish so the
// restore's disk writes don't compete with local cache reads.
coldEntry.serving.Wait()

logger.InfoContext(ctx, "Starting deferred mirror restore", "upstream", upstream)

if err := s.tryRestoreSnapshot(ctx, repo); err != nil {
logger.WarnContext(ctx, "Deferred mirror snapshot restore failed", "upstream", upstream, "error", err)
repo.ResetToEmpty()
return nil
}

if err := repo.FetchLenient(ctx, gitclone.CloneTimeout); err != nil {
logger.WarnContext(ctx, "Deferred mirror post-restore fetch failed", "upstream", upstream, "error", err)
repo.ResetToEmpty()
if rmErr := os.RemoveAll(repo.Path()); rmErr != nil {
logger.WarnContext(ctx, "Failed to remove mirror after failed fetch", "upstream", upstream, "error", rmErr)
}
return nil
}

repo.MarkReady()
logger.InfoContext(ctx, "Deferred mirror restore completed", "upstream", upstream)

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
}
return nil
})
}

// snapshotSpoolEntry holds a spool and a ready channel used to coordinate
// writer election. The first goroutine stores the entry via LoadOrStore and
// becomes the writer. It closes ready once the spool is created (or on
Expand All @@ -548,6 +669,11 @@ type snapshotSpoolEntry struct {
ready chan struct{}
}

// coldSnapshotEntry coordinates cold-start snapshot requests for a single
// upstream URL. The first (winner) request streams from the backend and closes
// done when finished; follower requests wait on done, then serve from the
// locally cached copy.
type coldSnapshotEntry struct {
	done    chan struct{}  // closed by the winner once its serve (and cache fill) completes
	serving sync.WaitGroup // tracks all in-flight snapshot serves (winner + followers)
}

func snapshotSpoolDirForURL(mirrorRoot, upstreamURL string) (string, error) {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err != nil {
Expand Down
Loading