From 99c1144f25d19968388d547bd8dfd6d12cbad7a4 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Tue, 10 Mar 2026 18:42:46 -0700 Subject: [PATCH 1/2] feat: backfill lower cache tiers on read-through When a higher tier (e.g., S3) has data but a lower tier (e.g., disk) does not, the returned reader now transparently writes to the lowest tier as the caller reads. This ensures disk cache is populated on the first S3 hit after a pod restart, avoiding repeated slow S3 reads. On write failure or partial read, the backfill is safely abandoned via context cancellation per the Cache contract. Amp-Thread-ID: https://ampcode.com/threads/T-019cda52-ee36-738c-86cd-1fd410c47d7f Co-authored-by: Amp --- internal/cache/tiered.go | 65 +++++++++++++++++++++++++++++++++++ internal/cache/tiered_test.go | 41 ++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 8f48a43..8a96e34 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -108,6 +108,9 @@ func (t Tiered) Stat(ctx context.Context, key Key) (http.Header, error) { } // Open returns a reader from the first cache that succeeds. +// When a higher tier hits but lower tiers missed, the returned reader +// transparently backfills the lowest tier as the caller reads, so that +// subsequent Opens are served locally. // // If all caches fail, all errors are returned. func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) { @@ -120,11 +123,73 @@ func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, } else if err != nil { return nil, nil, errors.WithStack(err) } + if i > 0 { + r = t.backfillReader(ctx, key, r, headers, t.caches[0]) + } return r, headers, nil } return nil, nil, errors.Join(errs...) } +// backfillReader wraps src so that every byte read is also written to dst. +// On successful close the dst entry becomes available for future reads. +// On error or partial read the dst entry is discarded per the Cache contract +// (the context is cancelled, causing the writer to discard on Close). +func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser, headers http.Header, dst Cache) io.ReadCloser { + logger := logging.FromContext(ctx) + // Use a cancellable context so we can abort the write on failure. + // The Cache contract guarantees that cancelled-context writes are discarded. + writeCtx, cancel := context.WithCancel(ctx) + w, err := dst.Create(writeCtx, key, headers, 0) // 0 → use the cache's max TTL + if err != nil { + cancel() + logger.WarnContext(ctx, "Tier backfill: failed to create writer, skipping", + "error", err.Error()) + return src + } + return &backfillReadCloser{src: src, dst: w, ctx: ctx, cancel: cancel} +} + +// backfillReadCloser tees reads from src into dst. If the full stream is +// consumed and Close completes without error, dst is closed normally +// (committing the cached entry). On any write failure the backfill is +// abandoned but reads continue unaffected. +type backfillReadCloser struct { + src io.ReadCloser + dst io.WriteCloser + ctx context.Context + cancel context.CancelFunc + failed bool +} + +func (b *backfillReadCloser) Read(p []byte) (int, error) { + n, err := b.src.Read(p) + if n > 0 && !b.failed { + if _, wErr := b.dst.Write(p[:n]); wErr != nil { + logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: write failed, abandoning", + "error", wErr.Error()) + b.failed = true + b.cancel() + } + } + return n, err +} + +func (b *backfillReadCloser) Close() error { + srcErr := b.src.Close() + if b.failed || srcErr != nil { + b.cancel() + _ = b.dst.Close() + return errors.WithStack(srcErr) + } + if err := b.dst.Close(); err != nil { + logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: close failed", + "error", err.Error()) + } + b.cancel() + return nil +} + func (t Tiered) String() string { names := make([]string, len(t.caches)) for i, c := range t.caches { diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go index d6ae52f..f6a37cc 100644 --- a/internal/cache/tiered_test.go +++ b/internal/cache/tiered_test.go @@ -1,6 +1,7 @@ package cache_test import ( + "io" "log/slog" "os" "testing" @@ -24,6 +25,46 @@ func TestTieredCache(t *testing.T) { }) } +func TestTieredBackfill(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{}) + + memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + tiered := cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk}) + + key := cache.NewKey("backfill-test") + content := []byte("hello backfill") + + // Write only to disk (tier 1), simulating S3 having data but memory/disk-L1 not. + w, err := disk.Create(ctx, key, nil, time.Hour) + assert.NoError(t, err) + _, err = w.Write(content) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + // Verify memory (tier 0) does not have it yet. + _, _, err = memory.Open(ctx, key) + assert.IsError(t, err, os.ErrNotExist) + + // Open through tiered — should hit disk and backfill memory. + r, _, err := tiered.Open(ctx, key) + assert.NoError(t, err) + data, err := io.ReadAll(r) + assert.NoError(t, err) + assert.NoError(t, r.Close()) + assert.Equal(t, content, data) + + // Now memory (tier 0) should have the entry. + r2, _, err := memory.Open(ctx, key) + assert.NoError(t, err) + data2, err := io.ReadAll(r2) + assert.NoError(t, err) + assert.NoError(t, r2.Close()) + assert.Equal(t, content, data2) +} + func TestTieredCacheSoak(t *testing.T) { if os.Getenv("SOAK_TEST") == "" { t.Skip("Skipping soak test; set SOAK_TEST=1 to run") From fdb369c5a066577da3fc6fb19f5ba56d58988ac3 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Tue, 10 Mar 2026 18:52:35 -0700 Subject: [PATCH 2/2] fix: suppress wrapcheck lint for io.Reader contract Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019cda52-ee36-738c-86cd-1fd410c47d7f --- internal/cache/tiered.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 8a96e34..cac028e 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -172,7 +172,7 @@ func (b *backfillReadCloser) Read(p []byte) (int, error) { b.cancel() } } - return n, err + return n, err //nolint:wrapcheck // must return unwrapped io.EOF per io.Reader contract } func (b *backfillReadCloser) Close() error {