Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bin/.rustfs-1.0.0-alpha.79.pkg

This file was deleted.

1 change: 0 additions & 1 deletion bin/rustfs

This file was deleted.

5 changes: 0 additions & 5 deletions cmd/cachew/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,5 @@ func (pk *PlatformKey) AfterApply(cli *CLI) error {
prefixed = now.Format("2006-01-02-") + prefixed
}

// Only print debug if we actually modified the key
if prefixed != pk.raw {
fmt.Fprintf(os.Stderr, "[DEBUG] Key transform: %s -> %s\n", pk.raw, prefixed) //nolint:forbidigo
}

return errors.WithStack(pk.key.UnmarshalText([]byte(prefixed)))
}
15 changes: 15 additions & 0 deletions internal/cache/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
// ErrNotFound is returned when a cache backend is not found.
var ErrNotFound = errors.New("cache backend not found")

// ErrStatsUnavailable is returned when a cache backend cannot provide statistics.
var ErrStatsUnavailable = errors.New("stats unavailable")

type registryEntry struct {
schema *hcl.Block
factory func(ctx context.Context, config *hcl.Block) (Cache, error)
Expand Down Expand Up @@ -112,6 +115,16 @@ func FilterTransportHeaders(headers http.Header) http.Header {
return filtered
}

// Stats contains health and usage statistics for a cache.
type Stats struct {
// Objects is the number of objects currently in the cache.
Objects int64 `json:"objects"`
// Size is the total size of all objects in the cache in bytes.
Size int64 `json:"size"`
// Capacity is the maximum size of the cache in bytes (0 if unlimited).
Capacity int64 `json:"capacity"`
}

// A Cache knows how to retrieve, create and delete objects from a cache.
//
// Objects in the cache are not guaranteed to persist and implementations may delete them at any time.
Expand Down Expand Up @@ -141,6 +154,8 @@ type Cache interface {
//
// MUST be atomic.
Delete(ctx context.Context, key Key) error
// Stats returns health and usage statistics for the cache.
Stats(ctx context.Context) (Stats, error)
// Close the Cache.
Close() error
}
346 changes: 346 additions & 0 deletions internal/cache/cachetest/soak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
package cachetest

import (
"context"
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"io"
mrand "math/rand/v2"
"os"
"runtime"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
)

// SoakConfig configures the soak test parameters.
type SoakConfig struct {
// Duration is how long to run the soak test (default: 1 minute).
Duration time.Duration
// NumObjects is the total number of unique object keys to use (default: 1000).
NumObjects int
// MaxObjectSize is the maximum size of each object in bytes (default: 1MB).
MaxObjectSize int
// MinObjectSize is the minimum size of each object in bytes (default: 1KB).
MinObjectSize int
// OverwritePercent is the percentage of writes that should overwrite existing keys (default: 20).
OverwritePercent int
// Concurrency is the number of concurrent goroutines writing to the cache (default: 4).
Concurrency int
// TTL is the time-to-live for cache entries (default: 1 hour).
TTL time.Duration
}

func (c *SoakConfig) setDefaults() {
if c.Duration == 0 {
c.Duration = time.Minute
}
if c.NumObjects == 0 {
c.NumObjects = 1000
}
if c.MaxObjectSize == 0 {
c.MaxObjectSize = 1024 * 1024 // 1MB
}
if c.MinObjectSize == 0 {
c.MinObjectSize = 1024 // 1KB
}
if c.OverwritePercent == 0 {
c.OverwritePercent = 20
}
if c.Concurrency == 0 {
c.Concurrency = 4
}
if c.TTL == 0 {
c.TTL = time.Hour
}
}

// SoakResult contains the results of a soak test run.
type SoakResult struct {
Writes int64
Reads int64
ReadHits int64
ReadMisses int64
Deletes int64
BytesWritten int64
Duration time.Duration

// Memory stats
HeapAllocStart uint64
HeapAllocEnd uint64
TotalAlloc uint64
NumGC uint32
}

// Soak runs an extended soak test against a cache implementation.
//
// The test writes random objects of varying sizes, with some overwrites,
// and verifies that the cache behaves correctly under sustained load.
// It also performs periodic reads and deletes.
func Soak(t *testing.T, c cache.Cache, config SoakConfig) SoakResult {
config.setDefaults()

ctx, cancel := context.WithTimeout(t.Context(), config.Duration+time.Minute)
defer cancel()

var result SoakResult
var mu sync.Mutex
// key index -> list of SHA256 hashes of all values ever written to this key
writtenHashes := make(map[int][][32]byte)

// Capture initial memory stats
runtime.GC()
var memStart runtime.MemStats
runtime.ReadMemStats(&memStart)
result.HeapAllocStart = memStart.HeapAlloc

startTime := time.Now()
deadline := startTime.Add(config.Duration)

var wg sync.WaitGroup
for i := range config.Concurrency {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
soakWorker(ctx, t, c, &config, deadline, workerID, &result, &mu, writtenHashes)
}(i)
}

wg.Wait()
result.Duration = time.Since(startTime)

// Capture final memory stats
runtime.GC()
var memEnd runtime.MemStats
runtime.ReadMemStats(&memEnd)
result.HeapAllocEnd = memEnd.HeapAlloc
result.TotalAlloc = memEnd.TotalAlloc - memStart.TotalAlloc
result.NumGC = memEnd.NumGC - memStart.NumGC

verifyHealth(t, c, &result)

return result
}

