diff --git a/bin/.rustfs-1.0.0-alpha.79.pkg b/bin/.rustfs-1.0.0-alpha.79.pkg deleted file mode 120000 index 383f451..0000000 --- a/bin/.rustfs-1.0.0-alpha.79.pkg +++ /dev/null @@ -1 +0,0 @@ -hermit \ No newline at end of file diff --git a/bin/rustfs b/bin/rustfs deleted file mode 120000 index 35d2f92..0000000 --- a/bin/rustfs +++ /dev/null @@ -1 +0,0 @@ -.rustfs-1.0.0-alpha.79.pkg \ No newline at end of file diff --git a/cmd/cachew/main.go b/cmd/cachew/main.go index 644f38a..547a167 100644 --- a/cmd/cachew/main.go +++ b/cmd/cachew/main.go @@ -211,10 +211,5 @@ func (pk *PlatformKey) AfterApply(cli *CLI) error { prefixed = now.Format("2006-01-02-") + prefixed } - // Only print debug if we actually modified the key - if prefixed != pk.raw { - fmt.Fprintf(os.Stderr, "[DEBUG] Key transform: %s -> %s\n", pk.raw, prefixed) //nolint:forbidigo - } - return errors.WithStack(pk.key.UnmarshalText([]byte(prefixed))) } diff --git a/internal/cache/api.go b/internal/cache/api.go index d794872..4f33473 100644 --- a/internal/cache/api.go +++ b/internal/cache/api.go @@ -16,6 +16,9 @@ import ( // ErrNotFound is returned when a cache backend is not found. var ErrNotFound = errors.New("cache backend not found") +// ErrStatsUnavailable is returned when a cache backend cannot provide statistics. +var ErrStatsUnavailable = errors.New("stats unavailable") + type registryEntry struct { schema *hcl.Block factory func(ctx context.Context, config *hcl.Block) (Cache, error) @@ -112,6 +115,16 @@ func FilterTransportHeaders(headers http.Header) http.Header { return filtered } +// Stats contains health and usage statistics for a cache. +type Stats struct { + // Objects is the number of objects currently in the cache. + Objects int64 `json:"objects"` + // Size is the total size of all objects in the cache in bytes. + Size int64 `json:"size"` + // Capacity is the maximum size of the cache in bytes (0 if unlimited). + Capacity int64 `json:"capacity"` +} + // A Cache knows how to retrieve, create and delete objects from a cache. // // Objects in the cache are not guaranteed to persist and implementations may delete them at any time. @@ -141,6 +154,8 @@ type Cache interface { // // MUST be atomic. Delete(ctx context.Context, key Key) error + // Stats returns health and usage statistics for the cache. + Stats(ctx context.Context) (Stats, error) // Close the Cache. Close() error } diff --git a/internal/cache/cachetest/soak.go b/internal/cache/cachetest/soak.go new file mode 100644 index 0000000..5490060 --- /dev/null +++ b/internal/cache/cachetest/soak.go @@ -0,0 +1,346 @@ +package cachetest + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "errors" + "fmt" + "io" + mrand "math/rand/v2" + "os" + "runtime" + "slices" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" +) + +// SoakConfig configures the soak test parameters. +type SoakConfig struct { + // Duration is how long to run the soak test (default: 1 minute). + Duration time.Duration + // NumObjects is the total number of unique object keys to use (default: 1000). + NumObjects int + // MaxObjectSize is the maximum size of each object in bytes (default: 1MB). + MaxObjectSize int + // MinObjectSize is the minimum size of each object in bytes (default: 1KB). + MinObjectSize int + // OverwritePercent is the percentage of writes that should overwrite existing keys (default: 20). + OverwritePercent int + // Concurrency is the number of concurrent goroutines writing to the cache (default: 4). 
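+	// Each worker goroutine drives a randomized 60/30/10 mix of writes, reads, and deletes.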
+ Concurrency int + // TTL is the time-to-live for cache entries (default: 1 hour). + TTL time.Duration +} + +func (c *SoakConfig) setDefaults() { + if c.Duration == 0 { + c.Duration = time.Minute + } + if c.NumObjects == 0 { + c.NumObjects = 1000 + } + if c.MaxObjectSize == 0 { + c.MaxObjectSize = 1024 * 1024 // 1MB + } + if c.MinObjectSize == 0 { + c.MinObjectSize = 1024 // 1KB + } + if c.OverwritePercent == 0 { + c.OverwritePercent = 20 + } + if c.Concurrency == 0 { + c.Concurrency = 4 + } + if c.TTL == 0 { + c.TTL = time.Hour + } +} + +// SoakResult contains the results of a soak test run. +type SoakResult struct { + Writes int64 + Reads int64 + ReadHits int64 + ReadMisses int64 + Deletes int64 + BytesWritten int64 + Duration time.Duration + + // Memory stats + HeapAllocStart uint64 + HeapAllocEnd uint64 + TotalAlloc uint64 + NumGC uint32 +} + +// Soak runs an extended soak test against a cache implementation. +// +// The test writes random objects of varying sizes, with some overwrites, +// and verifies that the cache behaves correctly under sustained load. +// It also performs periodic reads and deletes. +func Soak(t *testing.T, c cache.Cache, config SoakConfig) SoakResult { + config.setDefaults() + + ctx, cancel := context.WithTimeout(t.Context(), config.Duration+time.Minute) + defer cancel() + + var result SoakResult + var mu sync.Mutex + // key index -> list of SHA256 hashes of all values ever written to this key + writtenHashes := make(map[int][][32]byte) + + // Capture initial memory stats + runtime.GC() + var memStart runtime.MemStats + runtime.ReadMemStats(&memStart) + result.HeapAllocStart = memStart.HeapAlloc + + startTime := time.Now() + deadline := startTime.Add(config.Duration) + + var wg sync.WaitGroup + for i := range config.Concurrency { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + soakWorker(ctx, t, c, &config, deadline, workerID, &result, &mu, writtenHashes) + }(i) + } + + wg.Wait() + result.Duration = time.Since(startTime) + + // Capture final memory stats + runtime.GC() + var memEnd runtime.MemStats + runtime.ReadMemStats(&memEnd) + result.HeapAllocEnd = memEnd.HeapAlloc + result.TotalAlloc = memEnd.TotalAlloc - memStart.TotalAlloc + result.NumGC = memEnd.NumGC - memStart.NumGC + + verifyHealth(t, c, &result) + + return result +} + +func soakWorker( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + deadline time.Time, + workerID int, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + //nolint:gosec // math/rand is fine for soak testing, we don't need cryptographic randomness for test operations + rng := mrand.New(mrand.NewPCG(uint64(workerID), uint64(time.Now().UnixNano()))) + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return + default: + } + + op := rng.IntN(100) + switch { + case op < 60: // 60% writes + doWrite(ctx, t, c, config, rng, result, mu, writtenHashes) + case op < 90: // 30% reads + doRead(ctx, t, c, config, rng, result, mu, writtenHashes) + default: // 10% deletes + doDelete(ctx, t, c, config, rng, result) + } + } +} + +func doWrite( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + rng *mrand.Rand, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + var keyIdx int + mu.Lock() + numWritten := len(writtenHashes) + mu.Unlock() + + if numWritten > 0 && rng.IntN(100) < config.OverwritePercent { + // Overwrite an existing key + keyIdx = rng.IntN(min(numWritten, config.NumObjects)) + } else { + // 
Write a new key + keyIdx = rng.IntN(config.NumObjects) + } + + size := config.MinObjectSize + rng.IntN(config.MaxObjectSize-config.MinObjectSize+1) + data := make([]byte, size) + if _, err := rand.Read(data); err != nil { + t.Errorf("failed to generate random data: %+v", err) + return + } + + key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + writer, err := c.Create(ctx, key, nil, config.TTL) + if err != nil { + t.Errorf("failed to create cache entry: %+v", err) + return + } + + n, err := writer.Write(data) + if err != nil { + t.Errorf("failed to write cache entry: %+v", err) + _ = writer.Close() + return + } + if n != len(data) { + t.Errorf("short write: wrote %d of %d bytes", n, len(data)) + _ = writer.Close() + return + } + + // Record hash BEFORE Close() to avoid race with concurrent reads + hash := sha256.Sum256(data) + mu.Lock() + writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash) + mu.Unlock() + + if err := writer.Close(); err != nil { + t.Errorf("failed to close cache entry: %+v", err) + return + } + + atomic.AddInt64(&result.Writes, 1) + atomic.AddInt64(&result.BytesWritten, int64(n)) +} + +func doRead( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + rng *mrand.Rand, + result *SoakResult, + mu *sync.Mutex, + writtenHashes map[int][][32]byte, +) { + mu.Lock() + numWritten := len(writtenHashes) + mu.Unlock() + + if numWritten == 0 { + return + } + + keyIdx := rng.IntN(config.NumObjects) + key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + + reader, _, err := c.Open(ctx, key) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + atomic.AddInt64(&result.ReadMisses, 1) + atomic.AddInt64(&result.Reads, 1) + return + } + t.Errorf("failed to open cache entry: %+v", err) + return + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + // Object may have been deleted between Open and Read - treat as miss + if errors.Is(err, os.ErrNotExist) { + atomic.AddInt64(&result.ReadMisses, 1) + atomic.AddInt64(&result.Reads, 1) + return + } + t.Errorf("failed to read cache entry: %+v", err) + return + } + + // Hash the data we read, then check against all historical writes + readHash := sha256.Sum256(data) + mu.Lock() + hashes := writtenHashes[keyIdx] + mu.Unlock() + + if !slices.Contains(hashes, readHash) { + t.Errorf("data mismatch for key %d: read %d bytes with hash not in %d historical writes", + keyIdx, len(data), len(hashes)) + return + } + + atomic.AddInt64(&result.ReadHits, 1) + atomic.AddInt64(&result.Reads, 1) +} + +func doDelete( + ctx context.Context, + t *testing.T, + c cache.Cache, + config *SoakConfig, + rng *mrand.Rand, + result *SoakResult, +) { + keyIdx := rng.IntN(config.NumObjects) + key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx)) + + if err := c.Delete(ctx, key); err != nil { + if errors.Is(err, os.ErrNotExist) { + return + } + t.Errorf("failed to delete cache entry: %+v", err) + return + } + + atomic.AddInt64(&result.Deletes, 1) +} + +func verifyHealth(t *testing.T, c cache.Cache, result *SoakResult) { + t.Logf("Soak test completed:") + t.Logf(" Duration: %v", result.Duration) + t.Logf(" Writes: %d (%.1f/sec)", result.Writes, float64(result.Writes)/result.Duration.Seconds()) + t.Logf(" Reads: %d (hits: %d, misses: %d)", result.Reads, result.ReadHits, result.ReadMisses) + t.Logf(" Deletes: %d", result.Deletes) + t.Logf(" Bytes written: %d MB", result.BytesWritten/(1024*1024)) + t.Logf("Memory stats:") + t.Logf(" Heap start: %.1f MB", float64(result.HeapAllocStart)/(1024*1024)) + t.Logf(" 
Heap end: %.1f MB", float64(result.HeapAllocEnd)/(1024*1024)) + t.Logf(" Total allocated: %.1f MB", float64(result.TotalAlloc)/(1024*1024)) + t.Logf(" GC cycles: %d", result.NumGC) + + stats, err := c.Stats(context.Background()) + if errors.Is(err, cache.ErrStatsUnavailable) { + t.Logf("Cache stats: unavailable") + return + } + assert.NoError(t, err, "failed to get cache stats") + + t.Logf("Cache stats:") + t.Logf(" Objects: %d", stats.Objects) + t.Logf(" Size: %d MB", stats.Size/(1024*1024)) + t.Logf(" Capacity: %d MB", stats.Capacity/(1024*1024)) + + // Verify size is within capacity (allow some slack for in-flight writes) + if stats.Capacity > 0 { + assert.True(t, stats.Size <= stats.Capacity*2, + "cache size (%d) exceeds capacity x 2 (%d)", stats.Size, stats.Capacity*2) + } + + // Verify object count is non-negative + assert.True(t, stats.Objects >= 0, "object count should be non-negative") +} diff --git a/internal/cache/disk.go b/internal/cache/disk.go index 63b5b08..8ed7a29 100644 --- a/internal/cache/disk.go +++ b/internal/cache/disk.go @@ -35,12 +35,13 @@ type DiskConfig struct { } type Disk struct { - logger *slog.Logger - config DiskConfig - db *diskMetaDB - size atomic.Int64 - runEviction chan struct{} - stop context.CancelFunc + logger *slog.Logger + config DiskConfig + db *diskMetaDB + size atomic.Int64 + runEviction chan struct{} + stop context.CancelFunc + evictionDone chan struct{} } var _ Cache = (*Disk)(nil) @@ -102,11 +103,12 @@ func NewDisk(ctx context.Context, config DiskConfig) (*Disk, error) { ctx, stop := context.WithCancel(ctx) disk := &Disk{ - logger: logger, - config: config, - db: db, - runEviction: make(chan struct{}), - stop: stop, + logger: logger, + config: config, + db: db, + runEviction: make(chan struct{}), + stop: stop, + evictionDone: make(chan struct{}), } disk.size.Store(size) @@ -119,6 +121,7 @@ func (d *Disk) String() string { return "disk:" + d.config.Root } func (d *Disk) Close() error { d.stop() + <-d.evictionDone if d.db != nil { return d.db.close() } @@ -129,6 +132,18 @@ func (d *Disk) Size() int64 { return d.size.Load() } +func (d *Disk) Stats(_ context.Context) (Stats, error) { + count, err := d.db.count() + if err != nil { + return Stats{}, err + } + return Stats{ + Objects: count, + Size: d.size.Load(), + Capacity: int64(d.config.LimitMB) * 1024 * 1024, + }, nil +} + func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { if ttl > d.config.MaxTTL || ttl == 0 { ttl = d.config.MaxTTL @@ -150,8 +165,7 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim return nil, errors.Errorf("failed to create directory %s: %w", dir, err) } - tempPath := fullPath + ".tmp" - f, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + f, err := os.CreateTemp(dir, ".tmp-*") if err != nil { return nil, errors.Errorf("failed to create temp file: %w", err) } @@ -163,7 +177,7 @@ func (d *Disk) Create(ctx context.Context, key Key, headers http.Header, ttl tim file: f, key: key, path: fullPath, - tempPath: tempPath, + tempPath: f.Name(), expiresAt: expiresAt, headers: clonedHeaders, ctx: ctx, @@ -190,7 +204,7 @@ func (d *Disk) Delete(_ context.Context, key Key) error { return errors.Errorf("failed to remove file: %w", err) } - // Remove TTL metadata + // Remove metadata if err := d.db.delete(key); err != nil { return errors.Errorf("failed to delete TTL metadata: %w", err) } @@ -239,7 +253,7 @@ func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, 
http.Header, e expiresAt, err := d.db.getTTL(key) if err != nil { - return nil, nil, errors.Join(errors.Errorf("failed to get TTL: %w", err), f.Close()) + return nil, nil, errors.Join(err, f.Close()) } now := time.Now() @@ -270,6 +284,8 @@ func (d *Disk) keyToPath(key Key) string { } func (d *Disk) evictionLoop(ctx context.Context) { + defer close(d.evictionDone) + ticker := time.NewTicker(d.config.EvictInterval) defer ticker.Stop() @@ -399,6 +415,17 @@ func (w *diskWriter) Close() error { return errors.Join(errors.Wrap(err, "create operation cancelled"), os.Remove(w.tempPath)) } + // Ensure directory exists (eviction may have removed it) + dir := filepath.Dir(w.path) + if err := os.MkdirAll(dir, 0750); err != nil { + return errors.Errorf("failed to create directory: %w", err) + } + + // Check if we're overwriting an existing file and subtract its size + if info, err := os.Stat(w.path); err == nil { + w.disk.size.Add(-info.Size()) + } + if err := os.Rename(w.tempPath, w.path); err != nil { return errors.Errorf("failed to rename temp file: %w", err) } diff --git a/internal/cache/disk_metadb.go b/internal/cache/disk_metadb.go index 82017df..0cc8979 100644 --- a/internal/cache/disk_metadb.go +++ b/internal/cache/disk_metadb.go @@ -2,6 +2,7 @@ package cache import ( "encoding/json" + "io/fs" "net/http" "time" @@ -83,7 +84,7 @@ func (s *diskMetaDB) getTTL(key Key) (time.Time, error) { bucket := tx.Bucket(ttlBucketName) ttlBytes := bucket.Get(key[:]) if ttlBytes == nil { - return errors.New("key not found") + return fs.ErrNotExist } return errors.WithStack(expiresAt.UnmarshalBinary(ttlBytes)) }) @@ -96,7 +97,7 @@ func (s *diskMetaDB) getHeaders(key Key) (http.Header, error) { bucket := tx.Bucket(headersBucketName) headersBytes := bucket.Get(key[:]) if headersBytes == nil { - return errors.New("key not found") + return fs.ErrNotExist } return errors.WithStack(json.Unmarshal(headersBytes, &headers)) }) @@ -156,6 +157,19 @@ func (s *diskMetaDB) walk(fn func(key Key, expiresAt time.Time) error) error { })) } +func (s *diskMetaDB) count() (int64, error) { + var count int64 + err := s.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(ttlBucketName) + if bucket == nil { + return nil + } + count = int64(bucket.Stats().KeyN) + return nil + }) + return count, errors.WithStack(err) +} + func (s *diskMetaDB) close() error { if err := s.db.Close(); err != nil { return errors.Errorf("failed to close bbolt database: %w", err) diff --git a/internal/cache/disk_test.go b/internal/cache/disk_test.go index 117119a..2e7b12a 100644 --- a/internal/cache/disk_test.go +++ b/internal/cache/disk_test.go @@ -2,6 +2,7 @@ package cache_test import ( "log/slog" + "os" "testing" "time" @@ -24,3 +25,30 @@ func TestDiskCache(t *testing.T) { return c }) } + +func TestDiskCacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + dir := t.TempDir() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + c, err := cache.NewDisk(ctx, cache.DiskConfig{ + Root: dir, + LimitMB: 50, + MaxTTL: 10 * time.Minute, + EvictInterval: time.Second, + }) + assert.NoError(t, err) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/memory.go b/internal/cache/memory.go index 91f8c41..d9940d1 100644 --- a/internal/cache/memory.go +++ 
b/internal/cache/memory.go @@ -130,6 +130,53 @@ func (m *Memory) Close() error { return nil } +func (m *Memory) Stats(_ context.Context) (Stats, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + return Stats{ + Objects: int64(len(m.entries)), + Size: m.currentSize, + Capacity: int64(m.config.LimitMB) * 1024 * 1024, + }, nil +} + +func (m *Memory) evictOldest(neededSpace int64) { + type entryInfo struct { + key Key + size int64 + expiresAt time.Time + } + + var entries []entryInfo + for k, e := range m.entries { + entries = append(entries, entryInfo{ + key: k, + size: int64(len(e.data)), + expiresAt: e.expiresAt, + }) + } + + // Sort by expiry time (earliest first) + for i := 0; i < len(entries); i++ { + for j := i + 1; j < len(entries); j++ { + if entries[i].expiresAt.After(entries[j].expiresAt) { + entries[i], entries[j] = entries[j], entries[i] + } + } + } + + freedSpace := int64(0) + for _, e := range entries { + if freedSpace >= neededSpace { + break + } + m.currentSize -= e.size + delete(m.entries, e.key) + freedSpace += e.size + } +} + type memoryWriter struct { cache *Memory key Key @@ -179,8 +226,12 @@ func (w *memoryWriter) Close() error { } w.cache.currentSize -= oldSize + // Copy the buffer data to avoid holding a reference to the buffer's internal slice + data := make([]byte, w.buf.Len()) + copy(data, w.buf.Bytes()) + w.buf.Reset() w.cache.entries[w.key] = &memoryEntry{ - data: w.buf.Bytes(), + data: data, expiresAt: w.expiresAt, headers: w.headers, } @@ -188,39 +239,3 @@ func (w *memoryWriter) Close() error { return nil } - -func (m *Memory) evictOldest(neededSpace int64) { - type entryInfo struct { - key Key - size int64 - expiresAt time.Time - } - - var entries []entryInfo - for k, e := range m.entries { - entries = append(entries, entryInfo{ - key: k, - size: int64(len(e.data)), - expiresAt: e.expiresAt, - }) - } - - // Sort by expiry time (earliest first) - for i := 0; i < len(entries); i++ { - for j := i + 1; j < len(entries); j++ { - if entries[i].expiresAt.After(entries[j].expiresAt) { - entries[i], entries[j] = entries[j], entries[i] - } - } - } - - freedSpace := int64(0) - for _, e := range entries { - if freedSpace >= neededSpace { - break - } - m.currentSize -= e.size - delete(m.entries, e.key) - freedSpace += e.size - } -} diff --git a/internal/cache/memory_test.go b/internal/cache/memory_test.go index 32eb9eb..c0a8fff 100644 --- a/internal/cache/memory_test.go +++ b/internal/cache/memory_test.go @@ -2,6 +2,7 @@ package cache_test import ( "log/slog" + "os" "testing" "time" @@ -20,3 +21,27 @@ func TestMemoryCache(t *testing.T) { return c }) } + +func TestMemoryCacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + c, err := cache.NewMemory(ctx, cache.MemoryConfig{ + LimitMB: 50, + MaxTTL: 10 * time.Minute, + }) + assert.NoError(t, err) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/remote.go b/internal/cache/remote.go index fff4142..05dac25 100644 --- a/internal/cache/remote.go +++ b/internal/cache/remote.go @@ -2,6 +2,7 @@ package cache import ( "context" + "encoding/json" "fmt" "io" "maps" @@ -22,9 +23,13 @@ var _ Cache = (*Remote)(nil) // NewRemote creates a new remote cache client. 
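+// The returned client uses a pooled transport (up to 100 idle connections per host) so
+// concurrent cache operations reuse connections instead of opening new ones.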
func NewRemote(baseURL string) *Remote { + transport := http.DefaultTransport.(*http.Transport).Clone() //nolint:errcheck + transport.MaxIdleConns = 100 + transport.MaxIdleConnsPerHost = 100 + return &Remote{ - baseURL: baseURL + "/api/v1/object", - client: &http.Client{}, + baseURL: baseURL + "/api/v1", + client: &http.Client{Transport: transport}, } } @@ -32,7 +37,7 @@ func (c *Remote) String() string { return "remote:" + c.baseURL } // Open retrieves an object from the remote. func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) { - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, nil, errors.Wrap(err, "failed to create request") @@ -44,10 +49,12 @@ func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, } if resp.StatusCode == http.StatusNotFound { + _, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec return nil, nil, errors.Join(os.ErrNotExist, resp.Body.Close()) } if resp.StatusCode != http.StatusOK { + _, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec return nil, nil, errors.Join(errors.Errorf("unexpected status code: %d", resp.StatusCode), resp.Body.Close()) } @@ -59,7 +66,7 @@ func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, // Stat retrieves headers for an object from the remote. func (c *Remote) Stat(ctx context.Context, key Key) (http.Header, error) { - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) if err != nil { return nil, errors.Wrap(err, "failed to create request") @@ -89,7 +96,7 @@ func (c *Remote) Stat(ctx context.Context, key Key) (http.Header, error) { func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { pr, pw := io.Pipe() - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, pr) if err != nil { return nil, errors.Join(errors.Wrap(err, "failed to create request"), pr.Close(), pw.Close()) @@ -113,7 +120,8 @@ func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl t wc.done <- errors.Wrap(err, "failed to execute request") return } - defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec + _ = resp.Body.Close() //nolint:gosec if resp.StatusCode != http.StatusOK { wc.done <- errors.Errorf("unexpected status code: %d", resp.StatusCode) @@ -128,7 +136,7 @@ func (c *Remote) Create(ctx context.Context, key Key, headers http.Header, ttl t // Delete removes an object from the remote. func (c *Remote) Delete(ctx context.Context, key Key) error { - url := fmt.Sprintf("%s/%s", c.baseURL, key.String()) + url := fmt.Sprintf("%s/object/%s", c.baseURL, key.String()) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) if err != nil { return errors.Wrap(err, "failed to create request") @@ -157,6 +165,36 @@ func (c *Remote) Close() error { return nil } +// Stats retrieves cache statistics from the remote server. 
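+// It returns ErrStatsUnavailable when the server responds with 501 Not Implemented.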
+func (c *Remote) Stats(ctx context.Context) (Stats, error) { + url := c.baseURL + "/stats" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return Stats{}, errors.Wrap(err, "failed to create request") + } + + resp, err := c.client.Do(req) + if err != nil { + return Stats{}, errors.Wrap(err, "failed to execute request") + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotImplemented { + return Stats{}, ErrStatsUnavailable + } + + if resp.StatusCode != http.StatusOK { + return Stats{}, errors.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var stats Stats + if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { + return Stats{}, errors.Wrap(err, "failed to decode stats response") + } + + return stats, nil +} + // writeCloser wraps a pipe writer and waits for the HTTP request to complete. type writeCloser struct { pw *io.PipeWriter @@ -171,9 +209,12 @@ func (wc *writeCloser) Write(p []byte) (int, error) { func (wc *writeCloser) Close() error { if err := wc.ctx.Err(); err != nil { - return errors.Join(errors.Wrap(err, "create operation cancelled"), wc.pw.CloseWithError(err)) + _ = wc.pw.CloseWithError(err) + <-wc.done // Wait for goroutine to finish and release connection + return errors.Wrap(err, "create operation cancelled") } if err := wc.pw.Close(); err != nil { + <-wc.done // Wait for goroutine to finish and release connection return errors.Wrap(err, "failed to close pipe writer") } err := <-wc.done diff --git a/internal/cache/remote_test.go b/internal/cache/remote_test.go index aa6d454..dd54fed 100644 --- a/internal/cache/remote_test.go +++ b/internal/cache/remote_test.go @@ -4,6 +4,7 @@ import ( "log/slog" "net/http" "net/http/httptest" + "os" "testing" "time" @@ -16,7 +17,7 @@ import ( "github.com/block/cachew/internal/strategy" ) -func TestRemoteClient(t *testing.T) { +func TestRemoteCache(t *testing.T) { cachetest.Suite(t, func(t *testing.T) cache.Cache { ctx := t.Context() _, ctx = logging.Configure(ctx, logging.Config{Level: slog.LevelError}) @@ -36,3 +37,37 @@ func TestRemoteClient(t *testing.T) { return client }) } + +func TestRemoteCacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + ctx := t.Context() + _, ctx = logging.Configure(ctx, logging.Config{Level: slog.LevelError}) + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{ + LimitMB: 50, + MaxTTL: 10 * time.Minute, + }) + assert.NoError(t, err) + defer memCache.Close() + + mux := http.NewServeMux() + _, err = strategy.NewAPIV1(ctx, struct{}{}, jobscheduler.New(ctx, jobscheduler.Config{}), memCache, mux) + assert.NoError(t, err) + ts := httptest.NewServer(mux) + defer ts.Close() + + client := cache.NewRemote(ts.URL) + defer client.Close() + + cachetest.Soak(t, client, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 4, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 23a3017..f82fae7 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -168,7 +168,7 @@ func (s *S3) Stat(ctx context.Context, key Key) (http.Header, error) { objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{}) if err != nil { errResponse := minio.ToErrorResponse(err) - if errResponse.Code == "NoSuchKey" { + if errResponse.Code == s3ErrNoSuchKey { return nil, os.ErrNotExist } return nil, 
errors.Errorf("failed to stat object: %w", err) @@ -211,7 +211,7 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{}) if err != nil { errResponse := minio.ToErrorResponse(err) - if errResponse.Code == "NoSuchKey" { + if errResponse.Code == s3ErrNoSuchKey { return nil, nil, os.ErrNotExist } return nil, nil, errors.Errorf("failed to stat object: %w", err) @@ -247,7 +247,31 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err return nil, nil, errors.Errorf("failed to get object: %w", err) } - return obj, headers, nil + return &s3Reader{obj: obj}, headers, nil +} + +const s3ErrNoSuchKey = "NoSuchKey" + +// s3Reader wraps minio.Object to convert S3 errors to standard errors. +type s3Reader struct { + obj *minio.Object +} + +func (r *s3Reader) Read(p []byte) (int, error) { + n, err := r.obj.Read(p) + if err == nil || errors.Is(err, io.EOF) { + return n, err //nolint:wrapcheck + } + // Convert NoSuchKey to os.ErrNotExist for consistency + errResponse := minio.ToErrorResponse(err) + if errResponse.Code == s3ErrNoSuchKey { + return n, os.ErrNotExist + } + return n, errors.WithStack(err) +} + +func (r *s3Reader) Close() error { + return errors.WithStack(r.obj.Close()) } func (s *S3) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration) (io.WriteCloser, error) { @@ -290,6 +314,12 @@ func (s *S3) Delete(ctx context.Context, key Key) error { return nil } +func (s *S3) Stats(_ context.Context) (Stats, error) { + // S3 doesn't provide efficient count/size operations without listing the entire bucket, + // which would be prohibitively slow and expensive. + return Stats{}, ErrStatsUnavailable +} + type s3Writer struct { s3 *S3 key Key @@ -298,10 +328,24 @@ type s3Writer struct { headers http.Header ctx context.Context errCh chan error + uploadErr error } func (w *s3Writer) Write(p []byte) (int, error) { - return errors.WithStack2(w.pipe.Write(p)) + n, err := w.pipe.Write(p) + if err != nil { + // Check if upload failed - if so, return that error instead + select { + case uploadErr := <-w.errCh: + if uploadErr != nil { + w.uploadErr = uploadErr + return n, uploadErr + } + default: + } + return n, errors.WithStack(err) + } + return n, nil } func (w *s3Writer) Close() error { @@ -310,6 +354,11 @@ func (w *s3Writer) Close() error { return errors.Wrap(err, "failed to close pipe") } + // If we already captured the upload error during Write, return it + if w.uploadErr != nil { + return w.uploadErr + } + // Wait for upload to complete and get any error err := <-w.errCh if err != nil { @@ -320,7 +369,11 @@ func (w *s3Writer) Close() error { } func (w *s3Writer) upload(pr *io.PipeReader) { - defer pr.Close() + var uploadErr error + defer func() { + // Use CloseWithError to propagate any error to the writer side + _ = pr.CloseWithError(uploadErr) + }() objectName := w.s3.keyToPath(w.key) @@ -330,7 +383,8 @@ func (w *s3Writer) upload(pr *io.PipeReader) { // Store expiration time expiresAtBytes, err := w.expiresAt.MarshalText() if err != nil { - w.errCh <- errors.Errorf("failed to marshal expiration time: %w", err) + uploadErr = errors.Errorf("failed to marshal expiration time: %w", err) + w.errCh <- uploadErr return } userMetadata["Expires-At"] = string(expiresAtBytes) @@ -339,7 +393,8 @@ func (w *s3Writer) upload(pr *io.PipeReader) { if len(w.headers) > 0 { headersJSON, err := json.Marshal(w.headers) if err != nil { - w.errCh <- 
errors.Errorf("failed to marshal headers: %w", err) + uploadErr = errors.Errorf("failed to marshal headers: %w", err) + w.errCh <- uploadErr return } userMetadata["Headers"] = string(headersJSON) @@ -367,7 +422,8 @@ func (w *s3Writer) upload(pr *io.PipeReader) { opts, ) if err != nil { - w.errCh <- errors.Errorf("failed to put object: %w", err) + uploadErr = errors.Errorf("failed to put object: %w", err) + w.errCh <- uploadErr return } diff --git a/internal/cache/s3_test.go b/internal/cache/s3_test.go index cc4df78..1486fa1 100644 --- a/internal/cache/s3_test.go +++ b/internal/cache/s3_test.go @@ -2,6 +2,7 @@ package cache_test import ( "log/slog" + "os" "os/exec" "testing" "time" @@ -17,62 +18,61 @@ import ( ) const ( - rustfsPort = "19000" - rustfsAddr = "localhost:" + rustfsPort - rustfsUsername = "rustfsadmin" - rustfsPassword = "rustfsadmin" - rustfsBucket = "test-bucket" + minioPort = "19000" + minioAddr = "localhost:" + minioPort + minioUsername = "minioadmin" + minioPassword = "minioadmin" + minioBucket = "test-bucket" ) -// startRustfs starts a rustfs server and returns a cleanup function. -func startRustfs(t *testing.T) { +// startMinio starts a MinIO server via Docker. +func startMinio(t *testing.T) { t.Helper() - dir := t.TempDir() + containerName := "minio-test-" + t.Name() - // Start rustfs server - cmd := exec.CommandContext(t.Context(), "rustfs", - "--address", ":"+rustfsPort, - "--access-key", rustfsUsername, - "--secret-key", rustfsPassword, - dir, + // Start MinIO container + cmd := exec.CommandContext(t.Context(), "docker", "run", "-d", + "--name", containerName, + "-p", minioPort+":9000", + "-e", "MINIO_ROOT_USER="+minioUsername, + "-e", "MINIO_ROOT_PASSWORD="+minioPassword, + "minio/minio", "server", "/data", ) - - err := cmd.Start() - assert.NoError(t, err) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("failed to start minio container: %v\n%s", err, output) + } t.Cleanup(func() { - if cmd.Process != nil { - _ = cmd.Process.Kill() - _ = cmd.Wait() - } + _ = exec.Command("docker", "rm", "-f", containerName).Run() }) - // Wait for rustfs to be ready - waitForRustfs(t) + // Wait for MinIO to be ready + waitForMinio(t) // Create test bucket createBucket(t) } -// waitForRustfs waits for the rustfs server to be ready. -func waitForRustfs(t *testing.T) { +// waitForMinio waits for the MinIO server to be ready. +func waitForMinio(t *testing.T) { t.Helper() - client, err := minio.New(rustfsAddr, &minio.Options{ - Creds: credentials.NewStaticV4(rustfsUsername, rustfsPassword, ""), + client, err := minio.New(minioAddr, &minio.Options{ + Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), Secure: false, }) assert.NoError(t, err) - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-timeout: - t.Fatal(errors.New("timed out waiting for rustfs to start")) + t.Fatal(errors.New("timed out waiting for minio to start")) case <-ticker.C: _, err := client.ListBuckets(t.Context()) if err == nil { @@ -82,21 +82,21 @@ func waitForRustfs(t *testing.T) { } } -// createBucket creates the test bucket in rustfs. +// createBucket creates the test bucket in MinIO. 
func createBucket(t *testing.T) { t.Helper() - client, err := minio.New(rustfsAddr, &minio.Options{ - Creds: credentials.NewStaticV4(rustfsUsername, rustfsPassword, ""), + client, err := minio.New(minioAddr, &minio.Options{ + Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), Secure: false, }) assert.NoError(t, err) - exists, err := client.BucketExists(t.Context(), rustfsBucket) + exists, err := client.BucketExists(t.Context(), minioBucket) assert.NoError(t, err) if !exists { - err = client.MakeBucket(t.Context(), rustfsBucket, minio.MakeBucketOptions{}) + err = client.MakeBucket(t.Context(), minioBucket, minio.MakeBucketOptions{}) assert.NoError(t, err) } } @@ -105,34 +105,24 @@ func createBucket(t *testing.T) { func cleanBucket(t *testing.T) { t.Helper() - client, err := minio.New(rustfsAddr, &minio.Options{ - Creds: credentials.NewStaticV4(rustfsUsername, rustfsPassword, ""), + client, err := minio.New(minioAddr, &minio.Options{ + Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), Secure: false, }) assert.NoError(t, err) - objectsCh := client.ListObjects(t.Context(), rustfsBucket, minio.ListObjectsOptions{Recursive: true}) + objectsCh := client.ListObjects(t.Context(), minioBucket, minio.ListObjectsOptions{Recursive: true}) for obj := range objectsCh { if obj.Err != nil { continue } - _ = client.RemoveObject(t.Context(), rustfsBucket, obj.Key, minio.RemoveObjectOptions{}) + _ = client.RemoveObject(t.Context(), minioBucket, obj.Key, minio.RemoveObjectOptions{}) } } -// TestS3Cache tests the S3 cache implementation using rustfs. -// -// This test starts a rustfs server per test run. -// The rustfs binary must be available in PATH (managed by Hermit). -// -// The rustfs server: -// - Starts once per test run -// - Uses credentials: rustfsadmin/rustfsadmin -// - Listens on port 19000 -// - Stores data in a temporary directory -// - Cleans up automatically after the test completes +// TestS3Cache tests the S3 cache implementation using MinIO in Docker. 
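+// The docker binary must be available in PATH; the container is removed automatically
+// when the test completes.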
func TestS3Cache(t *testing.T) { - startRustfs(t) + startMinio(t) cachetest.Suite(t, func(t *testing.T) cache.Cache { _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) @@ -141,12 +131,12 @@ func TestS3Cache(t *testing.T) { cleanBucket(t) // Set credentials via environment variables for the AWS credential chain - t.Setenv("AWS_ACCESS_KEY_ID", rustfsUsername) - t.Setenv("AWS_SECRET_ACCESS_KEY", rustfsPassword) + t.Setenv("AWS_ACCESS_KEY_ID", minioUsername) + t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword) c, err := cache.NewS3(ctx, cache.S3Config{ - Endpoint: rustfsAddr, - Bucket: rustfsBucket, + Endpoint: minioAddr, + Bucket: minioBucket, Region: "", UseSSL: false, MaxTTL: 100 * time.Millisecond, @@ -156,3 +146,41 @@ func TestS3Cache(t *testing.T) { return c }) } + +func TestS3CacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + startMinio(t) + + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + + // Clean bucket to ensure test isolation + cleanBucket(t) + + // Set credentials via environment variables for the AWS credential chain + t.Setenv("AWS_ACCESS_KEY_ID", minioUsername) + t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword) + + c, err := cache.NewS3(ctx, cache.S3Config{ + Endpoint: minioAddr, + Bucket: minioBucket, + Region: "", + UseSSL: false, + MaxTTL: 10 * time.Minute, + UploadPartSizeMB: 16, + }) + assert.NoError(t, err) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: 30 * time.Second, + NumObjects: 100, + MaxObjectSize: 64 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 4, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 5a5b5be..6466a6a 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -132,6 +132,23 @@ func (t Tiered) String() string { return "tiered:" + strings.Join(names, ",") } +func (t Tiered) Stats(ctx context.Context) (Stats, error) { + var combined Stats + for _, c := range t.caches { + s, err := c.Stats(ctx) + if errors.Is(err, ErrStatsUnavailable) { + continue + } + if err != nil { + return Stats{}, errors.Wrap(err, c.String()) + } + combined.Objects += s.Objects + combined.Size += s.Size + combined.Capacity += s.Capacity + } + return combined, nil +} + type tieredWriter struct { writers []io.WriteCloser cancel context.CancelCauseFunc diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go index 8a707b2..d6ae52f 100644 --- a/internal/cache/tiered_test.go +++ b/internal/cache/tiered_test.go @@ -1,6 +1,8 @@ package cache_test import ( + "log/slog" + "os" "testing" "time" @@ -11,7 +13,7 @@ import ( "github.com/block/cachew/internal/logging" ) -func TestTiered(t *testing.T) { +func TestTieredCache(t *testing.T) { cachetest.Suite(t, func(t *testing.T) cache.Cache { _, ctx := logging.Configure(t.Context(), logging.Config{}) memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) @@ -21,3 +23,35 @@ func TestTiered(t *testing.T) { return cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk}) }) } + +func TestTieredCacheSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + memory, err := cache.NewMemory(ctx, cache.MemoryConfig{ + LimitMB: 25, + MaxTTL: 10 * time.Minute, + }) + assert.NoError(t, err) + disk, err := 
cache.NewDisk(ctx, cache.DiskConfig{ + Root: t.TempDir(), + LimitMB: 50, + MaxTTL: 10 * time.Minute, + EvictInterval: time.Second, + }) + assert.NoError(t, err) + c := cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk}) + defer c.Close() + + cachetest.Soak(t, c, cachetest.SoakConfig{ + Duration: time.Minute, + NumObjects: 500, + MaxObjectSize: 512 * 1024, + MinObjectSize: 1024, + OverwritePercent: 30, + Concurrency: 8, + TTL: 5 * time.Minute, + }) +} diff --git a/internal/strategy/apiv1.go b/internal/strategy/apiv1.go index 6dd14f0..2251976 100644 --- a/internal/strategy/apiv1.go +++ b/internal/strategy/apiv1.go @@ -2,6 +2,7 @@ package strategy import ( "context" + "encoding/json" "errors" "io" "log/slog" @@ -36,6 +37,7 @@ func NewAPIV1(ctx context.Context, _ struct{}, _ jobscheduler.Scheduler, cache c mux.Handle("HEAD /api/v1/object/{key}", http.HandlerFunc(s.statObject)) mux.Handle("POST /api/v1/object/{key}", http.HandlerFunc(s.putObject)) mux.Handle("DELETE /api/v1/object/{key}", http.HandlerFunc(s.deleteObject)) + mux.Handle("GET /api/v1/stats", http.HandlerFunc(s.getStats)) return s, nil } @@ -51,7 +53,7 @@ func (d *APIV1) statObject(w http.ResponseWriter, r *http.Request) { headers, err := d.cache.Stat(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { - d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String())) + http.Error(w, "Cache object not found", http.StatusNotFound) return } d.httpError(w, http.StatusInternalServerError, err, "Failed to open cache object", slog.String("key", key.String())) @@ -72,7 +74,7 @@ func (d *APIV1) getObject(w http.ResponseWriter, r *http.Request) { cr, headers, err := d.cache.Open(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { - d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String())) + http.Error(w, "Cache object not found", http.StatusNotFound) return } d.httpError(w, http.StatusInternalServerError, err, "Failed to open cache object", slog.String("key", key.String())) @@ -137,7 +139,7 @@ func (d *APIV1) deleteObject(w http.ResponseWriter, r *http.Request) { err = d.cache.Delete(r.Context(), key) if err != nil { if errors.Is(err, os.ErrNotExist) { - d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String())) + http.Error(w, "Cache object not found", http.StatusNotFound) return } d.httpError(w, http.StatusInternalServerError, err, "Failed to delete cache object", slog.String("key", key.String())) @@ -145,6 +147,23 @@ func (d *APIV1) deleteObject(w http.ResponseWriter, r *http.Request) { } } +func (d *APIV1) getStats(w http.ResponseWriter, r *http.Request) { + stats, err := d.cache.Stats(r.Context()) + if err != nil { + if errors.Is(err, cache.ErrStatsUnavailable) { + d.httpError(w, http.StatusNotImplemented, err, "Stats not available for this cache backend") + return + } + d.httpError(w, http.StatusInternalServerError, err, "Failed to get cache stats") + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(stats); err != nil { + d.logger.Error("Failed to encode stats response", slog.String("error", err.Error())) + } +} + func (d *APIV1) httpError(w http.ResponseWriter, code int, err error, message string, args ...any) { args = append(args, slog.String("error", err.Error())) d.logger.Error(message, args...)
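
For anyone exercising the new stats endpoint by hand, here is a minimal sketch of a standalone client that queries GET /api/v1/stats and decodes the response. The endpoint path, the 501 mapping for ErrStatsUnavailable, and the JSON field names mirror the changes above in internal/strategy/apiv1.go and internal/cache/api.go; the listen address is a placeholder for wherever your cachew instance runs.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// stats mirrors the JSON shape of cache.Stats in internal/cache/api.go.
type stats struct {
	Objects  int64 `json:"objects"`
	Size     int64 `json:"size"`
	Capacity int64 `json:"capacity"`
}

func main() {
	// Placeholder address: point this at a running cachew server.
	resp, err := http.Get("http://localhost:8080/api/v1/stats")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// Fall through and decode the body.
	case http.StatusNotImplemented:
		// Backends without stats (e.g. S3) return ErrStatsUnavailable, which the API maps to 501.
		log.Fatal("stats unavailable for this backend")
	default:
		log.Fatalf("unexpected status: %s", resp.Status)
	}

	var s stats
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("objects=%d size=%d bytes capacity=%d bytes\n", s.Objects, s.Size, s.Capacity)
}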