From 7eb7de04c75e72558b457c4beef72ece48e57a9a Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Fri, 27 Mar 2026 13:10:53 +1100 Subject: [PATCH] feat: add eventually consistent metadata store for cross-replica coordination Introduces the metadatadb package, providing generic redis-like data structures (Scalar, Int, Set, Map, List) backed by a pluggable storage layer. Mutations apply locally immediately and sync periodically via a lock-load-replay-store cycle. Includes memory and S3 backends, a reusable test suite, and a soak test with monotonic invariant verification. Also extracts MinIO Docker test helpers into internal/minitest for reuse across cache and metadatadb S3 tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/cache/s3_test.go | 134 +----- internal/metadatadb/api.go | 455 ++++++++++++++++++++ internal/metadatadb/memory.go | 75 ++++ internal/metadatadb/memory_test.go | 24 ++ internal/metadatadb/metadatadbtest/soak.go | 238 ++++++++++ internal/metadatadb/metadatadbtest/suite.go | 435 +++++++++++++++++++ internal/metadatadb/oplog.go | 314 ++++++++++++++ internal/metadatadb/s3.go | 173 ++++++++ internal/metadatadb/s3_test.go | 39 ++ internal/minitest/minitest.go | 116 +++++ 10 files changed, 1878 insertions(+), 125 deletions(-) create mode 100644 internal/metadatadb/api.go create mode 100644 internal/metadatadb/memory.go create mode 100644 internal/metadatadb/memory_test.go create mode 100644 internal/metadatadb/metadatadbtest/soak.go create mode 100644 internal/metadatadb/metadatadbtest/suite.go create mode 100644 internal/metadatadb/oplog.go create mode 100644 internal/metadatadb/s3.go create mode 100644 internal/metadatadb/s3_test.go create mode 100644 internal/minitest/minitest.go diff --git a/internal/cache/s3_test.go b/internal/cache/s3_test.go index 1486fa10..16a39fcb 100644 --- a/internal/cache/s3_test.go +++ b/internal/cache/s3_test.go @@ -3,140 +3,29 @@ package cache_test import ( "log/slog" "os" - "os/exec" "testing" "time" "github.com/alecthomas/assert/v2" - "github.com/alecthomas/errors" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/cache/cachetest" "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/minitest" ) -const ( - minioPort = "19000" - minioAddr = "localhost:" + minioPort - minioUsername = "minioadmin" - minioPassword = "minioadmin" - minioBucket = "test-bucket" -) - -// startMinio starts a MinIO server via Docker. -func startMinio(t *testing.T) { - t.Helper() - - containerName := "minio-test-" + t.Name() - - // Start MinIO container - cmd := exec.CommandContext(t.Context(), "docker", "run", "-d", - "--name", containerName, - "-p", minioPort+":9000", - "-e", "MINIO_ROOT_USER="+minioUsername, - "-e", "MINIO_ROOT_PASSWORD="+minioPassword, - "minio/minio", "server", "/data", - ) - output, err := cmd.CombinedOutput() - if err != nil { - t.Fatalf("failed to start minio container: %v\n%s", err, output) - } - - t.Cleanup(func() { - _ = exec.Command("docker", "rm", "-f", containerName).Run() - }) - - // Wait for MinIO to be ready - waitForMinio(t) - - // Create test bucket - createBucket(t) -} - -// waitForMinio waits for the MinIO server to be ready. 
-func waitForMinio(t *testing.T) { - t.Helper() - - client, err := minio.New(minioAddr, &minio.Options{ - Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), - Secure: false, - }) - assert.NoError(t, err) - - timeout := time.After(30 * time.Second) - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-timeout: - t.Fatal(errors.New("timed out waiting for minio to start")) - case <-ticker.C: - _, err := client.ListBuckets(t.Context()) - if err == nil { - return - } - } - } -} - -// createBucket creates the test bucket in MinIO. -func createBucket(t *testing.T) { - t.Helper() - - client, err := minio.New(minioAddr, &minio.Options{ - Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), - Secure: false, - }) - assert.NoError(t, err) - - exists, err := client.BucketExists(t.Context(), minioBucket) - assert.NoError(t, err) - - if !exists { - err = client.MakeBucket(t.Context(), minioBucket, minio.MakeBucketOptions{}) - assert.NoError(t, err) - } -} - -// cleanBucket removes all objects from the bucket. -func cleanBucket(t *testing.T) { - t.Helper() - - client, err := minio.New(minioAddr, &minio.Options{ - Creds: credentials.NewStaticV4(minioUsername, minioPassword, ""), - Secure: false, - }) - assert.NoError(t, err) - - objectsCh := client.ListObjects(t.Context(), minioBucket, minio.ListObjectsOptions{Recursive: true}) - for obj := range objectsCh { - if obj.Err != nil { - continue - } - _ = client.RemoveObject(t.Context(), minioBucket, obj.Key, minio.RemoveObjectOptions{}) - } -} - // TestS3Cache tests the S3 cache implementation using MinIO in Docker. func TestS3Cache(t *testing.T) { - startMinio(t) + minitest.Start(t) cachetest.Suite(t, func(t *testing.T) cache.Cache { _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) - // Clean bucket to ensure test isolation - cleanBucket(t) - - // Set credentials via environment variables for the AWS credential chain - t.Setenv("AWS_ACCESS_KEY_ID", minioUsername) - t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword) + minitest.CleanBucket(t) c, err := cache.NewS3(ctx, cache.S3Config{ - Endpoint: minioAddr, - Bucket: minioBucket, + Endpoint: minitest.Addr, + Bucket: minitest.Bucket, Region: "", UseSSL: false, MaxTTL: 100 * time.Millisecond, @@ -152,20 +41,15 @@ func TestS3CacheSoak(t *testing.T) { t.Skip("Skipping soak test; set SOAK_TEST=1 to run") } - startMinio(t) + minitest.Start(t) _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) - // Clean bucket to ensure test isolation - cleanBucket(t) - - // Set credentials via environment variables for the AWS credential chain - t.Setenv("AWS_ACCESS_KEY_ID", minioUsername) - t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword) + minitest.CleanBucket(t) c, err := cache.NewS3(ctx, cache.S3Config{ - Endpoint: minioAddr, - Bucket: minioBucket, + Endpoint: minitest.Addr, + Bucket: minitest.Bucket, Region: "", UseSSL: false, MaxTTL: 10 * time.Minute, diff --git a/internal/metadatadb/api.go b/internal/metadatadb/api.go new file mode 100644 index 00000000..ca68723a --- /dev/null +++ b/internal/metadatadb/api.go @@ -0,0 +1,455 @@ +// Package metadatadb provides an eventually consistent metadata store for +// coordinating state across cachew replicas. Mutations are applied to local +// state immediately and synced periodically to a shared backend. Last flush +// wins — the lock serialises all writes. 
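+//
+// A minimal usage sketch (ctx and backend are assumed to exist; names mirror
+// the API below):
+//
+//	store := metadatadb.New(ctx, metadatadb.Config{SyncInterval: 30 * time.Second}, backend)
+//	defer store.Close()
+//	ns := store.Namespace("stats")
+//	clones := metadatadb.NewInt(ns, "clones")
+//	clones.Add(1)     // applied locally, synced on the next cycle
+//	_ = ns.Flush(ctx) // or force an immediate sync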
+package metadatadb + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "slices" + "sync" + "time" + + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/logging" +) + +// Config controls the metadata store's sync behaviour. +type Config struct { + SyncInterval time.Duration `hcl:"sync-interval,optional" help:"How often to sync with the backend." default:"30s"` + LockTTL time.Duration `hcl:"lock-ttl,optional" help:"TTL for namespace locks." default:"30s"` +} + +// op is an idempotent operation that applies itself to the state. +type op interface { + apply(state map[string]any) +} + +// ErrInvalidToken is returned by Backend.Store when the token does not match +// the current version, indicating a concurrent write. +var ErrInvalidToken = errors.New("invalid token") + +// Backend is the pluggable storage and locking layer. +type Backend interface { + // Load returns the current state and an opaque token identifying the + // version. The token must be passed to Store to prevent stale writes. + Load(ctx context.Context, namespace string) (data json.RawMessage, token string, err error) + // Store persists the state. It must return ErrInvalidToken if the token + // does not match the current version (i.e. another writer stored since + // our Load). For S3, the token maps to an ETag with If-Match. + Store(ctx context.Context, namespace string, data json.RawMessage, token string) error + Lock(ctx context.Context, namespace string) error + Unlock(ctx context.Context, namespace string) error +} + +// Store is the top-level metadata store. +type Store struct { + backend Backend + config Config + logger *slog.Logger + mu sync.Mutex + ns map[string]*Namespace + ctx context.Context + cancel context.CancelFunc +} + +func New(ctx context.Context, config Config, backend Backend) *Store { + logger := logging.FromContext(ctx) + ctx, cancel := context.WithCancel(ctx) + return &Store{ + backend: backend, + config: config, + logger: logger.With("component", "metadata"), + ns: make(map[string]*Namespace), + ctx: ctx, + cancel: cancel, + } +} + +func (s *Store) Namespace(name string) *Namespace { + s.mu.Lock() + defer s.mu.Unlock() + if ns, ok := s.ns[name]; ok { + return ns + } + ns := &Namespace{ + store: s, + name: name, + state: make(map[string]any), + done: make(chan struct{}), + } + go ns.syncLoop() + s.ns[name] = ns + return ns +} + +func (s *Store) Close() error { + s.cancel() + s.mu.Lock() + defer s.mu.Unlock() + for _, ns := range s.ns { + <-ns.done + } + return nil +} + +// Namespace is a scoped collection of named data structures. Mutations are +// applied locally immediately and synced periodically to the backend. +type Namespace struct { + store *Store + name string + mu sync.RWMutex + state map[string]any + pending []op + syncMu sync.Mutex + done chan struct{} +} + +// Flush forces an immediate sync with the backend. +func (n *Namespace) Flush(ctx context.Context) error { return n.doSync(ctx) } + +func (n *Namespace) apply(o op) { + n.mu.Lock() + defer n.mu.Unlock() + o.apply(n.state) + n.pending = append(n.pending, o) +} + +// Scalar is a single value. Last write wins. +type Scalar[V any] struct { + ns *Namespace + name string +} + +// NewScalar creates or retrieves a named scalar within a namespace. 
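+//
+// For example (a sketch; "owner" is an illustrative name):
+//
+//	owner := metadatadb.NewScalar[string](ns, "owner")
+//	owner.Set("replica-1")
+//	v, ok := owner.Get() // "replica-1", true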
+func NewScalar[V any](ns *Namespace, name string) *Scalar[V] { + return &Scalar[V]{ns: ns, name: name} +} + +func (s *Scalar[V]) Set(value V) { s.ns.apply(&scalarSetOp[V]{name: s.name, value: value}) } +func (s *Scalar[V]) Delete() { s.ns.apply(&scalarDeleteOp{name: s.name}) } + +func (s *Scalar[V]) Get() (V, bool) { + s.ns.mu.RLock() + defer s.ns.mu.RUnlock() + raw, ok := s.ns.state[s.name] + if !ok { + var zero V + return zero, false + } + return jsonRoundTrip[V](raw), true +} + +// Int is an integer with arithmetic operations. All ops are applied +// sequentially — the lock serialises flushes. +type Int struct { + ns *Namespace + name string +} + +// NewInt creates or retrieves a named integer within a namespace. +func NewInt(ns *Namespace, name string) *Int { + return &Int{ns: ns, name: name} +} + +func (i *Int) Set(value int64) { i.ns.apply(&intSetOp{name: i.name, value: value}) } +func (i *Int) Add(delta int64) { i.ns.apply(&intAddOp{name: i.name, delta: delta}) } +func (i *Int) Mul(factor int64) { i.ns.apply(&intMulOp{name: i.name, factor: factor}) } +func (i *Int) Div(divisor int64) { i.ns.apply(&intDivOp{name: i.name, divisor: divisor}) } + +func (i *Int) Get() int64 { + i.ns.mu.RLock() + defer i.ns.mu.RUnlock() + return toInt64(i.ns.state[i.name]) +} + +// IntMap is a keyed collection of integer values supporting atomic increment. +// Keys are JSON-marshaled to strings internally. +type IntMap[K comparable] struct { + ns *Namespace + name string +} + +// NewIntMap creates or retrieves a named integer map within a namespace. +func NewIntMap[K comparable](ns *Namespace, name string) *IntMap[K] { + return &IntMap[K]{ns: ns, name: name} +} + +func (m *IntMap[K]) Set(key K, value int64) { + m.ns.apply(&intMapSetOp[K]{name: m.name, key: key, value: value}) +} +func (m *IntMap[K]) Add(key K, delta int64) { + m.ns.apply(&intMapAddOp[K]{name: m.name, key: key, delta: delta}) +} +func (m *IntMap[K]) Mul(key K, factor int64) { + m.ns.apply(&intMapMulOp[K]{name: m.name, key: key, factor: factor}) +} +func (m *IntMap[K]) Div(key K, divisor int64) { + m.ns.apply(&intMapDivOp[K]{name: m.name, key: key, divisor: divisor}) +} +func (m *IntMap[K]) Delete(key K) { m.ns.apply(&intMapDeleteOp[K]{name: m.name, key: key}) } + +func (m *IntMap[K]) Get(key K) int64 { + m.ns.mu.RLock() + defer m.ns.mu.RUnlock() + raw, ok := m.ns.state[m.name] + if !ok { + return 0 + } + return toInt64(raw.(map[string]any)[marshalKey(key)]) +} + +func (m *IntMap[K]) Keys() []K { + m.ns.mu.RLock() + defer m.ns.mu.RUnlock() + raw, ok := m.ns.state[m.name] + if !ok { + return nil + } + entries := raw.(map[string]any) + result := make([]K, 0, len(entries)) + for k := range entries { + result = append(result, unmarshalKey[K](k)) + } + slices.SortFunc(result, func(a, b K) int { + ka, kb := marshalKey(a), marshalKey(b) + if ka < kb { + return -1 + } + if ka > kb { + return 1 + } + return 0 + }) + return result +} + +func (m *IntMap[K]) Entries() map[K]int64 { + m.ns.mu.RLock() + defer m.ns.mu.RUnlock() + raw, ok := m.ns.state[m.name] + if !ok { + return nil + } + entries := raw.(map[string]any) + result := make(map[K]int64, len(entries)) + for k, v := range entries { + result[unmarshalKey[K](k)] = toInt64(v) + } + return result +} + +// Set is an unordered collection of unique members. Members are stored as +// JSON-marshaled string keys internally, so any comparable JSON-serializable +// type can be used. 
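+//
+// For example (a sketch; "peers" is an illustrative name):
+//
+//	peers := metadatadb.NewSet[string](ns, "peers")
+//	peers.Add("replica-1")
+//	members := peers.Members() // sorted by marshaled key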
+type Set[V comparable] struct { + ns *Namespace + name string +} + +// NewSet creates or retrieves a named set within a namespace. +func NewSet[V comparable](ns *Namespace, name string) *Set[V] { + return &Set[V]{ns: ns, name: name} +} + +func (s *Set[V]) Add(member V) { s.ns.apply(&setAddOp[V]{name: s.name, member: member}) } +func (s *Set[V]) Remove(member V) { s.ns.apply(&setRemoveOp[V]{name: s.name, member: member}) } + +func (s *Set[V]) Contains(member V) bool { + s.ns.mu.RLock() + defer s.ns.mu.RUnlock() + raw, ok := s.ns.state[s.name] + if !ok { + return false + } + _, ok = raw.(map[string]any)[marshalKey(member)] + return ok +} + +func (s *Set[V]) Members() []V { + s.ns.mu.RLock() + defer s.ns.mu.RUnlock() + raw, ok := s.ns.state[s.name] + if !ok { + return nil + } + m := raw.(map[string]any) + result := make([]V, 0, len(m)) + for k := range m { + result = append(result, unmarshalKey[V](k)) + } + slices.SortFunc(result, func(a, b V) int { + ka, kb := marshalKey(a), marshalKey(b) + if ka < kb { + return -1 + } + if ka > kb { + return 1 + } + return 0 + }) + return result +} + +func marshalKey[V any](v V) string { + data, err := json.Marshal(v) + if err != nil { + panic(fmt.Sprintf("metadatadb: marshal key %T: %v", v, err)) + } + return string(data) +} + +func unmarshalKey[V any](s string) V { + var v V + if err := json.Unmarshal([]byte(s), &v); err != nil { + panic(fmt.Sprintf("metadatadb: unmarshal key into %T: %v", v, err)) + } + return v +} + +// Map is a keyed collection of values. Keys are JSON-marshaled to strings +// internally, so any comparable JSON-serializable type can be used. +// Last write per key wins. +type Map[K comparable, V any] struct { + ns *Namespace + name string +} + +// NewMap creates or retrieves a named map within a namespace. +func NewMap[K comparable, V any](ns *Namespace, name string) *Map[K, V] { + return &Map[K, V]{ns: ns, name: name} +} + +func (m *Map[K, V]) Set(key K, value V) { + m.ns.apply(&mapSetOp[K, V]{name: m.name, key: key, value: value}) +} + +func (m *Map[K, V]) Delete(key K) { + m.ns.apply(&mapDeleteOp[K]{name: m.name, key: key}) +} + +func (m *Map[K, V]) Get(key K) (V, bool) { + m.ns.mu.RLock() + defer m.ns.mu.RUnlock() + raw, ok := m.ns.state[m.name] + if !ok { + var zero V + return zero, false + } + v, ok := raw.(map[string]any)[marshalKey(key)] + if !ok { + var zero V + return zero, false + } + return jsonRoundTrip[V](v), true +} + +func (m *Map[K, V]) Keys() []K { + m.ns.mu.RLock() + defer m.ns.mu.RUnlock() + raw, ok := m.ns.state[m.name] + if !ok { + return nil + } + entries := raw.(map[string]any) + result := make([]K, 0, len(entries)) + for k := range entries { + result = append(result, unmarshalKey[K](k)) + } + slices.SortFunc(result, func(a, b K) int { + ka, kb := marshalKey(a), marshalKey(b) + if ka < kb { + return -1 + } + if ka > kb { + return 1 + } + return 0 + }) + return result +} + +func (m *Map[K, V]) Entries() map[K]V { + m.ns.mu.RLock() + defer m.ns.mu.RUnlock() + raw, ok := m.ns.state[m.name] + if !ok { + return nil + } + entries := raw.(map[string]any) + result := make(map[K]V, len(entries)) + for k, v := range entries { + result[unmarshalKey[K](k)] = jsonRoundTrip[V](v) + } + return result +} + +// List is an append-only ordered collection. +type List[V any] struct { + ns *Namespace + name string +} + +// NewList creates or retrieves a named append-only list within a namespace. 
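+//
+// For example (a sketch; "events" is an illustrative name):
+//
+//	events := metadatadb.NewList[string](ns, "events")
+//	events.Append("started")
+//	entries := events.Entries() // ["started"]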
+func NewList[V any](ns *Namespace, name string) *List[V] { + return &List[V]{ns: ns, name: name} +} + +func (l *List[V]) Append(value V) { + l.ns.apply(&listAppendOp[V]{name: l.name, value: value}) +} + +func (l *List[V]) Entries() []V { + l.ns.mu.RLock() + defer l.ns.mu.RUnlock() + raw, ok := l.ns.state[l.name] + if !ok { + return nil + } + src := raw.([]any) + result := make([]V, len(src)) + for i, v := range src { + result[i] = jsonRoundTrip[V](v) + } + return result +} + +func (l *List[V]) Len() int { + l.ns.mu.RLock() + defer l.ns.mu.RUnlock() + raw, ok := l.ns.state[l.name] + if !ok { + return 0 + } + return len(raw.([]any)) +} + +// jsonRoundTrip marshals v to JSON then unmarshals into V, handling the +// type mismatch between locally-stored Go values and JSON-deserialized values +// after a sync round-trip. +func jsonRoundTrip[V any](v any) V { + data, err := json.Marshal(v) + if err != nil { + panic(fmt.Sprintf("metadata: marshal %T: %v", v, err)) + } + var result V + if err := json.Unmarshal(data, &result); err != nil { + panic(fmt.Sprintf("metadata: unmarshal into %T: %v", result, err)) + } + return result +} + +func toInt64(v any) int64 { + switch n := v.(type) { + case int64: + return n + case float64: + return int64(n) + default: + return 0 + } +} diff --git a/internal/metadatadb/memory.go b/internal/metadatadb/memory.go new file mode 100644 index 00000000..bcd428dc --- /dev/null +++ b/internal/metadatadb/memory.go @@ -0,0 +1,75 @@ +package metadatadb + +import ( + "context" + "encoding/json" + "strconv" + "sync" + "sync/atomic" + + "github.com/alecthomas/errors" +) + +// MemoryBackend is an in-memory Backend for testing and single-instance +// deployments. It is safe for concurrent use across multiple Store instances. +type MemoryBackend struct { + mu sync.Mutex + data map[string]json.RawMessage + tokens map[string]string + version atomic.Int64 + locks map[string]chan struct{} +} + +func NewMemoryBackend() *MemoryBackend { + return &MemoryBackend{ + data: make(map[string]json.RawMessage), + tokens: make(map[string]string), + locks: make(map[string]chan struct{}), + } +} + +func (m *MemoryBackend) Load(_ context.Context, namespace string) (json.RawMessage, string, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.data[namespace], m.tokens[namespace], nil +} + +func (m *MemoryBackend) Store(_ context.Context, namespace string, data json.RawMessage, token string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.tokens[namespace] != token { + return ErrInvalidToken + } + m.data[namespace] = data + m.tokens[namespace] = strconv.FormatInt(m.version.Add(1), 10) + return nil +} + +func (m *MemoryBackend) Lock(ctx context.Context, namespace string) error { + for { + m.mu.Lock() + ch, locked := m.locks[namespace] + if !locked { + m.locks[namespace] = make(chan struct{}) + m.mu.Unlock() + return nil + } + m.mu.Unlock() + + select { + case <-ch: + case <-ctx.Done(): + return errors.WithStack(ctx.Err()) + } + } +} + +func (m *MemoryBackend) Unlock(_ context.Context, namespace string) error { + m.mu.Lock() + defer m.mu.Unlock() + if ch, ok := m.locks[namespace]; ok { + close(ch) + delete(m.locks, namespace) + } + return nil +} diff --git a/internal/metadatadb/memory_test.go b/internal/metadatadb/memory_test.go new file mode 100644 index 00000000..f726fa85 --- /dev/null +++ b/internal/metadatadb/memory_test.go @@ -0,0 +1,24 @@ +package metadatadb_test + +import ( + "testing" + "time" + + "github.com/block/cachew/internal/metadatadb" + 
"github.com/block/cachew/internal/metadatadb/metadatadbtest" +) + +func TestMemoryBackend(t *testing.T) { + metadatadbtest.Suite(t, func(t *testing.T) metadatadb.Backend { + t.Helper() + return metadatadb.NewMemoryBackend() + }) +} + +func TestMemoryBackendSoak(t *testing.T) { + metadatadbtest.Soak(t, metadatadb.NewMemoryBackend(), metadatadbtest.SoakConfig{ + Duration: 5 * time.Second, + Concurrency: 4, + NumKeys: 20, + }) +} diff --git a/internal/metadatadb/metadatadbtest/soak.go b/internal/metadatadb/metadatadbtest/soak.go new file mode 100644 index 00000000..08c5ce22 --- /dev/null +++ b/internal/metadatadb/metadatadbtest/soak.go @@ -0,0 +1,238 @@ +package metadatadbtest + +import ( + "context" + "fmt" + "log/slog" + mrand "math/rand/v2" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/metadatadb" +) + +// SoakConfig configures the soak test parameters. +type SoakConfig struct { + Duration time.Duration + Concurrency int + NumKeys int +} + +func (c *SoakConfig) setDefaults() { + if c.Duration == 0 { + c.Duration = 30 * time.Second + } + if c.Concurrency == 0 { + c.Concurrency = 4 + } + if c.NumKeys == 0 { + c.NumKeys = 50 + } +} + +// SoakResult contains the results of a soak test run. +type SoakResult struct { + Ops int64 + Flushes int64 + Errors int64 + Duration time.Duration +} + +// tracker collects monotonic invariants from workers for verification. +type tracker struct { + mu sync.Mutex + // Add-only counters: each worker adds positive deltas, total must be >= 0. + counterAdds map[string]int64 + // Add-only sets: members are only added, never removed. + setAdds map[string]map[string]bool + // Append-only lists: total appended count per key. + listAppends map[string]int +} + +func newTracker() *tracker { + return &tracker{ + counterAdds: make(map[string]int64), + setAdds: make(map[string]map[string]bool), + listAppends: make(map[string]int), + } +} + +func (tr *tracker) addCounter(key string, delta int64) { + tr.mu.Lock() + defer tr.mu.Unlock() + tr.counterAdds[key] += delta +} + +func (tr *tracker) addSetMember(key, member string) { + tr.mu.Lock() + defer tr.mu.Unlock() + if tr.setAdds[key] == nil { + tr.setAdds[key] = make(map[string]bool) + } + tr.setAdds[key][member] = true +} + +func (tr *tracker) appendList(key string) { + tr.mu.Lock() + defer tr.mu.Unlock() + tr.listAppends[key]++ +} + +// Soak runs a concurrent soak test against a backend, exercising all data +// structure types with random operations and periodic flushes, then verifies +// consistency by checking monotonic invariants and round-trip equality. 
+func Soak(t *testing.T, backend metadatadb.Backend, config SoakConfig) SoakResult { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("Skipping soak test; set SOAK_TEST=1 to run") + } + config.setDefaults() + + ctx := logging.ContextWithLogger(context.Background(), slog.Default()) + ctx, cancel := context.WithTimeout(ctx, config.Duration+time.Minute) + defer cancel() + + cfg := metadatadb.Config{SyncInterval: time.Hour, LockTTL: 5 * time.Second} + store := metadatadb.New(ctx, cfg, backend) + t.Cleanup(func() { assert.NoError(t, store.Close()) }) + + ns := store.Namespace("soak") + tr := newTracker() + + var result SoakResult + startTime := time.Now() + deadline := startTime.Add(config.Duration) + + var wg sync.WaitGroup + for i := range config.Concurrency { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + soakWorker(ctx, ns, &config, deadline, workerID, &result, tr) + }(i) + } + wg.Wait() + result.Duration = time.Since(startTime) + + assert.NoError(t, ns.Flush(ctx)) + + verifyMonotonicInvariants(t, ns, tr) + logSoakResult(t, &result) + + return result +} + +func soakWorker( + ctx context.Context, + ns *metadatadb.Namespace, + config *SoakConfig, + deadline time.Time, + workerID int, + result *SoakResult, + tr *tracker, +) { + rng := mrand.New(mrand.NewPCG(uint64(workerID), uint64(time.Now().UnixNano()))) //nolint:gosec + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return + default: + } + + key := fmt.Sprintf("key-%d", rng.IntN(config.NumKeys)) + op := rng.IntN(100) + + switch { + // Chaotic ops (not tracked for invariants). + case op < 10: + metadatadb.NewScalar[string](ns, "sc-"+key).Set(fmt.Sprintf("val-%d", rng.IntN(1000))) + case op < 15: + metadatadb.NewScalar[string](ns, "sc-"+key).Delete() + case op < 20: + metadatadb.NewInt(ns, "int-"+key).Set(int64(rng.IntN(1000))) + case op < 25: + metadatadb.NewSet[string](ns, "set-"+key).Add(fmt.Sprintf("m-%d", rng.IntN(20))) + case op < 28: + metadatadb.NewSet[string](ns, "set-"+key).Remove(fmt.Sprintf("m-%d", rng.IntN(20))) + case op < 35: + metadatadb.NewMap[string, string](ns, "map-"+key).Set( + fmt.Sprintf("k-%d", rng.IntN(20)), + fmt.Sprintf("v-%d", rng.IntN(1000)), + ) + + // Monotonic ops (tracked for invariant verification). + case op < 50: + delta := int64(rng.IntN(100) + 1) // always positive + metadatadb.NewInt(ns, "mono-int-"+key).Add(delta) + tr.addCounter("mono-int-"+key, delta) + case op < 65: + member := fmt.Sprintf("m-%d", rng.IntN(50)) + metadatadb.NewSet[string](ns, "mono-set-"+key).Add(member) + tr.addSetMember("mono-set-"+key, member) + case op < 75: + metadatadb.NewList[string](ns, "mono-list-"+key).Append(fmt.Sprintf("e-%d", rng.IntN(1000))) + tr.appendList("mono-list-" + key) + + // Reads. + case op < 80: + metadatadb.NewScalar[string](ns, "sc-"+key).Get() + case op < 85: + metadatadb.NewInt(ns, "int-"+key).Get() + case op < 90: + metadatadb.NewSet[string](ns, "set-"+key).Members() + case op < 95: + metadatadb.NewMap[string, string](ns, "map-"+key).Entries() + + // Flushes. 
+ default: + if err := ns.Flush(ctx); err != nil { + atomic.AddInt64(&result.Errors, 1) + } else { + atomic.AddInt64(&result.Flushes, 1) + } + } + atomic.AddInt64(&result.Ops, 1) + } +} + +func verifyMonotonicInvariants(t *testing.T, ns *metadatadb.Namespace, tr *tracker) { + t.Helper() + tr.mu.Lock() + defer tr.mu.Unlock() + + for key, expectedTotal := range tr.counterAdds { + actual := metadatadb.NewInt(ns, key).Get() + assert.Equal(t, expectedTotal, actual, "counter %s mismatch", key) + } + + for key, expectedMembers := range tr.setAdds { + actual := metadatadb.NewSet[string](ns, key).Members() + actualSet := make(map[string]bool, len(actual)) + for _, m := range actual { + actualSet[m] = true + } + for member := range expectedMembers { + assert.True(t, actualSet[member], "set %s missing member %q", key, member) + } + } + + for key, expectedLen := range tr.listAppends { + actual := metadatadb.NewList[string](ns, key).Entries() + assert.Equal(t, expectedLen, len(actual), "list %s length mismatch", key) + } +} + +func logSoakResult(t *testing.T, result *SoakResult) { + t.Helper() + t.Logf("Soak test completed:") + t.Logf(" Duration: %v", result.Duration) + t.Logf(" Ops: %d (%.1f/sec)", result.Ops, float64(result.Ops)/result.Duration.Seconds()) + t.Logf(" Flushes: %d", result.Flushes) + t.Logf(" Errors: %d", result.Errors) +} diff --git a/internal/metadatadb/metadatadbtest/suite.go b/internal/metadatadb/metadatadbtest/suite.go new file mode 100644 index 00000000..3861cc31 --- /dev/null +++ b/internal/metadatadb/metadatadbtest/suite.go @@ -0,0 +1,435 @@ +package metadatadbtest + +import ( + "context" + "encoding/json" + "log/slog" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/metadatadb" +) + +// Suite runs a comprehensive test suite against a metadatadb.Backend implementation. 
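+//
+// Each subtest receives a fresh backend from the factory (a sketch;
+// NewMyBackend is a placeholder):
+//
+//	metadatadbtest.Suite(t, func(t *testing.T) metadatadb.Backend {
+//		t.Helper()
+//		return NewMyBackend()
+//	})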
+func Suite(t *testing.T, newBackend func(t *testing.T) metadatadb.Backend) { + t.Run("Scalar", func(t *testing.T) { + testScalar(t, newBackend(t)) + }) + t.Run("ScalarDelete", func(t *testing.T) { + testScalarDelete(t, newBackend(t)) + }) + t.Run("ScalarStruct", func(t *testing.T) { + testScalarStruct(t, newBackend(t)) + }) + t.Run("Int", func(t *testing.T) { + testInt(t, newBackend(t)) + }) + t.Run("IntArithmetic", func(t *testing.T) { + testIntArithmetic(t, newBackend(t)) + }) + t.Run("IntDivByZero", func(t *testing.T) { + testIntDivByZero(t, newBackend(t)) + }) + t.Run("Set", func(t *testing.T) { + testSet(t, newBackend(t)) + }) + t.Run("SetRemove", func(t *testing.T) { + testSetRemove(t, newBackend(t)) + }) + t.Run("Map", func(t *testing.T) { + testMap(t, newBackend(t)) + }) + t.Run("MapDelete", func(t *testing.T) { + testMapDelete(t, newBackend(t)) + }) + t.Run("MapStruct", func(t *testing.T) { + testMapStruct(t, newBackend(t)) + }) + t.Run("IntMap", func(t *testing.T) { + testIntMap(t, newBackend(t)) + }) + t.Run("IntMapIncr", func(t *testing.T) { + testIntMapIncr(t, newBackend(t)) + }) + t.Run("List", func(t *testing.T) { + testList(t, newBackend(t)) + }) + t.Run("NamespaceIsolation", func(t *testing.T) { + testNamespaceIsolation(t, newBackend(t)) + }) + t.Run("FlushPersists", func(t *testing.T) { + testFlushPersists(t, newBackend(t)) + }) + t.Run("FlushRoundTrip", func(t *testing.T) { + testFlushRoundTrip(t, newBackend(t)) + }) + t.Run("TwoStoresSync", func(t *testing.T) { + testTwoStoresSync(t, newBackend(t)) + }) + t.Run("TokenMismatch", func(t *testing.T) { + testTokenMismatch(t, newBackend(t)) + }) +} + +func testContext() context.Context { + return logging.ContextWithLogger(context.Background(), slog.Default()) +} + +func newStore(t *testing.T, backend metadatadb.Backend) *metadatadb.Store { + t.Helper() + cfg := metadatadb.Config{SyncInterval: time.Hour, LockTTL: 5 * time.Second} + s := metadatadb.New(testContext(), cfg, backend) + t.Cleanup(func() { assert.NoError(t, s.Close()) }) + return s +} + +func testScalar(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + sc := metadatadb.NewScalar[string](ns, "greeting") + + _, ok := sc.Get() + assert.False(t, ok) + + sc.Set("hello") + v, ok := sc.Get() + assert.True(t, ok) + assert.Equal(t, "hello", v) + + sc.Set("world") + v, ok = sc.Get() + assert.True(t, ok) + assert.Equal(t, "world", v) +} + +func testScalarDelete(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + sc := metadatadb.NewScalar[int](ns, "counter") + + sc.Set(42) + v, ok := sc.Get() + assert.True(t, ok) + assert.Equal(t, 42, v) + + sc.Delete() + _, ok = sc.Get() + assert.False(t, ok) +} + +type testConfig struct { + Host string `json:"host"` + Port int `json:"port"` +} + +func testScalarStruct(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + sc := metadatadb.NewScalar[testConfig](ns, "config") + + sc.Set(testConfig{Host: "localhost", Port: 8080}) + v, ok := sc.Get() + assert.True(t, ok) + assert.Equal(t, testConfig{Host: "localhost", Port: 8080}, v) +} + +func testInt(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + i := metadatadb.NewInt(ns, "counter") + + assert.Equal(t, int64(0), i.Get()) + + i.Set(10) + assert.Equal(t, int64(10), i.Get()) + + i.Add(5) + assert.Equal(t, int64(15), i.Get()) +} + +func testIntArithmetic(t *testing.T, backend 
metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + i := metadatadb.NewInt(ns, "val") + + i.Set(100) + i.Mul(3) + assert.Equal(t, int64(300), i.Get()) + + i.Div(4) + assert.Equal(t, int64(75), i.Get()) + + i.Add(-25) + assert.Equal(t, int64(50), i.Get()) +} + +func testIntDivByZero(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + i := metadatadb.NewInt(ns, "val") + + i.Set(42) + i.Div(0) + assert.Equal(t, int64(42), i.Get()) +} + +func testSet(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + set := metadatadb.NewSet[string](ns, "repos") + + assert.Equal(t, []string(nil), set.Members()) + assert.False(t, set.Contains("a")) + + set.Add("a") + set.Add("b") + set.Add("a") // duplicate + assert.False(t, set.Contains("c")) + + assert.Equal(t, []string{"a", "b"}, set.Members()) +} + +func testSetRemove(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + set := metadatadb.NewSet[string](ns, "tags") + + set.Add("x") + set.Add("y") + set.Remove("x") + set.Remove("z") // non-existent is a no-op + + assert.Equal(t, []string{"y"}, set.Members()) +} + +func testMap(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + m := metadatadb.NewMap[string, string](ns, "labels") + + _, ok := m.Get("a") + assert.False(t, ok) + assert.Equal(t, []string(nil), m.Keys()) + + m.Set("a", "alpha") + m.Set("b", "beta") + + assert.Equal(t, map[string]string{"a": "alpha", "b": "beta"}, m.Entries()) +} + +func testMapDelete(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + m := metadatadb.NewMap[string, int](ns, "ports") + + m.Set("http", 80) + m.Set("https", 443) + m.Delete("http") + + assert.Equal(t, map[string]int{"https": 443}, m.Entries()) +} + +func testMapStruct(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + m := metadatadb.NewMap[string, testConfig](ns, "services") + + m.Set("web", testConfig{Host: "web.local", Port: 8080}) + + v, ok := m.Get("web") + assert.True(t, ok) + assert.Equal(t, testConfig{Host: "web.local", Port: 8080}, v) +} + +func testIntMap(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + m := metadatadb.NewIntMap[string](ns, "clones") + + assert.Equal(t, int64(0), m.Get("repo-a")) + + m.Set("repo-a", 10) + m.Set("repo-b", 20) + + assert.Equal(t, map[string]int64{"repo-a": 10, "repo-b": 20}, m.Entries()) + + m.Delete("repo-a") + assert.Equal(t, map[string]int64{"repo-b": 20}, m.Entries()) +} + +func testIntMapIncr(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + m := metadatadb.NewIntMap[string](ns, "histogram") + + m.Add("repo-a", 1) + m.Add("repo-a", 1) + m.Add("repo-b", 5) + m.Add("repo-a", 3) + + assert.Equal(t, map[string]int64{"repo-a": 5, "repo-b": 5}, m.Entries()) +} + +func testList(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, backend) + ns := s.Namespace("test") + l := metadatadb.NewList[string](ns, "log") + + assert.Equal(t, 0, l.Len()) + assert.Equal(t, []string(nil), l.Entries()) + + l.Append("first") + l.Append("second") + l.Append("third") + + assert.Equal(t, 3, l.Len()) + assert.Equal(t, []string{"first", "second", "third"}, l.Entries()) +} + +func testNamespaceIsolation(t *testing.T, backend metadatadb.Backend) { + s := newStore(t, 
backend) + ns1 := s.Namespace("ns1") + ns2 := s.Namespace("ns2") + + sc1 := metadatadb.NewScalar[string](ns1, "key") + sc2 := metadatadb.NewScalar[string](ns2, "key") + + sc1.Set("from-ns1") + sc2.Set("from-ns2") + + v1, ok := sc1.Get() + assert.True(t, ok) + assert.Equal(t, "from-ns1", v1) + + v2, ok := sc2.Get() + assert.True(t, ok) + assert.Equal(t, "from-ns2", v2) +} + +func testFlushPersists(t *testing.T, backend metadatadb.Backend) { + ctx := testContext() + s := newStore(t, backend) + ns := s.Namespace("test") + + sc := metadatadb.NewScalar[string](ns, "key") + sc.Set("value") + assert.NoError(t, ns.Flush(ctx)) + + // Create a new store against the same backend — should see the value. + s2 := newStore(t, backend) + ns2 := s2.Namespace("test") + sc2 := metadatadb.NewScalar[string](ns2, "key") + + // Flush to load remote state. + assert.NoError(t, ns2.Flush(ctx)) + + v, ok := sc2.Get() + assert.True(t, ok) + assert.Equal(t, "value", v) +} + +func testFlushRoundTrip(t *testing.T, backend metadatadb.Backend) { + ctx := testContext() + s := newStore(t, backend) + ns := s.Namespace("test") + + // Write all data structure types, flush, then read back from a fresh store. + metadatadb.NewScalar[string](ns, "sc").Set("hello") + metadatadb.NewScalar[testConfig](ns, "cfg").Set(testConfig{Host: "h", Port: 1}) + i := metadatadb.NewInt(ns, "counter") + i.Set(10) + i.Add(5) + set := metadatadb.NewSet[string](ns, "tags") + set.Add("a") + set.Add("b") + m := metadatadb.NewMap[string, int](ns, "ports") + m.Set("http", 80) + im := metadatadb.NewIntMap[string](ns, "clones") + im.Add("repo-a", 3) + im.Add("repo-b", 7) + l := metadatadb.NewList[string](ns, "log") + l.Append("entry1") + l.Append("entry2") + + assert.NoError(t, ns.Flush(ctx)) + + // Fresh store, same backend. + s2 := newStore(t, backend) + ns2 := s2.Namespace("test") + assert.NoError(t, ns2.Flush(ctx)) + + sc, ok := metadatadb.NewScalar[string](ns2, "sc").Get() + assert.True(t, ok) + assert.Equal(t, "hello", sc) + + cfg, ok := metadatadb.NewScalar[testConfig](ns2, "cfg").Get() + assert.True(t, ok) + assert.Equal(t, testConfig{Host: "h", Port: 1}, cfg) + + assert.Equal(t, int64(15), metadatadb.NewInt(ns2, "counter").Get()) + assert.Equal(t, []string{"a", "b"}, metadatadb.NewSet[string](ns2, "tags").Members()) + assert.Equal(t, map[string]int{"http": 80}, metadatadb.NewMap[string, int](ns2, "ports").Entries()) + assert.Equal(t, map[string]int64{"repo-a": 3, "repo-b": 7}, metadatadb.NewIntMap[string](ns2, "clones").Entries()) + assert.Equal(t, []string{"entry1", "entry2"}, metadatadb.NewList[string](ns2, "log").Entries()) +} + +func testTwoStoresSync(t *testing.T, backend metadatadb.Backend) { + ctx := testContext() + s1 := newStore(t, backend) + s2 := newStore(t, backend) + + ns1 := s1.Namespace("shared") + ns2 := s2.Namespace("shared") + + // Store 1 writes and flushes. + metadatadb.NewScalar[string](ns1, "owner").Set("store1") + metadatadb.NewInt(ns1, "counter").Add(10) + assert.NoError(t, ns1.Flush(ctx)) + + // Store 2 loads. + assert.NoError(t, ns2.Flush(ctx)) + + v, ok := metadatadb.NewScalar[string](ns2, "owner").Get() + assert.True(t, ok) + assert.Equal(t, "store1", v) + assert.Equal(t, int64(10), metadatadb.NewInt(ns2, "counter").Get()) + + // Store 2 writes and flushes. + metadatadb.NewInt(ns2, "counter").Add(5) + assert.NoError(t, ns2.Flush(ctx)) + + // Store 1 loads. 
+ assert.NoError(t, ns1.Flush(ctx)) + assert.Equal(t, int64(15), metadatadb.NewInt(ns1, "counter").Get()) +} + +func testTokenMismatch(t *testing.T, backend metadatadb.Backend) { + ctx := testContext() + + // Directly exercise the Backend token semantics. + _, token, err := backend.Load(ctx, "test") + assert.NoError(t, err) + assert.NoError(t, backend.Store(ctx, "test", json.RawMessage(`{"key":"v1"}`), token)) + + // Two readers load the same version. + _, token1, err := backend.Load(ctx, "test") + assert.NoError(t, err) + _, token2, err := backend.Load(ctx, "test") + assert.NoError(t, err) + assert.Equal(t, token1, token2) + + // First writer succeeds. + assert.NoError(t, backend.Store(ctx, "test", json.RawMessage(`{"key":"v2"}`), token1)) + + // Second writer fails — token is stale. + err = backend.Store(ctx, "test", json.RawMessage(`{"key":"v3"}`), token2) + assert.IsError(t, err, metadatadb.ErrInvalidToken) + + // Reload and retry succeeds. + _, token3, err := backend.Load(ctx, "test") + assert.NoError(t, err) + assert.NoError(t, backend.Store(ctx, "test", json.RawMessage(`{"key":"v3"}`), token3)) +} diff --git a/internal/metadatadb/oplog.go b/internal/metadatadb/oplog.go new file mode 100644 index 00000000..65621a06 --- /dev/null +++ b/internal/metadatadb/oplog.go @@ -0,0 +1,314 @@ +package metadatadb + +import ( + "context" + "encoding/json" + "time" + + "github.com/alecthomas/errors" +) + +// Scalar ops + +type scalarSetOp[V any] struct { + name string + value V +} + +func (o *scalarSetOp[V]) apply(state map[string]any) { state[o.name] = o.value } + +type scalarDeleteOp struct{ name string } + +func (o *scalarDeleteOp) apply(state map[string]any) { delete(state, o.name) } + +// Int ops + +type intSetOp struct { + name string + value int64 +} + +func (o *intSetOp) apply(state map[string]any) { state[o.name] = o.value } + +type intAddOp struct { + name string + delta int64 +} + +func (o *intAddOp) apply(state map[string]any) { + state[o.name] = toInt64(state[o.name]) + o.delta +} + +type intMulOp struct { + name string + factor int64 +} + +func (o *intMulOp) apply(state map[string]any) { + state[o.name] = toInt64(state[o.name]) * o.factor +} + +type intDivOp struct { + name string + divisor int64 +} + +func (o *intDivOp) apply(state map[string]any) { + if o.divisor == 0 { + return + } + state[o.name] = toInt64(state[o.name]) / o.divisor +} + +// Set ops — stored as map[string]any keyed by JSON-marshaled members. + +type setAddOp[V comparable] struct { + name string + member V +} + +func (o *setAddOp[V]) apply(state map[string]any) { + m, ok := state[o.name].(map[string]any) + if !ok { + m = make(map[string]any) + state[o.name] = m + } + m[marshalKey(o.member)] = true +} + +type setRemoveOp[V comparable] struct { + name string + member V +} + +func (o *setRemoveOp[V]) apply(state map[string]any) { + if m, ok := state[o.name].(map[string]any); ok { + delete(m, marshalKey(o.member)) + } +} + +// IntMap ops — stored as map[string]any with int64 values. 
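+// After a sync round-trip, JSON deserializes the values as float64, so both
+// reads and the arithmetic ops normalise values through toInt64.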
+ +type intMapSetOp[K comparable] struct { + name string + key K + value int64 +} + +func (o *intMapSetOp[K]) apply(state map[string]any) { + m, ok := state[o.name].(map[string]any) + if !ok { + m = make(map[string]any) + state[o.name] = m + } + m[marshalKey(o.key)] = o.value +} + +type intMapAddOp[K comparable] struct { + name string + key K + delta int64 +} + +func (o *intMapAddOp[K]) apply(state map[string]any) { + m, ok := state[o.name].(map[string]any) + if !ok { + m = make(map[string]any) + state[o.name] = m + } + k := marshalKey(o.key) + m[k] = toInt64(m[k]) + o.delta +} + +type intMapMulOp[K comparable] struct { + name string + key K + factor int64 +} + +func (o *intMapMulOp[K]) apply(state map[string]any) { + m, ok := state[o.name].(map[string]any) + if !ok { + m = make(map[string]any) + state[o.name] = m + } + k := marshalKey(o.key) + m[k] = toInt64(m[k]) * o.factor +} + +type intMapDivOp[K comparable] struct { + name string + key K + divisor int64 +} + +func (o *intMapDivOp[K]) apply(state map[string]any) { + if o.divisor == 0 { + return + } + m, ok := state[o.name].(map[string]any) + if !ok { + return + } + k := marshalKey(o.key) + m[k] = toInt64(m[k]) / o.divisor +} + +type intMapDeleteOp[K comparable] struct { + name string + key K +} + +func (o *intMapDeleteOp[K]) apply(state map[string]any) { + if m, ok := state[o.name].(map[string]any); ok { + delete(m, marshalKey(o.key)) + } +} + +// Map ops — stored as map[string]any keyed by JSON-marshaled keys. + +type mapSetOp[K comparable, V any] struct { + name string + key K + value V +} + +func (o *mapSetOp[K, V]) apply(state map[string]any) { + m, ok := state[o.name].(map[string]any) + if !ok { + m = make(map[string]any) + state[o.name] = m + } + m[marshalKey(o.key)] = o.value +} + +type mapDeleteOp[K comparable] struct { + name string + key K +} + +func (o *mapDeleteOp[K]) apply(state map[string]any) { + if m, ok := state[o.name].(map[string]any); ok { + delete(m, marshalKey(o.key)) + } +} + +// List ops — stored as []any + +type listAppendOp[V any] struct { + name string + value V +} + +func (o *listAppendOp[V]) apply(state map[string]any) { + s, _ := state[o.name].([]any) + state[o.name] = append(s, any(o.value)) +} + +// Sync + +func (n *Namespace) doSync(ctx context.Context) error { + n.syncMu.Lock() + defer n.syncMu.Unlock() + + n.mu.Lock() + pending := n.pending + n.pending = nil + n.mu.Unlock() + + hasPending := len(pending) > 0 + if hasPending { + if err := n.store.backend.Lock(ctx, n.name); err != nil { + n.restorePending(pending) + return errors.Wrap(err, "lock namespace") + } + defer func() { + if err := n.store.backend.Unlock(ctx, n.name); err != nil { + n.store.logger.WarnContext(ctx, "unlock failed", "namespace", n.name, "error", err) + } + }() + } + + remote, err := n.loadReplayStore(ctx, pending) + if err != nil { + n.restorePending(pending) + return err + } + + n.mu.Lock() + n.state = remote + for _, o := range n.pending { + o.apply(n.state) + } + n.mu.Unlock() + + return nil +} + +const maxTokenRetries = 3 + +// loadReplayStore loads remote state, replays ops, and stores the result. +// Retries the full cycle on ErrInvalidToken. 
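+// A stale token is possible even under the namespace lock (for example, when
+// another replica expires a lock it judged dead), so the cycle retries with a
+// freshly loaded token rather than failing the flush outright.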
+func (n *Namespace) loadReplayStore(ctx context.Context, pending []op) (map[string]any, error) { + for range maxTokenRetries { + remote, err := n.tryLoadReplayStore(ctx, pending) + if errors.Is(err, ErrInvalidToken) { + continue + } + return remote, err + } + return nil, errors.New("max token retries exceeded") +} + +func (n *Namespace) tryLoadReplayStore(ctx context.Context, pending []op) (map[string]any, error) { + data, token, err := n.store.backend.Load(ctx, n.name) + if err != nil { + return nil, errors.Wrap(err, "load namespace") + } + + remote := make(map[string]any) + if data != nil { + if err := json.Unmarshal(data, &remote); err != nil { + return nil, errors.Wrap(err, "unmarshal state") + } + } + + for _, o := range pending { + o.apply(remote) + } + + if len(pending) > 0 { + merged, err := json.Marshal(remote) + if err != nil { + return nil, errors.Wrap(err, "marshal state") + } + if err := n.store.backend.Store(ctx, n.name, merged, token); err != nil { + return nil, errors.Wrap(err, "store namespace") + } + } + + return remote, nil +} + +func (n *Namespace) restorePending(ops []op) { + n.mu.Lock() + defer n.mu.Unlock() + n.pending = append(ops, n.pending...) +} + +func (n *Namespace) syncLoop() { + defer close(n.done) + logger := n.store.logger.With("namespace", n.name) + ticker := time.NewTicker(n.store.config.SyncInterval) + defer ticker.Stop() + for { + select { + case <-n.store.ctx.Done(): + return + case <-ticker.C: + if err := n.doSync(n.store.ctx); err != nil { + logger.WarnContext(n.store.ctx, "sync failed", "error", err) + } + } + } +} diff --git a/internal/metadatadb/s3.go b/internal/metadatadb/s3.go new file mode 100644 index 00000000..b4f42b3f --- /dev/null +++ b/internal/metadatadb/s3.go @@ -0,0 +1,173 @@ +package metadatadb + +import ( + "bytes" + "context" + "encoding/json" + "log/slog" + "net/http" + "time" + + "github.com/alecthomas/errors" + "github.com/minio/minio-go/v7" +) + +// S3Backend stores metadata state as JSON objects in S3. Locking uses a +// separate lock object with TTL-based expiry for stale lock recovery. The +// idempotence token maps to the S3 object ETag. +type S3Backend struct { + client *minio.Client + logger *slog.Logger + bucket string + prefix string + lockTTL time.Duration +} + +// S3BackendConfig configures the S3 metadata backend. 
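+//
+// For example (a sketch; client construction elided, Prefix and LockTTL fall
+// back to their defaults when zero):
+//
+//	backend := metadatadb.NewS3Backend(metadatadb.S3BackendConfig{
+//		Client:  client, // *minio.Client
+//		Bucket:  "my-bucket",
+//		LockTTL: 30 * time.Second,
+//	})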
+type S3BackendConfig struct { + Client *minio.Client + Logger *slog.Logger + Bucket string + Prefix string + LockTTL time.Duration +} + +func NewS3Backend(config S3BackendConfig) *S3Backend { + if config.Prefix == "" { + config.Prefix = "_meta" + } + if config.LockTTL == 0 { + config.LockTTL = 30 * time.Second + } + logger := config.Logger + if logger == nil { + logger = slog.Default() + } + return &S3Backend{ + client: config.Client, + logger: logger, + bucket: config.Bucket, + prefix: config.Prefix, + lockTTL: config.LockTTL, + } +} + +func (s *S3Backend) stateKey(namespace string) string { return s.prefix + "/" + namespace + ".json" } +func (s *S3Backend) lockKey(namespace string) string { return s.prefix + "/" + namespace + ".lock" } + +func (s *S3Backend) Load(ctx context.Context, namespace string) (json.RawMessage, string, error) { + obj, err := s.client.GetObject(ctx, s.bucket, s.stateKey(namespace), minio.GetObjectOptions{}) + if err != nil { + return nil, "", errors.Wrap(err, "get object") + } + defer obj.Close() + + info, err := obj.Stat() + if err != nil { + if isNotFound(err) { + return nil, "", nil + } + return nil, "", errors.Wrap(err, "stat object") + } + + var buf bytes.Buffer + buf.Grow(int(info.Size)) + if _, err := buf.ReadFrom(obj); err != nil { + return nil, "", errors.Wrap(err, "read object") + } + + return buf.Bytes(), info.ETag, nil +} + +func (s *S3Backend) Store(ctx context.Context, namespace string, data json.RawMessage, token string) error { + opts := minio.PutObjectOptions{ + ContentType: "application/json", + } + if token != "" { + opts.SetMatchETag(token) + } else { + opts.SetMatchETagExcept("*") + } + + _, err := s.client.PutObject(ctx, s.bucket, s.stateKey(namespace), + bytes.NewReader(data), int64(len(data)), opts) + if err != nil { + if isPreconditionFailed(err) { + return ErrInvalidToken + } + return errors.Wrap(err, "put object") + } + return nil +} + +func (s *S3Backend) Lock(ctx context.Context, namespace string) error { + key := s.lockKey(namespace) + for { + // Try to acquire by writing a lock object with If-None-Match: * + opts := minio.PutObjectOptions{ + UserMetadata: map[string]string{ + "Expires-At": time.Now().Add(s.lockTTL).Format(time.RFC3339), + }, + } + opts.SetMatchETagExcept("*") + + _, err := s.client.PutObject(ctx, s.bucket, key, + bytes.NewReader([]byte("locked")), 6, opts) + if err == nil { + return nil + } + + if !isPreconditionFailed(err) { + return errors.Wrap(err, "acquire lock") + } + + // Lock exists — check if it's stale and remove it. 
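+		// On success the acquire is retried immediately; on failure (including
+		// a lock that simply has not expired yet) we fall through to the
+		// backoff below and poll again.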
+ if err := s.tryExpireStaleLock(ctx, key); err != nil { + s.logger.WarnContext(ctx, "stale lock check failed", "key", key, "error", err) + } else { + continue + } + + select { + case <-ctx.Done(): + return errors.WithStack(ctx.Err()) + case <-time.After(500 * time.Millisecond): + } + } +} + +func (s *S3Backend) Unlock(ctx context.Context, namespace string) error { + err := s.client.RemoveObject(ctx, s.bucket, s.lockKey(namespace), minio.RemoveObjectOptions{}) + if err != nil { + return errors.Wrap(err, "remove lock") + } + return nil +} + +func (s *S3Backend) tryExpireStaleLock(ctx context.Context, key string) error { + info, err := s.client.StatObject(ctx, s.bucket, key, minio.StatObjectOptions{}) + if err != nil { + return errors.Wrap(err, "stat lock") + } + expiresAt, err := time.Parse(time.RFC3339, info.UserMetadata["Expires-At"]) + if err != nil { + return errors.Wrap(err, "parse lock expiry") + } + if time.Now().Before(expiresAt) { + return errors.New("lock not expired") + } + if err := s.client.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{}); err != nil { + return errors.Wrap(err, "remove stale lock") + } + return nil +} + +func isNotFound(err error) bool { + var resp minio.ErrorResponse + return errors.As(err, &resp) && resp.StatusCode == http.StatusNotFound +} + +func isPreconditionFailed(err error) bool { + var resp minio.ErrorResponse + return errors.As(err, &resp) && resp.StatusCode == http.StatusPreconditionFailed +} diff --git a/internal/metadatadb/s3_test.go b/internal/metadatadb/s3_test.go new file mode 100644 index 00000000..d988f447 --- /dev/null +++ b/internal/metadatadb/s3_test.go @@ -0,0 +1,39 @@ +package metadatadb_test + +import ( + "testing" + "time" + + "github.com/block/cachew/internal/metadatadb" + "github.com/block/cachew/internal/metadatadb/metadatadbtest" + "github.com/block/cachew/internal/minitest" +) + +func TestS3Backend(t *testing.T) { + minitest.Start(t) + + metadatadbtest.Suite(t, func(t *testing.T) metadatadb.Backend { + t.Helper() + return metadatadb.NewS3Backend(metadatadb.S3BackendConfig{ + Client: minitest.Client(t), + Bucket: minitest.Bucket, + Prefix: "_meta-" + t.Name(), + LockTTL: 5 * time.Second, + }) + }) +} + +func TestS3BackendSoak(t *testing.T) { + minitest.Start(t) + + metadatadbtest.Soak(t, metadatadb.NewS3Backend(metadatadb.S3BackendConfig{ + Client: minitest.Client(t), + Bucket: minitest.Bucket, + Prefix: "_meta-soak", + LockTTL: 5 * time.Second, + }), metadatadbtest.SoakConfig{ + Duration: 5 * time.Second, + Concurrency: 4, + NumKeys: 10, + }) +} diff --git a/internal/minitest/minitest.go b/internal/minitest/minitest.go new file mode 100644 index 00000000..e4a8defb --- /dev/null +++ b/internal/minitest/minitest.go @@ -0,0 +1,116 @@ +// Package minitest provides a reusable MinIO test server via Docker. +package minitest + +import ( + "os/exec" + "strings" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/errors" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const ( + containerName = "minio-test" + Port = "19000" + Addr = "localhost:" + Port + Username = "minioadmin" + Password = "minioadmin" + Bucket = "test-bucket" +) + +// Start ensures a shared MinIO container is running, creating it if needed. +// The container persists across tests and packages. +func Start(t *testing.T) { + t.Helper() + + t.Setenv("AWS_ACCESS_KEY_ID", Username) + t.Setenv("AWS_SECRET_ACCESS_KEY", Password) + + // If it's already up and healthy, nothing to do. 
+ if isHealthy(t) { + return + } + + // Try to start — if the container already exists (from another parallel + // package), docker run fails but that's fine, we just wait for it. + cmd := exec.CommandContext(t.Context(), "docker", "run", "-d", + "--name", containerName, + "-p", Port+":9000", + "-e", "MINIO_ROOT_USER="+Username, + "-e", "MINIO_ROOT_PASSWORD="+Password, + "minio/minio", "server", "/data", + ) + if output, err := cmd.CombinedOutput(); err != nil { + // Only fatal if the container doesn't exist at all. + if !strings.Contains(string(output), "already in use") { + t.Fatalf("failed to start minio container: %v\n%s", err, output) + } + } + + waitForReady(t) + createBucket(t) +} + +func isHealthy(t *testing.T) bool { + t.Helper() + client := Client(t) + _, err := client.ListBuckets(t.Context()) + return err == nil +} + +// Client returns a minio client connected to the test server. +func Client(t *testing.T) *minio.Client { + t.Helper() + client, err := minio.New(Addr, &minio.Options{ + Creds: credentials.NewStaticV4(Username, Password, ""), + Secure: false, + }) + assert.NoError(t, err) + return client +} + +// CleanBucket removes all objects from the test bucket. +func CleanBucket(t *testing.T) { + t.Helper() + client := Client(t) + for obj := range client.ListObjects(t.Context(), Bucket, minio.ListObjectsOptions{Recursive: true}) { + if obj.Err != nil { + continue + } + if err := client.RemoveObject(t.Context(), Bucket, obj.Key, minio.RemoveObjectOptions{}); err != nil { + t.Logf("failed to remove object %s: %v", obj.Key, err) + } + } +} + +func waitForReady(t *testing.T) { + t.Helper() + client := Client(t) + timeout := time.After(30 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-timeout: + t.Fatal(errors.New("timed out waiting for minio to start")) + case <-ticker.C: + if _, err := client.ListBuckets(t.Context()); err == nil { + return + } + } + } +} + +func createBucket(t *testing.T) { + t.Helper() + client := Client(t) + exists, err := client.BucketExists(t.Context(), Bucket) + assert.NoError(t, err) + if !exists { + assert.NoError(t, client.MakeBucket(t.Context(), Bucket, minio.MakeBucketOptions{})) + } +}