func soakWorker(
ctx context.Context,
t *testing.T,
c cache.Cache,
config *SoakConfig,
deadline time.Time,
workerID int,
result *SoakResult,
mu *sync.Mutex,
writtenHashes map[int][][32]byte,
) {
//nolint:gosec // math/rand is fine for soak testing, we don't need cryptographic randomness for test operations
rng := mrand.New(mrand.NewPCG(uint64(workerID), uint64(time.Now().UnixNano())))

for time.Now().Before(deadline) {
select {
case <-ctx.Done():
return
default:
}

op := rng.IntN(100)
switch {
case op < 60: // 60% writes
doWrite(ctx, t, c, config, rng, result, mu, writtenHashes)
case op < 90: // 30% reads
doRead(ctx, t, c, config, rng, result, mu, writtenHashes)
default: // 10% deletes
doDelete(ctx, t, c, config, rng, result)
}
}
}

func doWrite(
ctx context.Context,
t *testing.T,
c cache.Cache,
config *SoakConfig,
rng *mrand.Rand,
result *SoakResult,
mu *sync.Mutex,
writtenHashes map[int][][32]byte,
) {
var keyIdx int
mu.Lock()
numWritten := len(writtenHashes)
mu.Unlock()

if numWritten > 0 && rng.IntN(100) < config.OverwritePercent {
// Overwrite an existing key
keyIdx = rng.IntN(min(numWritten, config.NumObjects))
} else {
// Write a new key
keyIdx = rng.IntN(config.NumObjects)
}

size := config.MinObjectSize + rng.IntN(config.MaxObjectSize-config.MinObjectSize+1)
data := make([]byte, size)
if _, err := rand.Read(data); err != nil {
t.Errorf("failed to generate random data: %+v", err)
return
}

key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx))
writer, err := c.Create(ctx, key, nil, config.TTL)
if err != nil {
t.Errorf("failed to create cache entry: %+v", err)
return
}

n, err := writer.Write(data)
if err != nil {
t.Errorf("failed to write cache entry: %+v", err)
_ = writer.Close()
return
}
if n != len(data) {
t.Errorf("short write: wrote %d of %d bytes", n, len(data))
_ = writer.Close()
return
}

// Record hash BEFORE Close() to avoid race with concurrent reads
hash := sha256.Sum256(data)
mu.Lock()
writtenHashes[keyIdx] = append(writtenHashes[keyIdx], hash)
mu.Unlock()

if err := writer.Close(); err != nil {
t.Errorf("failed to close cache entry: %+v", err)
return
}

