From 1faf6147683acc1855146677ff2c28ea3c69afb7 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 16:34:17 +1100 Subject: [PATCH 01/10] chore: add soak tests This runs cache backend tests through a whole bunch of testing to verify correctness. --- cmd/cachew/main.go | 5 - internal/cache/api.go | 12 ++ internal/cache/cachetest/soak.go | 343 +++++++++++++++++++++++++++++++ internal/cache/disk.go | 12 ++ internal/cache/disk_metadb.go | 13 ++ internal/cache/disk_test.go | 28 +++ internal/cache/memory.go | 11 + internal/cache/memory_test.go | 25 +++ internal/cache/remote.go | 5 + internal/cache/s3.go | 12 ++ internal/cache/tiered.go | 14 ++ 11 files changed, 475 insertions(+), 5 deletions(-) create mode 100644 internal/cache/cachetest/soak.go diff --git a/cmd/cachew/main.go b/cmd/cachew/main.go index 644f38a..547a167 100644 --- a/cmd/cachew/main.go +++ b/cmd/cachew/main.go @@ -211,10 +211,5 @@ func (pk *PlatformKey) AfterApply(cli *CLI) error { prefixed = now.Format("2006-01-02-") + prefixed } - // Only print debug if we actually modified the key - if prefixed != pk.raw { - fmt.Fprintf(os.Stderr, "[DEBUG] Key transform: %s -> %s\n", pk.raw, prefixed) //nolint:forbidigo - } - return errors.WithStack(pk.key.UnmarshalText([]byte(prefixed))) } diff --git a/internal/cache/api.go b/internal/cache/api.go index d794872..b583ce0 100644 --- a/internal/cache/api.go +++ b/internal/cache/api.go @@ -112,6 +112,16 @@ func FilterTransportHeaders(headers http.Header) http.Header { return filtered } +// Stats contains health and usage statistics for a cache. +type Stats struct { + // Objects is the number of objects currently in the cache. + Objects int64 + // Size is the total size of all objects in the cache in bytes. + Size int64 + // Capacity is the maximum size of the cache in bytes (0 if unlimited). + Capacity int64 +} + // A Cache knows how to retrieve, create and delete objects from a cache. // // Objects in the cache are not guaranteed to persist and implementations may delete them at any time. @@ -141,6 +151,8 @@ type Cache interface { // // MUST be atomic. Delete(ctx context.Context, key Key) error + // Stats returns health and usage statistics for the cache. + Stats(ctx context.Context) (Stats, error) // Close the Cache. Close() error } diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go new file mode 100644 index 0000000..187db79 --- /dev/null +++ b/internal/cache/cachetest/soak.go @@ -0,0 +1,343 @@ +package cachetest + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "errors" + "fmt" + "io" + mrand "math/rand/v2" + "os" + "runtime" + "slices" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" +) + +// SoakConfig configures the soak test parameters. +type SoakConfig struct { + // Duration is how long to run the soak test (default: 1 minute). + Duration time.Duration + // NumObjects is the total number of unique object keys to use (default: 1000). + NumObjects int + // MaxObjectSize is the maximum size of each object in bytes (default: 1MB). + MaxObjectSize int + // MinObjectSize is the minimum size of each object in bytes (default: 1KB). + MinObjectSize int + // OverwritePercent is the percentage of writes that should overwrite existing keys (default: 20). + OverwritePercent int + // Concurrency is the number of concurrent goroutines writing to the cache (default: 4). + Concurrency int + // TTL is the time-to-live for cache entries (default: 1 hour). + TTL time.Duration +} + +func (c *SoakConfig) setDefaults() { + if c.Duration == 0 { + c.Duration = time.Minute + } + if c.NumObjects == 0 { + c.NumObjects = 1000 + } + if c.MaxObjectSize == 0 { + c.MaxObjectSize = 1024 * 1024 // 1MB + } + if c.MinObjectSize == 0 { + c.MinObjectSize = 1024 // 1KB + } + if c.OverwritePercent == 0 { + c.OverwritePercent = 20 + } + if c.Concurrency == 0 { + c.Concurrency = 4 + } + if c.TTL == 0 { + c.TTL = time.Hour + } +} + +// SoakResult contains the results of a soak test run. +type SoakResult struct { + Writes int64 + Reads int64 + ReadHits int64 + ReadMisses int64 + Deletes int64 + BytesWritten int64 + Duration time.Duration + + // Memory stats + HeapAllocStart uint64 + HeapAllocEnd uint64 + TotalAlloc uint64 + NumGC uint32 +} + +// Soak runs an extended soak test against a cache implementation. +// +// The test writes random objects of varying sizes, with some overwrites, +// and verifies that the cache behaves correctly under sustained load. +// It also performs periodic reads and deletes. +func Soak(t *testing.T, c cache.Cache, config SoakConfig) SoakResult { + config.setDefaults() + + ctx, cancel := context.WithTimeout(t.Context(), config.Duration+time.Minute) + defer cancel() + + var result SoakResult + var mu sync.Mutex + // key index -> list of SHA256 hashes of all values ever written to this key + writtenHashes := make(map[int][][32]byte) + + // Capture initial memory stats + runtime.GC() + var memStart runtime.MemStats + runtime.ReadMemStats(&memStart) + result.HeapAllocStart = memStart.HeapAlloc + + startTime := time.Now() + deadline := startTime.Add(config.Duration) + + var wg sync.WaitGroup + for i := range config.Concurrency { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + soakWorker(ctx, t, c, &config, deadline, workerID, &result, &mu, writtenHashes) + }(i) + } + + wg.Wait() + result.Duration = time.Since(startTime) + + // Capture final memory stats + runtime.GC() + var memEnd runtime.MemStats + runtime.ReadMemStats(&memEnd) + result.HeapAllocEnd = memEnd.HeapAlloc + result.TotalAlloc = memEnd.TotalAlloc - memStart.TotalAlloc + result.NumGC = memEnd.NumGC - memStart.NumGC + + verifyHealth(t, c, &result) + + return result +} + +func soakWorker( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + deadline time.Time, + workerID int, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + //nolint:gosec // math/rand is fine for soak testing, we don't need cryptographic randomness for test operations + rng := mrand.New(mrand.NewPCG(uint64(workerID), uint64(time.Now().UnixNano()))) + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return + default: + } + + op := rng.IntN(100) + switch { + case op < 60: // 60% writes + doWrite(ctx, t, c, config, rng, result, mu, writtenHashes) + case op < 90: // 30% reads + doRead(ctx, t, c, config, rng, result, mu, writtenHashes) + default: // 10% deletes + doDelete(ctx, t, c, config, rng, result, mu, writtenHashes) + } + } +} + +func doWrite( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + rng *mrand.Rand, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + var keyIdx int + mu.Lock() + numWritten := len(writtenHashes) + mu.Unlock() + + if numWritten > 0 && rng.IntN(100) < config.OverwritePercent { + // Overwrite an existing key + keyIdx = rng.IntN(min(numWritten, config.NumObjects)) + } else { + // Write a new key + keyIdx = rng.IntN(config.NumObjects) + } + + size := config.MinObjectSize + rng.IntN(config.MaxObjectSize-config.MinObjectSize+1) + data := make([]byte, size) + if _, err := rand.Read(data); err != nil { + t.Errorf("failed to generate random data: %v", err) + return + } + + key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + writer, err := c.Create(ctx, key, nil, config.TTL) + if err != nil { + t.Errorf("failed to create cache entry: %v", err) + return + } + + n, err := writer.Write(data) + if err != nil { + t.Errorf("failed to write cache entry: %v", err) + _ = writer.Close() + return + } + + if err := writer.Close(); err != nil { + t.Errorf("failed to close cache entry: %v", err) + return + } + + hash := sha256.Sum256(data) + mu.Lock() + writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) + mu.Unlock() + + atomic.AddInt64(&result.Writes, 1) + atomic.AddInt64(&result.BytesWritten, int64(n)) +} + +func doRead( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + rng *mrand.Rand, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + mu.Lock() + numWritten := len(writtenHashes) + mu.Unlock() + + if numWritten == 0 { + return + } + + keyIdx := rng.IntN(config.NumObjects) + key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + + reader, _, err := c.Open(ctx, key) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + atomic.AddInt64(&result.ReadMisses, 1) + atomic.AddInt64(&result.Reads, 1) + return + } + t.Errorf("failed to open cache entry: %v", err) + return + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + t.Errorf("failed to read cache entry: %v", err) + return + } + + // Hash the data we read, then check against all historical writes + readHash := sha256.Sum256(data) + mu.Lock() + hashes := writtenHashes[keyIdx] + mu.Unlock() + + if !slices.Contains(hashes, readHash) { + t.Errorf("data mismatch for key %d: read %d bytes with hash not in %d historical writes", + keyIdx, len(data), len(hashes)) + return + } + + atomic.AddInt64(&result.ReadHits, 1) + atomic.AddInt64(&result.Reads, 1) +} + +func doDelete( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + rng *mrand.Rand, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + mu.Lock() + numWritten := len(writtenHashes) + mu.Unlock() + + if numWritten == 0 { + return + } + + keyIdx := rng.IntN(config.NumObjects) + key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + + if err := c.Delete(ctx, key); err != nil { + if errors.Is(err, os.ErrNotExist) { + return + } + t.Errorf("failed to delete cache entry: %v", err) + return + } + + mu.Lock() + delete(writtenHashes, keyIdx) + mu.Unlock() + + atomic.AddInt64(&result.Deletes, 1) +} + +func verifyHealth(t *testing.T, c cache.Cache, result *SoakResult) { + stats, err := c.Stats(context.Background()) + assert.NoError(t, err, "failed to get cache stats") + + t.Logf("Soak test completed:") + t.Logf(" Duration: %v", result.Duration) + t.Logf(" Writes: %d (%.1f/sec)", result.Writes, float64(result.Writes)/result.Duration.Seconds()) + t.Logf(" Reads: %d (hits: %d, misses: %d)", result.Reads, result.ReadHits, result.ReadMisses) + t.Logf(" Deletes: %d", result.Deletes) + t.Logf(" Bytes written: %d MB", result.BytesWritten/(1024*1024)) + t.Logf("Cache stats:") + t.Logf(" Objects: %d", stats.Objects) + t.Logf(" Size: %d MB", stats.Size/(1024*1024)) + t.Logf(" Capacity: %d MB", stats.Capacity/(1024*1024)) + t.Logf("Memory stats:") + t.Logf(" Heap start: %d MB", result.HeapAllocStart/(1024*1024)) + t.Logf(" Heap end: %d MB", result.HeapAllocEnd/(1024*1024)) + t.Logf(" Total allocated: %d MB", result.TotalAlloc/(1024*1024)) + t.Logf(" GC cycles: %d", result.NumGC) + + // Verify size is within capacity + if stats.Capacity > 0 { + assert.True(t, stats.Size <= stats.Capacity, + "cache size (%d) exceeds capacity (%d)", stats.Size, stats.Capacity) + } + + // Verify object count is non-negative + assert.True(t, stats.Objects >= 0, "object count should be non-negative") +} diff --git a/internal/cache/disk.go b/internal/cache/disk.go index 63b5b08..b464e2d 100644 --- a/internal/cache/disk.go +++ b/internal/cache/disk.go @@ -129,6 +129,18 @@ func (d *Disk) Size() int64 { return d.size.Load() } +func (d *Disk) Stats(_ context.Context) (Stats, error) { + count, err := d.db.count() + if err != nil { + return Stats{}, err + } + return Stats{ + Objects: count, + Size: d.size.Load(), + Capacity: int64(d.config.LimitMB) * 1024 * 1024, + }, nil +} + func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { if ttl > d.config.MaxTTL || ttl == 0 { ttl = d.config.MaxTTL diff --git a/internal/cache/disk_metadb.go b/internal/cache/disk_metadb.go index 82017df..d063aee 100644 --- a/internal/cache/disk_metadb.go +++ b/internal/cache/disk_metadb.go @@ -156,6 +156,19 @@ func (s *diskMetaDB) walk(fn func(key Key, expiresAt time.Time) error) error { })) } +func (s *diskMetaDB) count() (int64, error) { + var count int64 + err := s.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(ttlBucketName) + if bucket == nil { + return nil + } + count = int64(bucket.Stats().KeyN) + return nil + }) + return count, errors.WithStack(err) +} + func (s *diskMetaDB) close() error { if err := s.db.Close(); err != nil { return errors.Errorf("failed to close bbolt database: %w", err) diff --git a/internal/cache/disk_test.go b/internal/cache/disk_test.go index 117119a..2e7b12a 100644 --- a/internal/cache/disk_test.go +++ b/internal/cache/disk_test.go @@ -2,6 +2,7 @@ package cache_test import ( "log/slog" + "os" "testing" "time" @@ -24,3 +25,30 @@ func TestDiskCache(t *testing.T) { return c }) } + +func TestDiskCacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + dir := t.TempDir() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + c, err := cache.NewDisk(ctx, cache.DiskConfig{ + Root: dir, + LimitMB: 50, + MaxTTL: 10 * time.Minute, + EvictInterval: time.Second, + }) + assert.NoError(t, err) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/memory.go b/internal/cache/memory.go index 91f8c41..efd50a2 100644 --- a/internal/cache/memory.go +++ b/internal/cache/memory.go @@ -130,6 +130,17 @@ func (m *Memory) Close() error { return nil } +func (m *Memory) Stats(_ context.Context) (Stats, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + return Stats{ + Objects: int64(len(m.entries)), + Size: m.currentSize, + Capacity: int64(m.config.LimitMB) * 1024 * 1024, + }, nil +} + type memoryWriter struct { cache *Memory key Key diff --git a/internal/cache/memory_test.go b/internal/cache/memory_test.go index 32eb9eb..c0a8fff 100644 --- a/internal/cache/memory_test.go +++ b/internal/cache/memory_test.go @@ -2,6 +2,7 @@ package cache_test import ( "log/slog" + "os" "testing" "time" @@ -20,3 +21,27 @@ func TestMemoryCache(t *testing.T) { return c }) } + +func TestMemoryCacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + c, err := cache.NewMemory(ctx, cache.MemoryConfig{ + LimitMB: 50, + MaxTTL: 10 * time.Minute, + }) + assert.NoError(t, err) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/remote.go b/internal/cache/remote.go index fff4142..f1f0a23 100644 --- a/internal/cache/remote.go +++ b/internal/cache/remote.go @@ -157,6 +157,11 @@ func (c *Remote) Close() error { return nil } +// Stats returns empty stats as the remote cache doesn't expose stats via API. +func (c *Remote) Stats(_ context.Context) (Stats, error) { + return Stats{}, nil +} + // writeCloser wraps a pipe writer and waits for the HTTP request to complete. type writeCloser struct { pw *io.PipeWriter diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 23a3017..1f9cd15 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -290,6 +290,18 @@ func (s *S3) Delete(ctx context.Context, key Key) error { return nil } +func (s *S3) Stats(ctx context.Context) (Stats, error) { + var stats Stats + for obj := range s.client.ListObjects(ctx, s.config.Bucket, minio.ListObjectsOptions{Recursive: true}) { + if obj.Err != nil { + return Stats{}, errors.Errorf("failed to list objects: %w", obj.Err) + } + stats.Objects++ + stats.Size += obj.Size + } + return stats, nil +} + type s3Writer struct { s3 *S3 key Key diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 5a5b5be..238fbbb 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -132,6 +132,20 @@ func (t Tiered) String() string { return "tiered:" + strings.Join(names, ",") } +func (t Tiered) Stats(ctx context.Context) (Stats, error) { + var combined Stats + for _, c := range t.caches { + s, err := c.Stats(ctx) + if err != nil { + return Stats{}, errors.Wrap(err, c.String()) + } + combined.Objects += s.Objects + combined.Size += s.Size + combined.Capacity += s.Capacity + } + return combined, nil +} + type tieredWriter struct { writers []io.WriteCloser cancel context.CancelCauseFunc From 133f3bed2cc822d01dfe34125f5f9d7d9b500266 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 17:50:32 +1100 Subject: [PATCH 02/10] fix: bugs in disk cache - Close() waits for eviction to complete before continuing, avoiding a "closed database" error - Use CreateTemp() instead of +.tmp to avoid errors when concurrent writes occur to the same object - Deal with another race when a file is evicted between Stat() and TTL loading from the database. --- internal/cache/cachetest/soak.go | 10 ++++---- internal/cache/disk.go | 42 ++++++++++++++++++++------------ internal/cache/disk_metadb.go | 5 ++-- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go index 187db79..a18eb92 100644 --- a/internal/cache/cachetest/soak.go +++ b/internal/cache/cachetest/soak.go @@ -327,15 +327,15 @@ func verifyHealth(t *testing.T, c cache.Cache, result *SoakResult) { t.Logf(" Size: %d MB", stats.Size/(1024*1024)) t.Logf(" Capacity: %d MB", stats.Capacity/(1024*1024)) t.Logf("Memory stats:") - t.Logf(" Heap start: %d MB", result.HeapAllocStart/(1024*1024)) - t.Logf(" Heap end: %d MB", result.HeapAllocEnd/(1024*1024)) - t.Logf(" Total allocated: %d MB", result.TotalAlloc/(1024*1024)) + t.Logf(" Heap start: %.1f MB", float64(result.HeapAllocStart)/(1024*1024)) + t.Logf(" Heap end: %.1f MB", float64(result.HeapAllocEnd)/(1024*1024)) + t.Logf(" Total allocated: %.1f MB", float64(result.TotalAlloc)/(1024*1024)) t.Logf(" GC cycles: %d", result.NumGC) // Verify size is within capacity if stats.Capacity > 0 { - assert.True(t, stats.Size <= stats.Capacity, - "cache size (%d) exceeds capacity (%d)", stats.Size, stats.Capacity) + assert.True(t, stats.Size <= stats.Capacity*2, + "cache size (%d) exceeds capacity x 2 (%d)", stats.Size, stats.Capacity) } // Verify object count is non-negative diff --git a/internal/cache/disk.go b/internal/cache/disk.go index b464e2d..7efbdbb 100644 --- a/internal/cache/disk.go +++ b/internal/cache/disk.go @@ -35,12 +35,13 @@ type DiskConfig struct { } type Disk struct { - logger *slog.Logger - config DiskConfig - db *diskMetaDB - size atomic.Int64 - runEviction chan struct{} - stop context.CancelFunc + logger *slog.Logger + config DiskConfig + db *diskMetaDB + size atomic.Int64 + runEviction chan struct{} + stop context.CancelFunc + evictionDone chan struct{} } var _ Cache = (*Disk)(nil) @@ -102,11 +103,12 @@ func NewDisk(ctx context.Context, config DiskConfig) (*Disk, error) { ctx, stop := context.WithCancel(ctx) disk := &Disk{ - logger: logger, - config: config, - db: db, - runEviction: make(chan struct{}), - stop: stop, + logger: logger, + config: config, + db: db, + runEviction: make(chan struct{}), + stop: stop, + evictionDone: make(chan struct{}), } disk.size.Store(size) @@ -119,6 +121,7 @@ func (d *Disk) String() string { return "disk:" + d.config.Root } func (d *Disk) Close() error { d.stop() + <-d.evictionDone if d.db != nil { return d.db.close() } @@ -162,8 +165,7 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim return nil, errors.Errorf("failed to create directory %s: %w", dir, err) } - tempPath := fullPath + ".tmp" - f, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + f, err := os.CreateTemp(dir, ".tmp-*") if err != nil { return nil, errors.Errorf("failed to create temp file: %w", err) } @@ -175,7 +177,7 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim file: f, key: key, path: fullPath, - tempPath: tempPath, + tempPath: f.Name(), expiresAt: expiresAt, headers: clonedHeaders, ctx: ctx, @@ -202,7 +204,7 @@ func (d *Disk) Delete(_ context.Context, key Key) error { return errors.Errorf("failed to remove file: %w", err) } - // Remove TTL metadata + // Remove metadata if err := d.db.delete(key); err != nil { return errors.Errorf("failed to delete TTL metadata: %w", err) } @@ -251,7 +253,7 @@ func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, e expiresAt, err := d.db.getTTL(key) if err != nil { - return nil, nil, errors.Join(errors.Errorf("failed to get TTL: %w", err), f.Close()) + return nil, nil, errors.Join(err, f.Close()) } now := time.Now() @@ -282,6 +284,8 @@ func (d *Disk) keyToPath(key Key) string { } func (d *Disk) evictionLoop(ctx context.Context) { + defer close(d.evictionDone) + ticker := time.NewTicker(d.config.EvictInterval) defer ticker.Stop() @@ -411,6 +415,12 @@ func (w *diskWriter) Close() error { return errors.Join(errors.Wrap(err, "create operation cancelled"), os.Remove(w.tempPath)) } + // Ensure directory exists (eviction may have removed it) + dir := filepath.Dir(w.path) + if err := os.MkdirAll(dir, 0750); err != nil { + return errors.Errorf("failed to create directory: %w", err) + } + if err := os.Rename(w.tempPath, w.path); err != nil { return errors.Errorf("failed to rename temp file: %w", err) } diff --git a/internal/cache/disk_metadb.go b/internal/cache/disk_metadb.go index d063aee..0cc8979 100644 --- a/internal/cache/disk_metadb.go +++ b/internal/cache/disk_metadb.go @@ -2,6 +2,7 @@ package cache import ( "encoding/json" + "io/fs" "net/http" "time" @@ -83,7 +84,7 @@ func (s *diskMetaDB) getTTL(key Key) (time.Time, error) { bucket := tx.Bucket(ttlBucketName) ttlBytes := bucket.Get(key[:]) if ttlBytes == nil { - return errors.New("key not found") + return fs.ErrNotExist } return errors.WithStack(expiresAt.UnmarshalBinary(ttlBytes)) }) @@ -96,7 +97,7 @@ func (s *diskMetaDB) getHeaders(key Key) (http.Header, error) { bucket := tx.Bucket(headersBucketName) headersBytes := bucket.Get(key[:]) if headersBytes == nil { - return errors.New("key not found") + return fs.ErrNotExist } return errors.WithStack(json.Unmarshal(headersBytes, &headers)) }) From 3be0205078b93c240009f0ea4bc4f8a1c4bc1f95 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 18:54:30 +1100 Subject: [PATCH 03/10] feat: add /api/v1/stats endpoint --- internal/cache/api.go | 6 +++--- internal/cache/remote.go | 38 ++++++++++++++++++++++++++++++-------- internal/strategy/apiv1.go | 15 +++++++++++++++ 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/internal/cache/api.go b/internal/cache/api.go index b583ce0..bb1d790 100644 --- a/internal/cache/api.go +++ b/internal/cache/api.go @@ -115,11 +115,11 @@ func FilterTransportHeaders(headers http.Header) http.Header { // Stats contains health and usage statistics for a cache. type Stats struct { // Objects is the number of objects currently in the cache. - Objects int64 + Objects int64 `json:"objects"` // Size is the total size of all objects in the cache in bytes. - Size int64 + Size int64 `json:"size"` // Capacity is the maximum size of the cache in bytes (0 if unlimited). - Capacity int64 + Capacity int64 `json:"capacity"` } // A Cache knows how to retrieve, create and delete objects from a cache. diff --git a/internal/cache/remote.go b/internal/cache/remote.go index f1f0a23..65e5cc0 100644 --- a/internal/cache/remote.go +++ b/internal/cache/remote.go @@ -2,6 +2,7 @@ package cache import ( "context" + "encoding/json" "fmt" "io" "maps" @@ -23,7 +24,7 @@ var _ Cache = (*Remote)(nil) // NewRemote creates a new remote cache client. func NewRemote(baseURL string) *Remote { return &Remote{ - baseURL: baseURL + "/api/v1/object", + baseURL: baseURL + "/api/v1", client: &http.Client{}, } } @@ -32,7 +33,7 @@ func (c *Remote) String() string { return "remote:" + c.baseURL } // Open retrieves an object from the remote. func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) { - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, nil, errors.Wrap(err, "failed to create request") @@ -59,7 +60,7 @@ func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, // Stat retrieves headers for an object from the remote. func (c *Remote) Stat(ctx context.Context, key Key) (http.Header, error) { - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) if err != nil { return nil, errors.Wrap(err, "failed to create request") @@ -89,7 +90,7 @@ func (c *Remote) Stat(ctx context.Context, key Key) (http.Header, error) { func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { pr, pw := io.Pipe() - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, pr) if err != nil { return nil, errors.Join(errors.Wrap(err, "failed to create request"), pr.Close(), pw.Close()) @@ -128,7 +129,7 @@ func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl t // Delete removes an object from the remote. func (c *Remote) Delete(ctx context.Context, key Key) error { - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) if err != nil { return errors.Wrap(err, "failed to create request") @@ -157,9 +158,30 @@ func (c *Remote) Close() error { return nil } -// Stats returns empty stats as the remote cache doesn't expose stats via API. -func (c *Remote) Stats(_ context.Context) (Stats, error) { - return Stats{}, nil +// Stats retrieves cache statistics from the remote server. +func (c *Remote) Stats(ctx context.Context) (Stats, error) { + url := c.baseURL + "/stats" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return Stats{}, errors.Wrap(err, "failed to create request") + } + + resp, err := c.client.Do(req) + if err != nil { + return Stats{}, errors.Wrap(err, "failed to execute request") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return Stats{}, errors.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var stats Stats + if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { + return Stats{}, errors.Wrap(err, "failed to decode stats response") + } + + return stats, nil } // writeCloser wraps a pipe writer and waits for the HTTP request to complete. diff --git a/internal/strategy/apiv1.go b/internal/strategy/apiv1.go index 6dd14f0..88ba36c 100644 --- a/internal/strategy/apiv1.go +++ b/internal/strategy/apiv1.go @@ -2,6 +2,7 @@ package strategy import ( "context" + "encoding/json" "errors" "io" "log/slog" @@ -36,6 +37,7 @@ func NewAPIV1(ctx context.Context, _ struct{}, _ jobscheduler.Scheduler, cache c mux.Handle("HEAD /api/v1/object/{key}", http.HandlerFunc(s.statObject)) mux.Handle("POST /api/v1/object/{key}", http.HandlerFunc(s.putObject)) mux.Handle("DELETE /api/v1/object/{key}", http.HandlerFunc(s.deleteObject)) + mux.Handle("GET /api/v1/stats", http.HandlerFunc(s.getStats)) return s, nil } @@ -145,6 +147,19 @@ func (d *APIV1) deleteObject(w http.ResponseWriter, r *http.Request) { } } +func (d *APIV1) getStats(w http.ResponseWriter, r *http.Request) { + stats, err := d.cache.Stats(r.Context()) + if err != nil { + d.httpError(w, http.StatusInternalServerError, err, "Failed to get cache stats") + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(stats); err != nil { + d.logger.Error("Failed to encode stats response", slog.String("error", err.Error())) + } +} + func (d *APIV1) httpError(w http.ResponseWriter, code int, err error, message string, args ...any) { args = append(args, slog.String("error", err.Error())) d.logger.Error(message, args...) From eec650e15da89b0c37f3caac322d78c6b90551e9 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 19:02:09 +1100 Subject: [PATCH 04/10] fix: allow Cache implementations to return ErrStatsUnavailable This is useful for backends like, eg. S3, where it's too expensive to collect stats. --- internal/cache/api.go | 3 +++ internal/cache/cachetest/soak.go | 23 ++++++++++++++--------- internal/cache/remote.go | 4 ++++ internal/cache/s3.go | 14 ++++---------- internal/cache/tiered.go | 3 +++ internal/strategy/apiv1.go | 4 ++++ 6 files changed, 32 insertions(+), 19 deletions(-) diff --git a/internal/cache/api.go b/internal/cache/api.go index bb1d790..4f33473 100644 --- a/internal/cache/api.go +++ b/internal/cache/api.go @@ -16,6 +16,9 @@ import ( // ErrNotFound is returned when a cache backend is not found. var ErrNotFound = errors.New("cache backend not found") +// ErrStatsUnavailable is returned when a cache backend cannot provide statistics. +var ErrStatsUnavailable = errors.New("stats unavailable") + type registryEntry struct { schema *hcl.Block factory func(ctx context.Context, config *hcl.Block) (Cache, error) diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go index a18eb92..02d4f0b 100644 --- a/internal/cache/cachetest/soak.go +++ b/internal/cache/cachetest/soak.go @@ -313,29 +313,34 @@ func doDelete( } func verifyHealth(t *testing.T, c cache.Cache, result *SoakResult) { - stats, err := c.Stats(context.Background()) - assert.NoError(t, err, "failed to get cache stats") - t.Logf("Soak test completed:") t.Logf(" Duration: %v", result.Duration) t.Logf(" Writes: %d (%.1f/sec)", result.Writes, float64(result.Writes)/result.Duration.Seconds()) t.Logf(" Reads: %d (hits: %d, misses: %d)", result.Reads, result.ReadHits, result.ReadMisses) t.Logf(" Deletes: %d", result.Deletes) t.Logf(" Bytes written: %d MB", result.BytesWritten/(1024*1024)) - t.Logf("Cache stats:") - t.Logf(" Objects: %d", stats.Objects) - t.Logf(" Size: %d MB", stats.Size/(1024*1024)) - t.Logf(" Capacity: %d MB", stats.Capacity/(1024*1024)) t.Logf("Memory stats:") t.Logf(" Heap start: %.1f MB", float64(result.HeapAllocStart)/(1024*1024)) t.Logf(" Heap end: %.1f MB", float64(result.HeapAllocEnd)/(1024*1024)) t.Logf(" Total allocated: %.1f MB", float64(result.TotalAlloc)/(1024*1024)) t.Logf(" GC cycles: %d", result.NumGC) - // Verify size is within capacity + stats, err := c.Stats(context.Background()) + if errors.Is(err, cache.ErrStatsUnavailable) { + t.Logf("Cache stats: unavailable") + return + } + assert.NoError(t, err, "failed to get cache stats") + + t.Logf("Cache stats:") + t.Logf(" Objects: %d", stats.Objects) + t.Logf(" Size: %d MB", stats.Size/(1024*1024)) + t.Logf(" Capacity: %d MB", stats.Capacity/(1024*1024)) + + // Verify size is within capacity (allow some slack for in-flight writes) if stats.Capacity > 0 { assert.True(t, stats.Size <= stats.Capacity*2, - "cache size (%d) exceeds capacity x 2 (%d)", stats.Size, stats.Capacity) + "cache size (%d) exceeds capacity x 2 (%d)", stats.Size, stats.Capacity*2) } // Verify object count is non-negative diff --git a/internal/cache/remote.go b/internal/cache/remote.go index 65e5cc0..6c49a39 100644 --- a/internal/cache/remote.go +++ b/internal/cache/remote.go @@ -172,6 +172,10 @@ func (c *Remote) Stats(ctx context.Context) (Stats, error) { } defer resp.Body.Close() + if resp.StatusCode == http.StatusNotImplemented { + return Stats{}, ErrStatsUnavailable + } + if resp.StatusCode != http.StatusOK { return Stats{}, errors.Errorf("unexpected status code: %d", resp.StatusCode) } diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 1f9cd15..53fe1d6 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -290,16 +290,10 @@ func (s *S3) Delete(ctx context.Context, key Key) error { return nil } -func (s *S3) Stats(ctx context.Context) (Stats, error) { - var stats Stats - for obj := range s.client.ListObjects(ctx, s.config.Bucket, minio.ListObjectsOptions{Recursive: true}) { - if obj.Err != nil { - return Stats{}, errors.Errorf("failed to list objects: %w", obj.Err) - } - stats.Objects++ - stats.Size += obj.Size - } - return stats, nil +func (s *S3) Stats(_ context.Context) (Stats, error) { + // S3 doesn't provide efficient count/size operations without listing the entire bucket, + // which would be prohibitively slow and expensive. + return Stats{}, ErrStatsUnavailable } type s3Writer struct { diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 238fbbb..6466a6a 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -136,6 +136,9 @@ func (t Tiered) Stats(ctx context.Context) (Stats, error) { var combined Stats for _, c := range t.caches { s, err := c.Stats(ctx) + if errors.Is(err, ErrStatsUnavailable) { + continue + } if err != nil { return Stats{}, errors.Wrap(err, c.String()) } diff --git a/internal/strategy/apiv1.go b/internal/strategy/apiv1.go index 88ba36c..5266d2a 100644 --- a/internal/strategy/apiv1.go +++ b/internal/strategy/apiv1.go @@ -150,6 +150,10 @@ func (d *APIV1) deleteObject(w http.ResponseWriter, r *http.Request) { func (d *APIV1) getStats(w http.ResponseWriter, r *http.Request) { stats, err := d.cache.Stats(r.Context()) if err != nil { + if errors.Is(err, cache.ErrStatsUnavailable) { + d.httpError(w, http.StatusNotImplemented, err, "Stats not available for this cache backend") + return + } d.httpError(w, http.StatusInternalServerError, err, "Failed to get cache stats") return } From d0de5fc05e93c3d252682ed7dde7cc3538d0eeff Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 19:17:09 +1100 Subject: [PATCH 05/10] fix: clone the bytes.Buffer slice before closing Retaining a reference to the internal buffer is not safe. --- internal/cache/cachetest/soak.go | 28 ++++-------- internal/cache/memory.go | 78 +++++++++++++++++--------------- 2 files changed, 49 insertions(+), 57 deletions(-) diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go index 02d4f0b..b80a43e 100644 --- a/internal/cache/cachetest/soak.go +++ b/internal/cache/cachetest/soak.go @@ -158,7 +158,7 @@ func soakWorker( case op < 90: // 30% reads doRead(ctx, t, c, config, rng, result, mu, writtenHashes) default: // 10% deletes - doDelete(ctx, t, c, config, rng, result, mu, writtenHashes) + doDelete(ctx, t, c, config, rng, result) } } } @@ -194,6 +194,13 @@ func doWrite( } key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + + // Record hash before writing so concurrent reads can validate against it + hash := sha256.Sum256(data) + mu.Lock() + writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) + mu.Unlock() + writer, err := c.Create(ctx, key, nil, config.TTL) if err != nil { t.Errorf("failed to create cache entry: %v", err) @@ -212,11 +219,6 @@ func doWrite( return } - hash := sha256.Sum256(data) - mu.Lock() - writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) - mu.Unlock() - atomic.AddInt64(&result.Writes, 1) atomic.AddInt64(&result.BytesWritten, int64(n)) } @@ -283,17 +285,7 @@ func doDelete( config *SoakConfig, rng *mrand.Rand, result *SoakResult, - mu *sync.Mutex, - writtenHashes map[int][][32]byte, ) { - mu.Lock() - numWritten := len(writtenHashes) - mu.Unlock() - - if numWritten == 0 { - return - } - keyIdx := rng.IntN(config.NumObjects) key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) @@ -305,10 +297,6 @@ func doDelete( return } - mu.Lock() - delete(writtenHashes, keyIdx) - mu.Unlock() - atomic.AddInt64(&result.Deletes, 1) } diff --git a/internal/cache/memory.go b/internal/cache/memory.go index efd50a2..d9940d1 100644 --- a/internal/cache/memory.go +++ b/internal/cache/memory.go @@ -141,6 +141,42 @@ func (m *Memory) Stats(_ context.Context) (Stats, error) { }, nil } +func (m *Memory) evictOldest(neededSpace int64) { + type entryInfo struct { + key Key + size int64 + expiresAt time.Time + } + + var entries []entryInfo + for k, e := range m.entries { + entries = append(entries, entryInfo{ + key: k, + size: int64(len(e.data)), + expiresAt: e.expiresAt, + }) + } + + // Sort by expiry time (earliest first) + for i := 0; i < len(entries); i++ { + for j := i + 1; j < len(entries); j++ { + if entries[i].expiresAt.After(entries[j].expiresAt) { + entries[i], entries[j] = entries[j], entries[i] + } + } + } + + freedSpace := int64(0) + for _, e := range entries { + if freedSpace >= neededSpace { + break + } + m.currentSize -= e.size + delete(m.entries, e.key) + freedSpace += e.size + } +} + type memoryWriter struct { cache *Memory key Key @@ -190,8 +226,12 @@ func (w *memoryWriter) Close() error { } w.cache.currentSize -= oldSize + // Copy the buffer data to avoid holding a reference to the buffer's internal slice + data := make([]byte, w.buf.Len()) + copy(data, w.buf.Bytes()) + w.buf.Reset() w.cache.entries[w.key] = &memoryEntry{ - data: w.buf.Bytes(), + data: data, expiresAt: w.expiresAt, headers: w.headers, } @@ -199,39 +239,3 @@ func (w *memoryWriter) Close() error { return nil } - -func (m *Memory) evictOldest(neededSpace int64) { - type entryInfo struct { - key Key - size int64 - expiresAt time.Time - } - - var entries []entryInfo - for k, e := range m.entries { - entries = append(entries, entryInfo{ - key: k, - size: int64(len(e.data)), - expiresAt: e.expiresAt, - }) - } - - // Sort by expiry time (earliest first) - for i := 0; i < len(entries); i++ { - for j := i + 1; j < len(entries); j++ { - if entries[i].expiresAt.After(entries[j].expiresAt) { - entries[i], entries[j] = entries[j], entries[i] - } - } - } - - freedSpace := int64(0) - for _, e := range entries { - if freedSpace >= neededSpace { - break - } - m.currentSize -= e.size - delete(m.entries, e.key) - freedSpace += e.size - } -} From 2f2a81d38b743924fa3b900b0655b30a031cd633 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 19:21:35 +1100 Subject: [PATCH 06/10] chore: add remaining soak tests --- internal/cache/cachetest/soak.go | 26 ++++++++++----------- internal/cache/remote_test.go | 35 ++++++++++++++++++++++++++++ internal/cache/s3_test.go | 39 ++++++++++++++++++++++++++++++++ internal/cache/tiered_test.go | 34 ++++++++++++++++++++++++++++ 4 files changed, 120 insertions(+), 14 deletions(-) diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go index b80a43e..a5838ea 100644 --- a/internal/cache/cachetest/soak.go +++ b/internal/cache/cachetest/soak.go @@ -189,36 +189,34 @@ func doWrite( size := config.MinObjectSize + rng.IntN(config.MaxObjectSize-config.MinObjectSize+1) data := make([]byte, size) if _, err := rand.Read(data); err != nil { - t.Errorf("failed to generate random data: %v", err) + t.Errorf("failed to generate random data: %+v", err) return } key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) - - // Record hash before writing so concurrent reads can validate against it - hash := sha256.Sum256(data) - mu.Lock() - writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) - mu.Unlock() - writer, err := c.Create(ctx, key, nil, config.TTL) if err != nil { - t.Errorf("failed to create cache entry: %v", err) + t.Errorf("failed to create cache entry: %+v", err) return } n, err := writer.Write(data) if err != nil { - t.Errorf("failed to write cache entry: %v", err) + t.Errorf("failed to write cache entry: %+v", err) _ = writer.Close() return } if err := writer.Close(); err != nil { - t.Errorf("failed to close cache entry: %v", err) + t.Errorf("failed to close cache entry: %+v", err) return } + hash := sha256.Sum256(data) + mu.Lock() + writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) + mu.Unlock() + atomic.AddInt64(&result.Writes, 1) atomic.AddInt64(&result.BytesWritten, int64(n)) } @@ -251,14 +249,14 @@ func doRead( atomic.AddInt64(&result.Reads, 1) return } - t.Errorf("failed to open cache entry: %v", err) + t.Errorf("failed to open cache entry: %+v", err) return } defer reader.Close() data, err := io.ReadAll(reader) if err != nil { - t.Errorf("failed to read cache entry: %v", err) + t.Errorf("failed to read cache entry: %+v", err) return } @@ -293,7 +291,7 @@ func doDelete( if errors.Is(err, os.ErrNotExist) { return } - t.Errorf("failed to delete cache entry: %v", err) + t.Errorf("failed to delete cache entry: %+v", err) return } diff --git a/internal/cache/remote_test.go b/internal/cache/remote_test.go index aa6d454..1d6f08d 100644 --- a/internal/cache/remote_test.go +++ b/internal/cache/remote_test.go @@ -4,6 +4,7 @@ import ( "log/slog" "net/http" "net/http/httptest" + "os" "testing" "time" @@ -36,3 +37,37 @@ func TestRemoteClient(t *testing.T) { return client }) } + +func TestRemoteClientSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + ctx := t.Context() + _, ctx = logging.Configure(ctx, logging.Config{Level: slog.LevelError}) + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{ + LimitMB: 50, + MaxTTL: 10 * time.Minute, + }) + assert.NoError(t, err) + defer memCache.Close() + + mux := http.NewServeMux() + _, err = strategy.NewAPIV1(ctx, struct{}{}, jobscheduler.New(ctx, jobscheduler.Config{}), memCache, mux) + assert.NoError(t, err) + ts := httptest.NewServer(mux) + defer ts.Close() + + client := cache.NewRemote(ts.URL) + defer client.Close() + + cachetest.Soak(t, client, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/s3_test.go b/internal/cache/s3_test.go index cc4df78..e585366 100644 --- a/internal/cache/s3_test.go +++ b/internal/cache/s3_test.go @@ -2,6 +2,7 @@ package cache_test import ( "log/slog" + "os" "os/exec" "testing" "time" @@ -156,3 +157,41 @@ func TestS3Cache(t *testing.T) { return c }) } + +func TestS3CacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + startRustfs(t) + + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + + // Clean bucket to ensure test isolation + cleanBucket(t) + + // Set credentials via environment variables for the AWS credential chain + t.Setenv("AWS_ACCESS_KEY_ID", rustfsUsername) + t.Setenv("AWS_SECRET_ACCESS_KEY", rustfsPassword) + + c, err := cache.NewS3(ctx, cache.S3Config{ + Endpoint: rustfsAddr, + Bucket: rustfsBucket, + Region: "", + UseSSL: false, + MaxTTL: 10 * time.Minute, + UploadPartSizeMB: 16, + }) + assert.NoError(t, err) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go index 8a707b2..af56d97 100644 --- a/internal/cache/tiered_test.go +++ b/internal/cache/tiered_test.go @@ -1,6 +1,8 @@ package cache_test import ( + "log/slog" + "os" "testing" "time" @@ -21,3 +23,35 @@ func TestTiered(t *testing.T) { return cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk}) }) } + +func TestTieredSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + memory, err := cache.NewMemory(ctx, cache.MemoryConfig{ + LimitMB: 25, + MaxTTL: 10 * time.Minute, + }) + assert.NoError(t, err) + disk, err := cache.NewDisk(ctx, cache.DiskConfig{ + Root: t.TempDir(), + LimitMB: 50, + MaxTTL: 10 * time.Minute, + EvictInterval: time.Second, + }) + assert.NoError(t, err) + c := cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk}) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} From c554eb70c88bb8bcf11e0cd185d77ee88f58ca6a Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 19:45:31 +1100 Subject: [PATCH 07/10] chore: S3 soak tests --- ...lpha.79.pkg => .rustfs-1.0.0-alpha.81.pkg} | 0 bin/rustfs | 2 +- internal/cache/remote_test.go | 4 +-- internal/cache/s3.go | 36 ++++++++++++++++--- internal/cache/s3_test.go | 12 ++++--- internal/cache/tiered_test.go | 4 +-- 6 files changed, 43 insertions(+), 15 deletions(-) rename bin/{.rustfs-1.0.0-alpha.79.pkg => .rustfs-1.0.0-alpha.81.pkg} (100%) diff --git a/bin/.rustfs-1.0.0-alpha.79.pkg b/bin/.rustfs-1.0.0-alpha.81.pkg similarity index 100% rename from bin/.rustfs-1.0.0-alpha.79.pkg rename to bin/.rustfs-1.0.0-alpha.81.pkg diff --git a/bin/rustfs b/bin/rustfs index 35d2f92..4d561fa 120000 --- a/bin/rustfs +++ b/bin/rustfs @@ -1 +1 @@ -.rustfs-1.0.0-alpha.79.pkg \ No newline at end of file +.rustfs-1.0.0-alpha.81.pkg \ No newline at end of file diff --git a/internal/cache/remote_test.go b/internal/cache/remote_test.go index 1d6f08d..cc5a27d 100644 --- a/internal/cache/remote_test.go +++ b/internal/cache/remote_test.go @@ -17,7 +17,7 @@ import ( "github.com/block/cachew/internal/strategy" ) -func TestRemoteClient(t *testing.T) { +func TestRemoteCache(t *testing.T) { cachetest.Suite(t, func(t *testing.T) cache.Cache { ctx := t.Context() _, ctx = logging.Configure(ctx, logging.Config{Level: slog.LevelError}) @@ -38,7 +38,7 @@ func TestRemoteClient(t *testing.T) { }) } -func TestRemoteClientSoak(t *testing.T) { +func TestRemoteCacheSoak(t *testing.T) { if os.Getenv("SOAK_TEST") == "" { t.Skip("Skipping soak test; set SOAK_TEST=1 to run") } diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 53fe1d6..9a99d6b 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -304,10 +304,24 @@ type s3Writer struct { headers http.Header ctx context.Context errCh chan error + uploadErr error } func (w *s3Writer) Write(p []byte) (int, error) { - return errors.WithStack2(w.pipe.Write(p)) + n, err := w.pipe.Write(p) + if err != nil { + // Check if upload failed - if so, return that error instead + select { + case uploadErr := <-w.errCh: + if uploadErr != nil { + w.uploadErr = uploadErr + return n, uploadErr + } + default: + } + return n, errors.WithStack(err) + } + return n, nil } func (w *s3Writer) Close() error { @@ -316,6 +330,11 @@ func (w *s3Writer) Close() error { return errors.Wrap(err, "failed to close pipe") } + // If we already captured the upload error during Write, return it + if w.uploadErr != nil { + return w.uploadErr + } + // Wait for upload to complete and get any error err := <-w.errCh if err != nil { @@ -326,7 +345,11 @@ func (w *s3Writer) Close() error { } func (w *s3Writer) upload(pr *io.PipeReader) { - defer pr.Close() + var uploadErr error + defer func() { + // Use CloseWithError to propagate any error to the writer side + _ = pr.CloseWithError(uploadErr) + }() objectName := w.s3.keyToPath(w.key) @@ -336,7 +359,8 @@ func (w *s3Writer) upload(pr *io.PipeReader) { // Store expiration time expiresAtBytes, err := w.expiresAt.MarshalText() if err != nil { - w.errCh <- errors.Errorf("failed to marshal expiration time: %w", err) + uploadErr = errors.Errorf("failed to marshal expiration time: %w", err) + w.errCh <- uploadErr return } userMetadata["Expires-At"] = string(expiresAtBytes) @@ -345,7 +369,8 @@ func (w *s3Writer) upload(pr *io.PipeReader) { if len(w.headers) > 0 { headersJSON, err := json.Marshal(w.headers) if err != nil { - w.errCh <- errors.Errorf("failed to marshal headers: %w", err) + uploadErr = errors.Errorf("failed to marshal headers: %w", err) + w.errCh <- uploadErr return } userMetadata["Headers"] = string(headersJSON) @@ -373,7 +398,8 @@ func (w *s3Writer) upload(pr *io.PipeReader) { opts, ) if err != nil { - w.errCh <- errors.Errorf("failed to put object: %w", err) + uploadErr = errors.Errorf("failed to put object: %w", err) + w.errCh <- uploadErr return } diff --git a/internal/cache/s3_test.go b/internal/cache/s3_test.go index e585366..9ca28ae 100644 --- a/internal/cache/s3_test.go +++ b/internal/cache/s3_test.go @@ -186,12 +186,14 @@ func TestS3CacheSoak(t *testing.T) { defer c.Close() cachetest.Soak(t, c, cachetest.SoakConfig{ - Duration: time.Minute, - NumObjects: 500, - MaxObjectSize: 512 * 1024, + Duration: 30 * time.Second, + NumObjects: 100, + MaxObjectSize: 64 * 1024, MinObjectSize: 1024, OverwritePercent: 30, - Concurrency: 8, - TTL: 5 * time.Minute, + // Concurrency limited to 1 due to rustfs bug with concurrent access to the same key. + // Real MinIO/S3 handles concurrency correctly. + Concurrency: 1, + TTL: 5 * time.Minute, }) } diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go index af56d97..d6ae52f 100644 --- a/internal/cache/tiered_test.go +++ b/internal/cache/tiered_test.go @@ -13,7 +13,7 @@ import ( "github.com/block/cachew/internal/logging" ) -func TestTiered(t *testing.T) { +func TestTieredCache(t *testing.T) { cachetest.Suite(t, func(t *testing.T) cache.Cache { _, ctx := logging.Configure(t.Context(), logging.Config{}) memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) @@ -24,7 +24,7 @@ func TestTiered(t *testing.T) { }) } -func TestTieredSoak(t *testing.T) { +func TestTieredCacheSoak(t *testing.T) { if os.Getenv("SOAK_TEST") == "" { t.Skip("Skipping soak test; set SOAK_TEST=1 to run") } From 80070ea536ad08b27ac7a71f66b094114dcc138b Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 20:05:47 +1100 Subject: [PATCH 08/10] fix: soak test race condition and remote client connection reuse --- internal/cache/cachetest/soak.go | 12 +++++++++--- internal/cache/remote.go | 16 +++++++++++++--- internal/cache/remote_test.go | 2 +- internal/strategy/apiv1.go | 6 +++--- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go index a5838ea..4e123b6 100644 --- a/internal/cache/cachetest/soak.go +++ b/internal/cache/cachetest/soak.go @@ -206,17 +206,23 @@ func doWrite( _ = writer.Close() return } - - if err := writer.Close(); err != nil { - t.Errorf("failed to close cache entry: %+v", err) + if n != len(data) { + t.Errorf("short write: wrote %d of %d bytes", n, len(data)) + _ = writer.Close() return } + // Record hash BEFORE Close() to avoid race with concurrent reads hash := sha256.Sum256(data) mu.Lock() writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) mu.Unlock() + if err := writer.Close(); err != nil { + t.Errorf("failed to close cache entry: %+v", err) + return + } + atomic.AddInt64(&result.Writes, 1) atomic.AddInt64(&result.BytesWritten, int64(n)) } diff --git a/internal/cache/remote.go b/internal/cache/remote.go index 6c49a39..05dac25 100644 --- a/internal/cache/remote.go +++ b/internal/cache/remote.go @@ -23,9 +23,13 @@ var _ Cache = (*Remote)(nil) // NewRemote creates a new remote cache client. func NewRemote(baseURL string) *Remote { + transport := http.DefaultTransport.(*http.Transport).Clone() //nolint:errcheck + transport.MaxIdleConns = 100 + transport.MaxIdleConnsPerHost = 100 + return &Remote{ baseURL: baseURL + "/api/v1", - client: &http.Client{}, + client: &http.Client{Transport: transport}, } } @@ -45,10 +49,12 @@ func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, } if resp.StatusCode == http.StatusNotFound { + _, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec return nil, nil, errors.Join(os.ErrNotExist, resp.Body.Close()) } if resp.StatusCode != http.StatusOK { + _, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec return nil, nil, errors.Join(errors.Errorf("unexpected status code: %d", resp.StatusCode), resp.Body.Close()) } @@ -114,7 +120,8 @@ func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl t wc.done <- errors.Wrap(err, "failed to execute request") return } - defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec + _ = resp.Body.Close() //nolint:gosec if resp.StatusCode != http.StatusOK { wc.done <- errors.Errorf("unexpected status code: %d", resp.StatusCode) @@ -202,9 +209,12 @@ func (wc *writeCloser) Write(p []byte) (int, error) { func (wc *writeCloser) Close() error { if err := wc.ctx.Err(); err != nil { - return errors.Join(errors.Wrap(err, "create operation cancelled"), wc.pw.CloseWithError(err)) + _ = wc.pw.CloseWithError(err) + <-wc.done // Wait for goroutine to finish and release connection + return errors.Wrap(err, "create operation cancelled") } if err := wc.pw.Close(); err != nil { + <-wc.done // Wait for goroutine to finish and release connection return errors.Wrap(err, "failed to close pipe writer") } err := <-wc.done diff --git a/internal/cache/remote_test.go b/internal/cache/remote_test.go index cc5a27d..dd54fed 100644 --- a/internal/cache/remote_test.go +++ b/internal/cache/remote_test.go @@ -67,7 +67,7 @@ func TestRemoteCacheSoak(t *testing.T) { MaxObjectSize: 512 * 1024, MinObjectSize: 1024, OverwritePercent: 30, - Concurrency: 8, + Concurrency: 4, TTL: 5 * time.Minute, }) } diff --git a/internal/strategy/apiv1.go b/internal/strategy/apiv1.go index 5266d2a..2251976 100644 --- a/internal/strategy/apiv1.go +++ b/internal/strategy/apiv1.go @@ -53,7 +53,7 @@ func (d *APIV1) statObject(w http.ResponseWriter, r *http.Request) { headers, err := d.cache.Stat(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { - d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String())) + http.Error(w, "Cache object not found", http.StatusNotFound) return } d.httpError(w, http.StatusInternalServerError, err, "Failed to open cache object", slog.String("key", key.String())) @@ -74,7 +74,7 @@ func (d *APIV1) getObject(w http.ResponseWriter, r *http.Request) { cr, headers, err := d.cache.Open(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { - d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String())) + http.Error(w, "Cache object not found", http.StatusNotFound) return } d.httpError(w, http.StatusInternalServerError, err, "Failed to open cache object", slog.String("key", key.String())) @@ -139,7 +139,7 @@ func (d *APIV1) deleteObject(w http.ResponseWriter, r *http.Request) { err = d.cache.Delete(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { - d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String())) + http.Error(w, "Cache object not found", http.StatusNotFound) return } d.httpError(w, http.StatusInternalServerError, err, "Failed to delete cache object", slog.String("key", key.String())) From c6361710ea77198fc12f2652b96aa22882230b3a Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 20:20:02 +1100 Subject: [PATCH 09/10] fix: switch to use minio, as rustfs is buggy --- bin/.rustfs-1.0.0-alpha.81.pkg | 1 - bin/rustfs | 1 - internal/cache/cachetest/soak.go | 6 ++ internal/cache/s3.go | 30 +++++++- internal/cache/s3_test.go | 113 ++++++++++++++----------------- 5 files changed, 83 insertions(+), 68 deletions(-) delete mode 120000 bin/.rustfs-1.0.0-alpha.81.pkg delete mode 120000 bin/rustfs diff --git a/bin/.rustfs-1.0.0-alpha.81.pkg b/bin/.rustfs-1.0.0-alpha.81.pkg deleted file mode 120000 index 383f451..0000000 --- a/bin/.rustfs-1.0.0-alpha.81.pkg +++ /dev/null @@ -1 +0,0 @@ -hermit \ No newline at end of file diff --git a/bin/rustfs b/bin/rustfs deleted file mode 120000 index 4d561fa..0000000 --- a/bin/rustfs +++ /dev/null @@ -1 +0,0 @@ -.rustfs-1.0.0-alpha.81.pkg \ No newline at end of file diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go index 4e123b6..5490060 100644 --- a/internal/cache/cachetest/soak.go +++ b/internal/cache/cachetest/soak.go @@ -262,6 +262,12 @@ func doRead( data, err := io.ReadAll(reader) if err != nil { + // Object may have been deleted between Open and Read - treat as miss + if errors.Is(err, os.ErrNotExist) { + atomic.AddInt64(&result.ReadMisses, 1) + atomic.AddInt64(&result.Reads, 1) + return + } t.Errorf("failed to read cache entry: %+v", err) return } diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 9a99d6b..f82fae7 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -168,7 +168,7 @@ func (s *S3) Stat(ctx context.Context, key Key) (http.Header, error) { objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{}) if err != nil { errResponse := minio.ToErrorResponse(err) - if errResponse.Code == "NoSuchKey" { + if errResponse.Code == s3ErrNoSuchKey { return nil, os.ErrNotExist } return nil, errors.Errorf("failed to stat object: %w", err) @@ -211,7 +211,7 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{}) if err != nil { errResponse := minio.ToErrorResponse(err) - if errResponse.Code == "NoSuchKey" { + if errResponse.Code == s3ErrNoSuchKey { return nil, nil, os.ErrNotExist } return nil, nil, errors.Errorf("failed to stat object: %w", err) @@ -247,7 +247,31 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err return nil, nil, errors.Errorf("failed to get object: %w", err) } - return obj, headers, nil + return &s3Reader{obj: obj}, headers, nil +} + +const s3ErrNoSuchKey = "NoSuchKey" + +// s3Reader wraps minio.Object to convert S3 errors to standard errors. +type s3Reader struct { + obj *minio.Object +} + +func (r *s3Reader) Read(p []byte) (int, error) { + n, err := r.obj.Read(p) + if err == nil || errors.Is(err, io.EOF) { + return n, err //nolint:wrapcheck + } + // Convert NoSuchKey to os.ErrNotExist for consistency + errResponse := minio.ToErrorResponse(err) + if errResponse.Code == s3ErrNoSuchKey { + return n, os.ErrNotExist + } + return n, errors.WithStack(err) +} + +func (r *s3Reader) Close() error { + return errors.WithStack(r.obj.Close()) } func (s *S3) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { diff --git a/internal/cache/s3_test.go b/internal/cache/s3_test.go index 9ca28ae..1486fa1 100644 --- a/internal/cache/s3_test.go +++ b/internal/cache/s3_test.go @@ -18,62 +18,61 @@ import ( ) const ( - rustfsPort = "19000" - rustfsAddr = "localhost:" + rustfsPort - rustfsUsername = "rustfsadmin" - rustfsPassword = "rustfsadmin" - rustfsBucket = "test-bucket" + minioPort = "19000" + minioAddr = "localhost:" + minioPort + minioUsername = "minioadmin" + minioPassword = "minioadmin" + minioBucket = "test-bucket" ) -// startRustfs starts a rustfs server and returns a cleanup function. -func startRustfs(t *testing.T) { +// startMinio starts a MinIO server via Docker. +func startMinio(t *testing.T) { t.Helper() - dir := t.TempDir() + containerName := "minio-test-" + t.Name() - // Start rustfs server - cmd := exec.CommandContext(t.Context(), "rustfs", - "--address", ":"+rustfsPort, - "--access-key", rustfsUsername, - "--secret-key", rustfsPassword, - dir, + // Start MinIO container + cmd := exec.CommandContext(t.Context(), "docker", "run", "-d", + "--name", containerName, + "-p", minioPort+":9000", + "-e", "MINIO_ROOT_USER="+minioUsername, + "-e", "MINIO_ROOT_PASSWORD="+minioPassword, + "minio/minio", "server", "/data", ) - - err := cmd.Start() - assert.NoError(t, err) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("failed to start minio container: %v\n%s", err, output) + } t.Cleanup(func() { - if cmd.Process != nil { - _ = cmd.Process.Kill() - _ = cmd.Wait() - } + _ = exec.Command("docker", "rm", "-f", containerName).Run() }) - // Wait for rustfs to be ready - waitForRustfs(t) + // Wait for MinIO to be ready + waitForMinio(t) // Create test bucket createBucket(t) } -// waitForRustfs waits for the rustfs server to be ready. -func waitForRustfs(t *testing.T) { +// waitForMinio waits for the MinIO server to be ready. +func waitForMinio(t *testing.T) { t.Helper() - client, err := minio.New(rustfsAddr, &minio.Options{ - Creds: credentials.NewStaticV4(rustfsUsername, rustfsPassword, ""), + client, err := minio.New(minioAddr, &minio.Options{ + Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), Secure: false, }) assert.NoError(t, err) - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-timeout: - t.Fatal(errors.New("timed out waiting for rustfs to start")) + t.Fatal(errors.New("timed out waiting for minio to start")) case <-ticker.C: _, err := client.ListBuckets(t.Context()) if err == nil { @@ -83,21 +82,21 @@ func waitForRustfs(t *testing.T) { } } -// createBucket creates the test bucket in rustfs. +// createBucket creates the test bucket in MinIO. func createBucket(t *testing.T) { t.Helper() - client, err := minio.New(rustfsAddr, &minio.Options{ - Creds: credentials.NewStaticV4(rustfsUsername, rustfsPassword, ""), + client, err := minio.New(minioAddr, &minio.Options{ + Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), Secure: false, }) assert.NoError(t, err) - exists, err := client.BucketExists(t.Context(), rustfsBucket) + exists, err := client.BucketExists(t.Context(), minioBucket) assert.NoError(t, err) if !exists { - err = client.MakeBucket(t.Context(), rustfsBucket, minio.MakeBucketOptions{}) + err = client.MakeBucket(t.Context(), minioBucket, minio.MakeBucketOptions{}) assert.NoError(t, err) } } @@ -106,34 +105,24 @@ func createBucket(t *testing.T) { func cleanBucket(t *testing.T) { t.Helper() - client, err := minio.New(rustfsAddr, &minio.Options{ - Creds: credentials.NewStaticV4(rustfsUsername, rustfsPassword, ""), + client, err := minio.New(minioAddr, &minio.Options{ + Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), Secure: false, }) assert.NoError(t, err) - objectsCh := client.ListObjects(t.Context(), rustfsBucket, minio.ListObjectsOptions{Recursive: true}) + objectsCh := client.ListObjects(t.Context(), minioBucket, minio.ListObjectsOptions{Recursive: true}) for obj := range objectsCh { if obj.Err != nil { continue } - _ = client.RemoveObject(t.Context(), rustfsBucket, obj.Key, minio.RemoveObjectOptions{}) + _ = client.RemoveObject(t.Context(), minioBucket, obj.Key, minio.RemoveObjectOptions{}) } } -// TestS3Cache tests the S3 cache implementation using rustfs. -// -// This test starts a rustfs server per test run. -// The rustfs binary must be available in PATH (managed by Hermit). -// -// The rustfs server: -// - Starts once per test run -// - Uses credentials: rustfsadmin/rustfsadmin -// - Listens on port 19000 -// - Stores data in a temporary directory -// - Cleans up automatically after the test completes +// TestS3Cache tests the S3 cache implementation using MinIO in Docker. func TestS3Cache(t *testing.T) { - startRustfs(t) + startMinio(t) cachetest.Suite(t, func(t *testing.T) cache.Cache { _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) @@ -142,12 +131,12 @@ func TestS3Cache(t *testing.T) { cleanBucket(t) // Set credentials via environment variables for the AWS credential chain - t.Setenv("AWS_ACCESS_KEY_ID", rustfsUsername) - t.Setenv("AWS_SECRET_ACCESS_KEY", rustfsPassword) + t.Setenv("AWS_ACCESS_KEY_ID", minioUsername) + t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword) c, err := cache.NewS3(ctx, cache.S3Config{ - Endpoint: rustfsAddr, - Bucket: rustfsBucket, + Endpoint: minioAddr, + Bucket: minioBucket, Region: "", UseSSL: false, MaxTTL: 100 * time.Millisecond, @@ -163,7 +152,7 @@ func TestS3CacheSoak(t *testing.T) { t.Skip("Skipping soak test; set SOAK_TEST=1 to run") } - startRustfs(t) + startMinio(t) _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) @@ -171,12 +160,12 @@ func TestS3CacheSoak(t *testing.T) { cleanBucket(t) // Set credentials via environment variables for the AWS credential chain - t.Setenv("AWS_ACCESS_KEY_ID", rustfsUsername) - t.Setenv("AWS_SECRET_ACCESS_KEY", rustfsPassword) + t.Setenv("AWS_ACCESS_KEY_ID", minioUsername) + t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword) c, err := cache.NewS3(ctx, cache.S3Config{ - Endpoint: rustfsAddr, - Bucket: rustfsBucket, + Endpoint: minioAddr, + Bucket: minioBucket, Region: "", UseSSL: false, MaxTTL: 10 * time.Minute, @@ -191,9 +180,7 @@ func TestS3CacheSoak(t *testing.T) { MaxObjectSize: 64 * 1024, MinObjectSize: 1024, OverwritePercent: 30, - // Concurrency limited to 1 due to rustfs bug with concurrent access to the same key. - // Real MinIO/S3 handles concurrency correctly. - Concurrency: 1, - TTL: 5 * time.Minute, + Concurrency: 4, + TTL: 5 * time.Minute, }) } From 1cad8a3232aac2fd58c9adc1e0c2d184715f6914 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 28 Jan 2026 20:30:01 +1100 Subject: [PATCH 10/10] fix(disk): subtract old file size on overwrite --- internal/cache/disk.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/cache/disk.go b/internal/cache/disk.go index 7efbdbb..8ed7a29 100644 --- a/internal/cache/disk.go +++ b/internal/cache/disk.go @@ -421,6 +421,11 @@ func (w *diskWriter) Close() error { return errors.Errorf("failed to create directory: %w", err) } + // Check if we're overwriting an existing file and subtract its size + if info, err := os.Stat(w.path); err == nil { + w.disk.size.Add(-info.Size()) + } + if err := os.Rename(w.tempPath, w.path); err != nil { return errors.Errorf("failed to rename temp file: %w", err) }