atomic.AddInt64(&result.Writes, 1)
atomic.AddInt64(&result.BytesWritten, int64(n))
}

func doRead(
ctx context.Context,
t *testing.T,
c cache.Cache,
config *SoakConfig,
rng *mrand.Rand,
result *SoakResult,
mu *sync.Mutex,
writtenHashes map[int][][32]byte,
) {
mu.Lock()
numWritten := len(writtenHashes)
mu.Unlock()

if numWritten == 0 {
return
}

keyIdx := rng.IntN(config.NumObjects)
key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx))

reader, _, err := c.Open(ctx, key)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
atomic.AddInt64(&result.ReadMisses, 1)
atomic.AddInt64(&result.Reads, 1)
return
}
t.Errorf("failed to open cache entry: %+v", err)
return
}
defer reader.Close()

data, err := io.ReadAll(reader)
if err != nil {
// Object may have been deleted between Open and Read - treat as miss
if errors.Is(err, os.ErrNotExist) {
atomic.AddInt64(&result.ReadMisses, 1)
atomic.AddInt64(&result.Reads, 1)
return
}
t.Errorf("failed to read cache entry: %+v", err)
return
}

// Hash the data we read, then check against all historical writes
readHash := sha256.Sum256(data)
mu.Lock()
hashes := writtenHashes[keyIdx]
mu.Unlock()

if !slices.Contains(hashes, readHash) {
t.Errorf("data mismatch for key %d: read %d bytes with hash not in %d historical writes",
keyIdx, len(data), len(hashes))
return
}

atomic.AddInt64(&result.ReadHits, 1)
atomic.AddInt64(&result.Reads, 1)
}

func doDelete(
ctx context.Context,
t *testing.T,
c cache.Cache,
config *SoakConfig,
rng *mrand.Rand,
result *SoakResult,
) {
keyIdx := rng.IntN(config.NumObjects)
key := cache.NewKey(fmt.Sprintf("soak-key-%d", keyIdx))

if err := c.Delete(ctx, key); err != nil {
if errors.Is(err, os.ErrNotExist) {
return
}
t.Errorf("failed to delete cache entry: %+v", err)
return
}

atomic.AddInt64(&result.Deletes, 1)
}

func verifyHealth(t *testing.T, c cache.Cache, result *SoakResult) {
t.Logf("Soak test completed:")
t.Logf(" Duration: %v", result.Duration)
t.Logf(" Writes: %d (%.1f/sec)", result.Writes, float64(result.Writes)/result.Duration.Seconds())
t.Logf(" Reads: %d (hits: %d, misses: %d)", result.Reads, result.ReadHits, result.ReadMisses)
t.Logf(" Deletes: %d", result.Deletes)
t.Logf(" Bytes written: %d MB", result.BytesWritten/(1024*1024))
t.Logf("Memory stats:")
t.Logf(" Heap start: %.1f MB", float64(result.HeapAllocStart)/(1024*1024))
t.Logf(" Heap end: %.1f MB", float64(result.HeapAllocEnd)/(1024*1024))
t.Logf(" Total allocated: %.1f MB", float64(result.TotalAlloc)/(1024*1024))
t.Logf(" GC cycles: %d", result.NumGC)

stats, err := c.Stats(context.Background())
if errors.Is(err, cache.ErrStatsUnavailable) {
t.Logf("Cache stats: unavailable")
return
}
assert.NoError(t, err, "failed to get cache stats")

t.Logf("Cache stats:")
t.Logf(" Objects: %d", stats.Objects)
t.Logf(" Size: %d MB", stats.Size/(1024*1024))
t.Logf(" Capacity: %d MB", stats.Capacity/(1024*1024))

// Verify size is within capacity (allow some slack for in-flight writes)
if stats.Capacity > 0 {
assert.True(t, stats.Size <= stats.Capacity*2,
"cache size (%d) exceeds capacity x 2 (%d)", stats.Size, stats.Capacity*2)
}

// Verify object count is non-negative
assert.True(t, stats.Objects >= 0, "object count should be non-negative")
}
Loading