diff --git a/internal/metadatadb/api.go b/internal/metadatadb/api.go index ca68723..d4db49f 100644 --- a/internal/metadatadb/api.go +++ b/internal/metadatadb/api.go @@ -11,65 +11,45 @@ import ( "log/slog" "slices" "sync" - "time" "github.com/alecthomas/errors" "github.com/block/cachew/internal/logging" ) -// Config controls the metadata store's sync behaviour. -type Config struct { - SyncInterval time.Duration `hcl:"sync-interval,optional" help:"How often to sync with the backend." default:"30s"` - LockTTL time.Duration `hcl:"lock-ttl,optional" help:"TTL for namespace locks." default:"30s"` -} - -// op is an idempotent operation that applies itself to the state. -type op interface { - apply(state map[string]any) -} - -// ErrInvalidToken is returned by Backend.Store when the token does not match -// the current version, indicating a concurrent write. -var ErrInvalidToken = errors.New("invalid token") - -// Backend is the pluggable storage and locking layer. +// Backend is the pluggable storage layer for the metadata store. Implementations +// handle write operations (Apply), read queries (Query), and persistence (Flush). type Backend interface { - // Load returns the current state and an opaque token identifying the - // version. The token must be passed to Store to prevent stale writes. - Load(ctx context.Context, namespace string) (data json.RawMessage, token string, err error) - // Store persists the state. It must return ErrInvalidToken if the token - // does not match the current version (i.e. another writer stored since - // our Load). For S3, the token maps to an ETag with If-Match. - Store(ctx context.Context, namespace string, data json.RawMessage, token string) error - Lock(ctx context.Context, namespace string) error - Unlock(ctx context.Context, namespace string) error + // Apply applies one or more write operations to the given namespace. 
+ Apply(ctx context.Context, namespace string, ops ...Op) error + // Query executes a read query and unmarshals the result into target. + // Target must be a pointer to the expected result type. + Query(ctx context.Context, namespace string, q ReadOp, target any) error + // Flush forces pending state to be persisted for the given namespace. + Flush(ctx context.Context, namespace string) error + // Close shuts down the backend, flushing any pending state. + Close(ctx context.Context) error } // Store is the top-level metadata store. type Store struct { backend Backend - config Config logger *slog.Logger mu sync.Mutex ns map[string]*Namespace - ctx context.Context - cancel context.CancelFunc } -func New(ctx context.Context, config Config, backend Backend) *Store { +// New creates a new metadata store backed by the given Backend. +func New(ctx context.Context, backend Backend) *Store { logger := logging.FromContext(ctx) - ctx, cancel := context.WithCancel(ctx) return &Store{ backend: backend, - config: config, logger: logger.With("component", "metadata"), ns: make(map[string]*Namespace), - ctx: ctx, - cancel: cancel, } } +// Namespace returns the namespace with the given name, creating it if needed. func (s *Store) Namespace(name string) *Namespace { s.mu.Lock() defer s.mu.Unlock() @@ -77,46 +57,35 @@ func (s *Store) Namespace(name string) *Namespace { return ns } ns := &Namespace{ - store: s, - name: name, - state: make(map[string]any), - done: make(chan struct{}), + backend: s.backend, + name: name, } - go ns.syncLoop() s.ns[name] = ns return ns } -func (s *Store) Close() error { - s.cancel() - s.mu.Lock() - defer s.mu.Unlock() - for _, ns := range s.ns { - <-ns.done - } - return nil +// Close shuts down the store and its backend. +func (s *Store) Close(ctx context.Context) error { + return errors.Wrap(s.backend.Close(ctx), "close backend") } -// Namespace is a scoped collection of named data structures. 
Mutations are -// applied locally immediately and synced periodically to the backend. +// Namespace is a scoped collection of named data structures. type Namespace struct { - store *Store + backend Backend name string - mu sync.RWMutex - state map[string]any - pending []op - syncMu sync.Mutex - done chan struct{} } // Flush forces an immediate sync with the backend. -func (n *Namespace) Flush(ctx context.Context) error { return n.doSync(ctx) } +func (n *Namespace) Flush(ctx context.Context) error { + return errors.Wrap(n.backend.Flush(ctx, n.name), "flush namespace") +} + +func (n *Namespace) apply(ops ...Op) { + _ = n.backend.Apply(context.Background(), n.name, ops...) //nolint:errcheck // local backends never fail +} -func (n *Namespace) apply(o op) { - n.mu.Lock() - defer n.mu.Unlock() - o.apply(n.state) - n.pending = append(n.pending, o) +func (n *Namespace) query(q ReadOp, target any) { + _ = n.backend.Query(context.Background(), n.name, q, target) //nolint:errcheck // local backends never fail } // Scalar is a single value. Last write wins. @@ -130,22 +99,19 @@ func NewScalar[V any](ns *Namespace, name string) *Scalar[V] { return &Scalar[V]{ns: ns, name: name} } -func (s *Scalar[V]) Set(value V) { s.ns.apply(&scalarSetOp[V]{name: s.name, value: value}) } -func (s *Scalar[V]) Delete() { s.ns.apply(&scalarDeleteOp{name: s.name}) } +func (s *Scalar[V]) Set(value V) { s.ns.apply(ScalarSet{Key: s.name, Value: value}) } +func (s *Scalar[V]) Delete() { s.ns.apply(ScalarDelete{Key: s.name}) } func (s *Scalar[V]) Get() (V, bool) { - s.ns.mu.RLock() - defer s.ns.mu.RUnlock() - raw, ok := s.ns.state[s.name] - if !ok { - var zero V - return zero, false + var result struct { + Value V + OK bool } - return jsonRoundTrip[V](raw), true + s.ns.query(ScalarGet{Key: s.name}, &result) + return result.Value, result.OK } -// Int is an integer with arithmetic operations. All ops are applied -// sequentially — the lock serialises flushes. 
+// Int is an integer with arithmetic operations. type Int struct { ns *Namespace name string @@ -156,19 +122,18 @@ func NewInt(ns *Namespace, name string) *Int { return &Int{ns: ns, name: name} } -func (i *Int) Set(value int64) { i.ns.apply(&intSetOp{name: i.name, value: value}) } -func (i *Int) Add(delta int64) { i.ns.apply(&intAddOp{name: i.name, delta: delta}) } -func (i *Int) Mul(factor int64) { i.ns.apply(&intMulOp{name: i.name, factor: factor}) } -func (i *Int) Div(divisor int64) { i.ns.apply(&intDivOp{name: i.name, divisor: divisor}) } +func (i *Int) Set(value int64) { i.ns.apply(IntSet{Key: i.name, Value: value}) } +func (i *Int) Add(delta int64) { i.ns.apply(IntAdd{Key: i.name, Delta: delta}) } +func (i *Int) Mul(factor int64) { i.ns.apply(IntMul{Key: i.name, Factor: factor}) } +func (i *Int) Div(divisor int64) { i.ns.apply(IntDiv{Key: i.name, Divisor: divisor}) } func (i *Int) Get() int64 { - i.ns.mu.RLock() - defer i.ns.mu.RUnlock() - return toInt64(i.ns.state[i.name]) + var v int64 + i.ns.query(IntGet{Key: i.name}, &v) + return v } // IntMap is a keyed collection of integer values supporting atomic increment. -// Keys are JSON-marshaled to strings internally. 
type IntMap[K comparable] struct { ns *Namespace name string @@ -180,72 +145,41 @@ func NewIntMap[K comparable](ns *Namespace, name string) *IntMap[K] { } func (m *IntMap[K]) Set(key K, value int64) { - m.ns.apply(&intMapSetOp[K]{name: m.name, key: key, value: value}) + m.ns.apply(IntMapSet{Key: m.name, MapKey: key, Value: value}) } func (m *IntMap[K]) Add(key K, delta int64) { - m.ns.apply(&intMapAddOp[K]{name: m.name, key: key, delta: delta}) + m.ns.apply(IntMapAdd{Key: m.name, MapKey: key, Delta: delta}) } func (m *IntMap[K]) Mul(key K, factor int64) { - m.ns.apply(&intMapMulOp[K]{name: m.name, key: key, factor: factor}) + m.ns.apply(IntMapMul{Key: m.name, MapKey: key, Factor: factor}) } func (m *IntMap[K]) Div(key K, divisor int64) { - m.ns.apply(&intMapDivOp[K]{name: m.name, key: key, divisor: divisor}) + m.ns.apply(IntMapDiv{Key: m.name, MapKey: key, Divisor: divisor}) +} +func (m *IntMap[K]) Delete(key K) { + m.ns.apply(IntMapDelete{Key: m.name, MapKey: key}) } -func (m *IntMap[K]) Delete(key K) { m.ns.apply(&intMapDeleteOp[K]{name: m.name, key: key}) } func (m *IntMap[K]) Get(key K) int64 { - m.ns.mu.RLock() - defer m.ns.mu.RUnlock() - raw, ok := m.ns.state[m.name] - if !ok { - return 0 - } - return toInt64(raw.(map[string]any)[marshalKey(key)]) + var v int64 + m.ns.query(IntMapGet{Key: m.name, MapKey: key}, &v) + return v } func (m *IntMap[K]) Keys() []K { - m.ns.mu.RLock() - defer m.ns.mu.RUnlock() - raw, ok := m.ns.state[m.name] - if !ok { - return nil - } - entries := raw.(map[string]any) - result := make([]K, 0, len(entries)) - for k := range entries { - result = append(result, unmarshalKey[K](k)) - } - slices.SortFunc(result, func(a, b K) int { - ka, kb := marshalKey(a), marshalKey(b) - if ka < kb { - return -1 - } - if ka > kb { - return 1 - } - return 0 - }) + var result []K + m.ns.query(IntMapKeys{Key: m.name}, &result) + slices.SortFunc(result, compareKeys[K]) return result } func (m *IntMap[K]) Entries() map[K]int64 { - m.ns.mu.RLock() - defer 
m.ns.mu.RUnlock() - raw, ok := m.ns.state[m.name] - if !ok { - return nil - } - entries := raw.(map[string]any) - result := make(map[K]int64, len(entries)) - for k, v := range entries { - result[unmarshalKey[K](k)] = toInt64(v) - } + var result map[K]int64 + m.ns.query(IntMapEntries{Key: m.name}, &result) return result } -// Set is an unordered collection of unique members. Members are stored as -// JSON-marshaled string keys internally, so any comparable JSON-serializable -// type can be used. +// Set is an unordered collection of unique members. type Set[V comparable] struct { ns *Namespace name string @@ -256,64 +190,23 @@ func NewSet[V comparable](ns *Namespace, name string) *Set[V] { return &Set[V]{ns: ns, name: name} } -func (s *Set[V]) Add(member V) { s.ns.apply(&setAddOp[V]{name: s.name, member: member}) } -func (s *Set[V]) Remove(member V) { s.ns.apply(&setRemoveOp[V]{name: s.name, member: member}) } +func (s *Set[V]) Add(member V) { s.ns.apply(SetAdd{Key: s.name, Member: member}) } +func (s *Set[V]) Remove(member V) { s.ns.apply(SetRemove{Key: s.name, Member: member}) } func (s *Set[V]) Contains(member V) bool { - s.ns.mu.RLock() - defer s.ns.mu.RUnlock() - raw, ok := s.ns.state[s.name] - if !ok { - return false - } - _, ok = raw.(map[string]any)[marshalKey(member)] - return ok + var v bool + s.ns.query(SetContains{Key: s.name, Member: member}, &v) + return v } func (s *Set[V]) Members() []V { - s.ns.mu.RLock() - defer s.ns.mu.RUnlock() - raw, ok := s.ns.state[s.name] - if !ok { - return nil - } - m := raw.(map[string]any) - result := make([]V, 0, len(m)) - for k := range m { - result = append(result, unmarshalKey[V](k)) - } - slices.SortFunc(result, func(a, b V) int { - ka, kb := marshalKey(a), marshalKey(b) - if ka < kb { - return -1 - } - if ka > kb { - return 1 - } - return 0 - }) + var result []V + s.ns.query(SetMembers{Key: s.name}, &result) + slices.SortFunc(result, compareKeys[V]) return result } -func marshalKey[V any](v V) string { - data, err 
:= json.Marshal(v) - if err != nil { - panic(fmt.Sprintf("metadatadb: marshal key %T: %v", v, err)) - } - return string(data) -} - -func unmarshalKey[V any](s string) V { - var v V - if err := json.Unmarshal([]byte(s), &v); err != nil { - panic(fmt.Sprintf("metadatadb: unmarshal key into %T: %v", v, err)) - } - return v -} - -// Map is a keyed collection of values. Keys are JSON-marshaled to strings -// internally, so any comparable JSON-serializable type can be used. -// Last write per key wins. +// Map is a keyed collection of values. Last write per key wins. type Map[K comparable, V any] struct { ns *Namespace name string @@ -325,66 +218,32 @@ func NewMap[K comparable, V any](ns *Namespace, name string) *Map[K, V] { } func (m *Map[K, V]) Set(key K, value V) { - m.ns.apply(&mapSetOp[K, V]{name: m.name, key: key, value: value}) + m.ns.apply(MapSet{Key: m.name, MapKey: key, Value: value}) } func (m *Map[K, V]) Delete(key K) { - m.ns.apply(&mapDeleteOp[K]{name: m.name, key: key}) + m.ns.apply(MapDelete{Key: m.name, MapKey: key}) } func (m *Map[K, V]) Get(key K) (V, bool) { - m.ns.mu.RLock() - defer m.ns.mu.RUnlock() - raw, ok := m.ns.state[m.name] - if !ok { - var zero V - return zero, false + var result struct { + Value V + OK bool } - v, ok := raw.(map[string]any)[marshalKey(key)] - if !ok { - var zero V - return zero, false - } - return jsonRoundTrip[V](v), true + m.ns.query(MapGet{Key: m.name, MapKey: key}, &result) + return result.Value, result.OK } func (m *Map[K, V]) Keys() []K { - m.ns.mu.RLock() - defer m.ns.mu.RUnlock() - raw, ok := m.ns.state[m.name] - if !ok { - return nil - } - entries := raw.(map[string]any) - result := make([]K, 0, len(entries)) - for k := range entries { - result = append(result, unmarshalKey[K](k)) - } - slices.SortFunc(result, func(a, b K) int { - ka, kb := marshalKey(a), marshalKey(b) - if ka < kb { - return -1 - } - if ka > kb { - return 1 - } - return 0 - }) + var result []K + m.ns.query(MapKeys{Key: m.name}, &result) + 
slices.SortFunc(result, compareKeys[K]) return result } func (m *Map[K, V]) Entries() map[K]V { - m.ns.mu.RLock() - defer m.ns.mu.RUnlock() - raw, ok := m.ns.state[m.name] - if !ok { - return nil - } - entries := raw.(map[string]any) - result := make(map[K]V, len(entries)) - for k, v := range entries { - result[unmarshalKey[K](k)] = jsonRoundTrip[V](v) - } + var result map[K]V + m.ns.query(MapEntries{Key: m.name}, &result) return result } @@ -400,56 +259,36 @@ func NewList[V any](ns *Namespace, name string) *List[V] { } func (l *List[V]) Append(value V) { - l.ns.apply(&listAppendOp[V]{name: l.name, value: value}) + l.ns.apply(ListAppend{Key: l.name, Value: value}) } func (l *List[V]) Entries() []V { - l.ns.mu.RLock() - defer l.ns.mu.RUnlock() - raw, ok := l.ns.state[l.name] - if !ok { - return nil - } - src := raw.([]any) - result := make([]V, len(src)) - for i, v := range src { - result[i] = jsonRoundTrip[V](v) - } + var result []V + l.ns.query(ListEntries{Key: l.name}, &result) return result } func (l *List[V]) Len() int { - l.ns.mu.RLock() - defer l.ns.mu.RUnlock() - raw, ok := l.ns.state[l.name] - if !ok { - return 0 - } - return len(raw.([]any)) + var v int + l.ns.query(ListLen{Key: l.name}, &v) + return v } -// jsonRoundTrip marshals v to JSON then unmarshals into V, handling the -// type mismatch between locally-stored Go values and JSON-deserialized values -// after a sync round-trip. 
-func jsonRoundTrip[V any](v any) V { - data, err := json.Marshal(v) +func compareKeys[K comparable](a, b K) int { + ka, err := json.Marshal(a) if err != nil { - panic(fmt.Sprintf("metadata: marshal %T: %v", v, err)) + panic(fmt.Sprintf("metadatadb: marshal key %T: %v", a, err)) } - var result V - if err := json.Unmarshal(data, &result); err != nil { - panic(fmt.Sprintf("metadata: unmarshal into %T: %v", result, err)) + kb, err := json.Marshal(b) + if err != nil { + panic(fmt.Sprintf("metadatadb: marshal key %T: %v", b, err)) } - return result -} - -func toInt64(v any) int64 { - switch n := v.(type) { - case int64: - return n - case float64: - return int64(n) - default: - return 0 + sa, sb := string(ka), string(kb) + if sa < sb { + return -1 + } + if sa > sb { + return 1 } + return 0 } diff --git a/internal/metadatadb/memory.go b/internal/metadatadb/memory.go index bcd428d..16abe01 100644 --- a/internal/metadatadb/memory.go +++ b/internal/metadatadb/memory.go @@ -3,73 +3,60 @@ package metadatadb import ( "context" "encoding/json" - "strconv" "sync" - "sync/atomic" "github.com/alecthomas/errors" ) // MemoryBackend is an in-memory Backend for testing and single-instance -// deployments. It is safe for concurrent use across multiple Store instances. +// deployments. Ops are applied directly — there is no sync or persistence. 
type MemoryBackend struct {
-	mu      sync.Mutex
-	data    map[string]json.RawMessage
-	tokens  map[string]string
-	version atomic.Int64
-	locks   map[string]chan struct{}
+	mu    sync.RWMutex
+	state map[string]map[string]any // namespace -> state
 }
 
 func NewMemoryBackend() *MemoryBackend {
-	return &MemoryBackend{
-		data:   make(map[string]json.RawMessage),
-		tokens: make(map[string]string),
-		locks:  make(map[string]chan struct{}),
-	}
-}
-
-func (m *MemoryBackend) Load(_ context.Context, namespace string) (json.RawMessage, string, error) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	return m.data[namespace], m.tokens[namespace], nil
+	return &MemoryBackend{state: make(map[string]map[string]any)}
 }
 
-func (m *MemoryBackend) Store(_ context.Context, namespace string, data json.RawMessage, token string) error {
+func (m *MemoryBackend) Apply(_ context.Context, namespace string, ops ...Op) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	if m.tokens[namespace] != token {
-		return ErrInvalidToken
+	ns := m.ns(namespace)
+	for _, o := range ops {
+		applyOp(ns, o)
 	}
-	m.data[namespace] = data
-	m.tokens[namespace] = strconv.FormatInt(m.version.Add(1), 10)
 	return nil
 }
 
-func (m *MemoryBackend) Lock(ctx context.Context, namespace string) error {
-	for {
-		m.mu.Lock()
-		ch, locked := m.locks[namespace]
-		if !locked {
-			m.locks[namespace] = make(chan struct{})
-			m.mu.Unlock()
-			return nil
-		}
-		m.mu.Unlock()
+func (m *MemoryBackend) Query(_ context.Context, namespace string, q ReadOp, target any) error {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	result := queryState(m.state[namespace], q) // not m.ns(): that lazily inserts into m.state, a map write racing under RLock; a nil map is safe for read-only lookups
+	return errors.Wrap(jsonUnmarshalInto(result, target), "memory query")
+}
 
-		select {
-		case <-ch:
-		case <-ctx.Done():
-			return errors.WithStack(ctx.Err())
-		}
+func (m *MemoryBackend) Flush(_ context.Context, _ string) error { return nil }
+func (m *MemoryBackend) Close(_ context.Context) error           { return nil }
+
+func (m *MemoryBackend) ns(namespace string) map[string]any { // creates the namespace if absent: callers must hold m.mu (write lock)
+	ns, ok := m.state[namespace]
+	if !ok {
+		ns = make(map[string]any)
+		m.state[namespace] = ns
 	}
+	return ns
 }
 
-func (m *MemoryBackend) Unlock(_ context.Context, namespace string) error {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	if ch, ok := m.locks[namespace]; ok {
-		close(ch)
-		delete(m.locks, namespace)
+// jsonUnmarshalInto marshals src to JSON then unmarshals into target,
+// bridging between the internal any-typed state and the caller's typed pointer.
+func jsonUnmarshalInto(src any, target any) error {
+	if src == nil {
+		return nil
 	}
-	return nil
+	data, err := json.Marshal(src)
+	if err != nil {
+		return errors.Wrap(err, "marshal")
+	}
+	return errors.Wrap(json.Unmarshal(data, target), "unmarshal")
 }
diff --git a/internal/metadatadb/memory_test.go b/internal/metadatadb/memory_test.go
index f726fa8..02b5f4d 100644
--- a/internal/metadatadb/memory_test.go
+++ b/internal/metadatadb/memory_test.go
@@ -2,23 +2,19 @@ package metadatadb_test
 
 import (
 	"testing"
-	"time"
 
 	"github.com/block/cachew/internal/metadatadb"
 	"github.com/block/cachew/internal/metadatadb/metadatadbtest"
 )
 
 func TestMemoryBackend(t *testing.T) {
-	metadatadbtest.Suite(t, func(t *testing.T) metadatadb.Backend {
+	metadatadbtest.Suite(t, func(t *testing.T, n int) []metadatadb.Backend {
 		t.Helper()
-		return metadatadb.NewMemoryBackend()
-	})
-}
-
-func TestMemoryBackendSoak(t *testing.T) {
-	metadatadbtest.Soak(t, metadatadb.NewMemoryBackend(), metadatadbtest.SoakConfig{
-		Duration:    5 * time.Second,
-		Concurrency: 4,
-		NumKeys:     20,
+		backend := metadatadb.NewMemoryBackend()
+		backends := make([]metadatadb.Backend, n)
+		for i := range backends {
+			backends[i] = backend
+		}
+		return backends
 	})
 }
diff --git a/internal/metadatadb/metadatadbtest/soak.go b/internal/metadatadb/metadatadbtest/soak.go
index 08c5ce2..7c36ff8 100644
--- a/internal/metadatadb/metadatadbtest/soak.go
+++ b/internal/metadatadb/metadatadbtest/soak.go
@@ -97,9 +97,8 @@ func Soak(t *testing.T, backend metadatadb.Backend, config SoakConfig) SoakResul
 	ctx, cancel := 
context.WithTimeout(ctx, config.Duration+time.Minute) defer cancel() - cfg := metadatadb.Config{SyncInterval: time.Hour, LockTTL: 5 * time.Second} - store := metadatadb.New(ctx, cfg, backend) - t.Cleanup(func() { assert.NoError(t, store.Close()) }) + store := metadatadb.New(ctx, backend) + t.Cleanup(func() { assert.NoError(t, store.Close(ctx)) }) ns := store.Namespace("soak") tr := newTracker() diff --git a/internal/metadatadb/metadatadbtest/suite.go b/internal/metadatadb/metadatadbtest/suite.go index 3861cc3..4698a8e 100644 --- a/internal/metadatadb/metadatadbtest/suite.go +++ b/internal/metadatadb/metadatadbtest/suite.go @@ -2,10 +2,8 @@ package metadatadbtest import ( "context" - "encoding/json" "log/slog" "testing" - "time" "github.com/alecthomas/assert/v2" @@ -14,63 +12,67 @@ import ( ) // Suite runs a comprehensive test suite against a metadatadb.Backend implementation. -func Suite(t *testing.T, newBackend func(t *testing.T) metadatadb.Backend) { +// The factory must return n backends that share the same underlying storage. 
+func Suite(t *testing.T, newBackends func(t *testing.T, n int) []metadatadb.Backend) { + one := func(t *testing.T) metadatadb.Backend { return newBackends(t, 1)[0] } + t.Run("Scalar", func(t *testing.T) { - testScalar(t, newBackend(t)) + testScalar(t, one(t)) }) t.Run("ScalarDelete", func(t *testing.T) { - testScalarDelete(t, newBackend(t)) + testScalarDelete(t, one(t)) }) t.Run("ScalarStruct", func(t *testing.T) { - testScalarStruct(t, newBackend(t)) + testScalarStruct(t, one(t)) }) t.Run("Int", func(t *testing.T) { - testInt(t, newBackend(t)) + testInt(t, one(t)) }) t.Run("IntArithmetic", func(t *testing.T) { - testIntArithmetic(t, newBackend(t)) + testIntArithmetic(t, one(t)) }) t.Run("IntDivByZero", func(t *testing.T) { - testIntDivByZero(t, newBackend(t)) + testIntDivByZero(t, one(t)) }) t.Run("Set", func(t *testing.T) { - testSet(t, newBackend(t)) + testSet(t, one(t)) }) t.Run("SetRemove", func(t *testing.T) { - testSetRemove(t, newBackend(t)) + testSetRemove(t, one(t)) }) t.Run("Map", func(t *testing.T) { - testMap(t, newBackend(t)) + testMap(t, one(t)) }) t.Run("MapDelete", func(t *testing.T) { - testMapDelete(t, newBackend(t)) + testMapDelete(t, one(t)) }) t.Run("MapStruct", func(t *testing.T) { - testMapStruct(t, newBackend(t)) + testMapStruct(t, one(t)) }) t.Run("IntMap", func(t *testing.T) { - testIntMap(t, newBackend(t)) + testIntMap(t, one(t)) }) t.Run("IntMapIncr", func(t *testing.T) { - testIntMapIncr(t, newBackend(t)) + testIntMapIncr(t, one(t)) }) t.Run("List", func(t *testing.T) { - testList(t, newBackend(t)) + testList(t, one(t)) }) t.Run("NamespaceIsolation", func(t *testing.T) { - testNamespaceIsolation(t, newBackend(t)) + testNamespaceIsolation(t, one(t)) }) t.Run("FlushPersists", func(t *testing.T) { - testFlushPersists(t, newBackend(t)) + testFlushPersists(t, one(t)) }) t.Run("FlushRoundTrip", func(t *testing.T) { - testFlushRoundTrip(t, newBackend(t)) + testFlushRoundTrip(t, one(t)) }) t.Run("TwoStoresSync", func(t *testing.T) { - 
testTwoStoresSync(t, newBackend(t)) + testTwoStoresSync(t, one(t)) }) - t.Run("TokenMismatch", func(t *testing.T) { - testTokenMismatch(t, newBackend(t)) + t.Run("TwoBackendsConcurrentOps", func(t *testing.T) { + backends := newBackends(t, 2) + testTwoBackendsConcurrentOps(t, backends[0], backends[1]) }) } @@ -80,9 +82,8 @@ func testContext() context.Context { func newStore(t *testing.T, backend metadatadb.Backend) *metadatadb.Store { t.Helper() - cfg := metadatadb.Config{SyncInterval: time.Hour, LockTTL: 5 * time.Second} - s := metadatadb.New(testContext(), cfg, backend) - t.Cleanup(func() { assert.NoError(t, s.Close()) }) + s := metadatadb.New(testContext(), backend) + t.Cleanup(func() { assert.NoError(t, s.Close(testContext())) }) return s } @@ -406,30 +407,29 @@ func testTwoStoresSync(t *testing.T, backend metadatadb.Backend) { assert.Equal(t, int64(15), metadatadb.NewInt(ns1, "counter").Get()) } -func testTokenMismatch(t *testing.T, backend metadatadb.Backend) { +func testTwoBackendsConcurrentOps(t *testing.T, b1, b2 metadatadb.Backend) { ctx := testContext() + s1 := metadatadb.New(ctx, b1) + t.Cleanup(func() { assert.NoError(t, s1.Close(ctx)) }) + s2 := metadatadb.New(ctx, b2) + t.Cleanup(func() { assert.NoError(t, s2.Close(ctx)) }) + + ns1 := s1.Namespace("shared") + ns2 := s2.Namespace("shared") + + // Both backends apply ops locally before either flushes. + metadatadb.NewInt(ns1, "val").Add(10) + metadatadb.NewInt(ns2, "val").Add(-5) + + // Backend 1 flushes first — remote becomes 10. + assert.NoError(t, ns1.Flush(ctx)) + + // Backend 2 flushes — replays Add(-5) onto remote 10 = 5. + assert.NoError(t, ns2.Flush(ctx)) + + // Backend 1 reloads to pick up backend 2's contribution. + assert.NoError(t, ns1.Flush(ctx)) - // Directly exercise the Backend token semantics. 
- _, token, err := backend.Load(ctx, "test") - assert.NoError(t, err) - assert.NoError(t, backend.Store(ctx, "test", json.RawMessage(`{"key":"v1"}`), token)) - - // Two readers load the same version. - _, token1, err := backend.Load(ctx, "test") - assert.NoError(t, err) - _, token2, err := backend.Load(ctx, "test") - assert.NoError(t, err) - assert.Equal(t, token1, token2) - - // First writer succeeds. - assert.NoError(t, backend.Store(ctx, "test", json.RawMessage(`{"key":"v2"}`), token1)) - - // Second writer fails — token is stale. - err = backend.Store(ctx, "test", json.RawMessage(`{"key":"v3"}`), token2) - assert.IsError(t, err, metadatadb.ErrInvalidToken) - - // Reload and retry succeeds. - _, token3, err := backend.Load(ctx, "test") - assert.NoError(t, err) - assert.NoError(t, backend.Store(ctx, "test", json.RawMessage(`{"key":"v3"}`), token3)) + assert.Equal(t, int64(5), metadatadb.NewInt(ns1, "val").Get()) + assert.Equal(t, int64(5), metadatadb.NewInt(ns2, "val").Get()) } diff --git a/internal/metadatadb/oplog.go b/internal/metadatadb/oplog.go deleted file mode 100644 index 65621a0..0000000 --- a/internal/metadatadb/oplog.go +++ /dev/null @@ -1,314 +0,0 @@ -package metadatadb - -import ( - "context" - "encoding/json" - "time" - - "github.com/alecthomas/errors" -) - -// Scalar ops - -type scalarSetOp[V any] struct { - name string - value V -} - -func (o *scalarSetOp[V]) apply(state map[string]any) { state[o.name] = o.value } - -type scalarDeleteOp struct{ name string } - -func (o *scalarDeleteOp) apply(state map[string]any) { delete(state, o.name) } - -// Int ops - -type intSetOp struct { - name string - value int64 -} - -func (o *intSetOp) apply(state map[string]any) { state[o.name] = o.value } - -type intAddOp struct { - name string - delta int64 -} - -func (o *intAddOp) apply(state map[string]any) { - state[o.name] = toInt64(state[o.name]) + o.delta -} - -type intMulOp struct { - name string - factor int64 -} - -func (o *intMulOp) apply(state 
map[string]any) { - state[o.name] = toInt64(state[o.name]) * o.factor -} - -type intDivOp struct { - name string - divisor int64 -} - -func (o *intDivOp) apply(state map[string]any) { - if o.divisor == 0 { - return - } - state[o.name] = toInt64(state[o.name]) / o.divisor -} - -// Set ops — stored as map[string]any keyed by JSON-marshaled members. - -type setAddOp[V comparable] struct { - name string - member V -} - -func (o *setAddOp[V]) apply(state map[string]any) { - m, ok := state[o.name].(map[string]any) - if !ok { - m = make(map[string]any) - state[o.name] = m - } - m[marshalKey(o.member)] = true -} - -type setRemoveOp[V comparable] struct { - name string - member V -} - -func (o *setRemoveOp[V]) apply(state map[string]any) { - if m, ok := state[o.name].(map[string]any); ok { - delete(m, marshalKey(o.member)) - } -} - -// IntMap ops — stored as map[string]any with int64 values. - -type intMapSetOp[K comparable] struct { - name string - key K - value int64 -} - -func (o *intMapSetOp[K]) apply(state map[string]any) { - m, ok := state[o.name].(map[string]any) - if !ok { - m = make(map[string]any) - state[o.name] = m - } - m[marshalKey(o.key)] = o.value -} - -type intMapAddOp[K comparable] struct { - name string - key K - delta int64 -} - -func (o *intMapAddOp[K]) apply(state map[string]any) { - m, ok := state[o.name].(map[string]any) - if !ok { - m = make(map[string]any) - state[o.name] = m - } - k := marshalKey(o.key) - m[k] = toInt64(m[k]) + o.delta -} - -type intMapMulOp[K comparable] struct { - name string - key K - factor int64 -} - -func (o *intMapMulOp[K]) apply(state map[string]any) { - m, ok := state[o.name].(map[string]any) - if !ok { - m = make(map[string]any) - state[o.name] = m - } - k := marshalKey(o.key) - m[k] = toInt64(m[k]) * o.factor -} - -type intMapDivOp[K comparable] struct { - name string - key K - divisor int64 -} - -func (o *intMapDivOp[K]) apply(state map[string]any) { - if o.divisor == 0 { - return - } - m, ok := 
state[o.name].(map[string]any) - if !ok { - return - } - k := marshalKey(o.key) - m[k] = toInt64(m[k]) / o.divisor -} - -type intMapDeleteOp[K comparable] struct { - name string - key K -} - -func (o *intMapDeleteOp[K]) apply(state map[string]any) { - if m, ok := state[o.name].(map[string]any); ok { - delete(m, marshalKey(o.key)) - } -} - -// Map ops — stored as map[string]any keyed by JSON-marshaled keys. - -type mapSetOp[K comparable, V any] struct { - name string - key K - value V -} - -func (o *mapSetOp[K, V]) apply(state map[string]any) { - m, ok := state[o.name].(map[string]any) - if !ok { - m = make(map[string]any) - state[o.name] = m - } - m[marshalKey(o.key)] = o.value -} - -type mapDeleteOp[K comparable] struct { - name string - key K -} - -func (o *mapDeleteOp[K]) apply(state map[string]any) { - if m, ok := state[o.name].(map[string]any); ok { - delete(m, marshalKey(o.key)) - } -} - -// List ops — stored as []any - -type listAppendOp[V any] struct { - name string - value V -} - -func (o *listAppendOp[V]) apply(state map[string]any) { - s, _ := state[o.name].([]any) - state[o.name] = append(s, any(o.value)) -} - -// Sync - -func (n *Namespace) doSync(ctx context.Context) error { - n.syncMu.Lock() - defer n.syncMu.Unlock() - - n.mu.Lock() - pending := n.pending - n.pending = nil - n.mu.Unlock() - - hasPending := len(pending) > 0 - if hasPending { - if err := n.store.backend.Lock(ctx, n.name); err != nil { - n.restorePending(pending) - return errors.Wrap(err, "lock namespace") - } - defer func() { - if err := n.store.backend.Unlock(ctx, n.name); err != nil { - n.store.logger.WarnContext(ctx, "unlock failed", "namespace", n.name, "error", err) - } - }() - } - - remote, err := n.loadReplayStore(ctx, pending) - if err != nil { - n.restorePending(pending) - return err - } - - n.mu.Lock() - n.state = remote - for _, o := range n.pending { - o.apply(n.state) - } - n.mu.Unlock() - - return nil -} - -const maxTokenRetries = 3 - -// loadReplayStore loads remote 
state, replays ops, and stores the result. -// Retries the full cycle on ErrInvalidToken. -func (n *Namespace) loadReplayStore(ctx context.Context, pending []op) (map[string]any, error) { - for range maxTokenRetries { - remote, err := n.tryLoadReplayStore(ctx, pending) - if errors.Is(err, ErrInvalidToken) { - continue - } - return remote, err - } - return nil, errors.New("max token retries exceeded") -} - -func (n *Namespace) tryLoadReplayStore(ctx context.Context, pending []op) (map[string]any, error) { - data, token, err := n.store.backend.Load(ctx, n.name) - if err != nil { - return nil, errors.Wrap(err, "load namespace") - } - - remote := make(map[string]any) - if data != nil { - if err := json.Unmarshal(data, &remote); err != nil { - return nil, errors.Wrap(err, "unmarshal state") - } - } - - for _, o := range pending { - o.apply(remote) - } - - if len(pending) > 0 { - merged, err := json.Marshal(remote) - if err != nil { - return nil, errors.Wrap(err, "marshal state") - } - if err := n.store.backend.Store(ctx, n.name, merged, token); err != nil { - return nil, errors.Wrap(err, "store namespace") - } - } - - return remote, nil -} - -func (n *Namespace) restorePending(ops []op) { - n.mu.Lock() - defer n.mu.Unlock() - n.pending = append(ops, n.pending...) 
package metadatadb

import (
	"encoding/json"
	"fmt"

	"github.com/alecthomas/errors"
)

// errInvalidToken is returned when an optimistic concurrency token does not
// match the current version, indicating a concurrent write.
var errInvalidToken = errors.New("invalid token")

// Op is a write operation applied to the metadata store. Concrete types form a
// closed set (sum type) — backends handle each variant via exhaustive type switch.
//
// NOTE(review): backends may buffer ops and replay them against freshly loaded
// remote state; arithmetic ops (IntAdd, IntMul, …) are not idempotent, so
// confirm a store failure after a partially visible write cannot double-apply.
//
//sumtype:decl
type Op interface {
	op() // private marker
}

// ReadOp is a read query against the metadata store. Like Op, concrete types
// form a closed set dispatched by the backend.
//
//sumtype:decl
type ReadOp interface {
	readOp() // private marker
}

// Scalar ops

// ScalarSet sets a scalar value. Last write wins.
type ScalarSet struct {
	Key   string
	Value any
}

func (ScalarSet) op() {}

// ScalarDelete removes a scalar value.
type ScalarDelete struct{ Key string }

func (ScalarDelete) op() {}

// ScalarGet reads a scalar value. Returns (value, true) or (nil, false).
type ScalarGet struct{ Key string }

func (ScalarGet) readOp() {}

// Int ops
//
// Integer values that are absent from the state (or of a non-numeric type)
// are treated as zero by the arithmetic ops (see toInt64).

// IntSet sets an integer to an exact value.
type IntSet struct {
	Key   string
	Value int64
}

func (IntSet) op() {}

// IntAdd adds a delta to an integer.
type IntAdd struct {
	Key   string
	Delta int64
}

func (IntAdd) op() {}

// IntMul multiplies an integer by a factor.
type IntMul struct {
	Key    string
	Factor int64
}

func (IntMul) op() {}

// IntDiv divides an integer by a divisor. Zero divisor is a no-op.
type IntDiv struct {
	Key     string
	Divisor int64
}

func (IntDiv) op() {}

// IntGet reads an integer value. Returns int64 (zero if absent).
type IntGet struct{ Key string }

func (IntGet) readOp() {}

// Set ops
//
// Members are stored keyed by their JSON encoding, so they must be
// JSON-marshalable; a non-marshalable member panics (see marshalKey).

// SetAdd adds a member to a set. Idempotent.
type SetAdd struct {
	Key    string
	Member any
}

func (SetAdd) op() {}

// SetRemove removes a member from a set. No-op if absent.
type SetRemove struct {
	Key    string
	Member any
}

func (SetRemove) op() {}

// SetContains checks whether a member exists in a set. Returns bool.
type SetContains struct {
	Key    string
	Member any
}

func (SetContains) readOp() {}

// SetMembers returns all set members. Order is unspecified.
type SetMembers struct{ Key string }

func (SetMembers) readOp() {}

// IntMap ops
//
// Map keys are stored by their JSON encoding and must be JSON-marshalable.

// IntMapSet sets a keyed integer to an exact value.
type IntMapSet struct {
	Key    string
	MapKey any
	Value  int64
}

func (IntMapSet) op() {}

// IntMapAdd adds a delta to a keyed integer.
type IntMapAdd struct {
	Key    string
	MapKey any
	Delta  int64
}

func (IntMapAdd) op() {}

// IntMapMul multiplies a keyed integer by a factor.
type IntMapMul struct {
	Key    string
	MapKey any
	Factor int64
}

func (IntMapMul) op() {}

// IntMapDiv divides a keyed integer by a divisor. Zero divisor is a no-op.
type IntMapDiv struct {
	Key     string
	MapKey  any
	Divisor int64
}

func (IntMapDiv) op() {}

// IntMapDelete removes a key from an integer map.
type IntMapDelete struct {
	Key    string
	MapKey any
}

func (IntMapDelete) op() {}

// IntMapGet reads a keyed integer value. Returns int64 (zero if absent).
type IntMapGet struct {
	Key    string
	MapKey any
}

func (IntMapGet) readOp() {}

// IntMapKeys returns all keys in an integer map. Order is unspecified.
type IntMapKeys struct{ Key string }

func (IntMapKeys) readOp() {}

// IntMapEntries returns all entries in an integer map.
type IntMapEntries struct{ Key string }

func (IntMapEntries) readOp() {}

// Map ops
//
// Map keys are stored by their JSON encoding and must be JSON-marshalable.

// MapSet sets a keyed value in a map. Last write per key wins.
type MapSet struct {
	Key    string
	MapKey any
	Value  any
}

func (MapSet) op() {}

// MapDelete removes a key from a map.
type MapDelete struct {
	Key    string
	MapKey any
}

func (MapDelete) op() {}

// MapGet reads a keyed value from a map. Returns (value, true) or (nil, false).
type MapGet struct {
	Key    string
	MapKey any
}

func (MapGet) readOp() {}

// MapKeys returns all keys in a map. Order is unspecified.
type MapKeys struct{ Key string }

func (MapKeys) readOp() {}

// MapEntries returns all entries in a map.
type MapEntries struct{ Key string }

func (MapEntries) readOp() {}

// List ops

// ListAppend appends a value to a list.
type ListAppend struct {
	Key   string
	Value any
}

func (ListAppend) op() {}

// ListEntries returns all list entries as []any.
type ListEntries struct{ Key string }

func (ListEntries) readOp() {}

// ListLen returns the length of a list as int.
type ListLen struct{ Key string }

func (ListLen) readOp() {}

// applyOp applies a single write Op to the in-memory state via exhaustive type switch.
func applyOp(state map[string]any, o Op) { //nolint:funlen
	switch o := o.(type) {
	case ScalarSet:
		state[o.Key] = o.Value
	case ScalarDelete:
		delete(state, o.Key)

	case IntSet:
		state[o.Key] = o.Value
	case IntAdd:
		// Absent or non-numeric current values coerce to zero via toInt64.
		state[o.Key] = toInt64(state[o.Key]) + o.Delta
	case IntMul:
		state[o.Key] = toInt64(state[o.Key]) * o.Factor
	case IntDiv:
		// Zero divisor is documented as a no-op rather than a panic.
		if o.Divisor != 0 {
			state[o.Key] = toInt64(state[o.Key]) / o.Divisor
		}

	case SetAdd:
		// Sets are stored as map[jsonEncodedMember]true so that arbitrary
		// JSON-marshalable members get a stable, comparable key.
		m, ok := state[o.Key].(map[string]any)
		if !ok {
			m = make(map[string]any)
			state[o.Key] = m
		}
		m[marshalKey(o.Member)] = true
	case SetRemove:
		if m, ok := state[o.Key].(map[string]any); ok {
			delete(m, marshalKey(o.Member))
		}

	case IntMapSet:
		m := getOrCreateSubmap(state, o.Key)
		m[marshalKey(o.MapKey)] = o.Value
	case IntMapAdd:
		m := getOrCreateSubmap(state, o.Key)
		k := marshalKey(o.MapKey)
		m[k] = toInt64(m[k]) + o.Delta
	case IntMapMul:
		m := getOrCreateSubmap(state, o.Key)
		k := marshalKey(o.MapKey)
		m[k] = toInt64(m[k]) * o.Factor
	case IntMapDiv:
		if o.Divisor == 0 {
			return
		}
		// NOTE(review): unlike IntMapMul (which creates the submap and a zero
		// entry via getOrCreateSubmap), dividing an absent map is a no-op, so
		// the key is never created. 0/divisor would be 0 either way, but the
		// key-existence asymmetry is visible through IntMapKeys — confirm it
		// is intended.
		m, ok := state[o.Key].(map[string]any)
		if !ok {
			return
		}
		k := marshalKey(o.MapKey)
		m[k] = toInt64(m[k]) / o.Divisor
	case IntMapDelete:
		if m, ok := state[o.Key].(map[string]any); ok {
			delete(m, marshalKey(o.MapKey))
		}

	case MapSet:
		m := getOrCreateSubmap(state, o.Key)
		m[marshalKey(o.MapKey)] = o.Value
	case MapDelete:
		if m, ok := state[o.Key].(map[string]any); ok {
			delete(m, marshalKey(o.MapKey))
		}

	case ListAppend:
		// A missing or non-list value yields a nil slice; append starts a
		// fresh list in that case.
		s, _ := state[o.Key].([]any)
		state[o.Key] = append(s, o.Value)

	default:
		// Unreachable for the closed Op sum type; guards against a new
		// variant being added without a matching case here.
		panic(fmt.Sprintf("metadatadb: unhandled op type %T", o))
	}
}

// decodeMapKeys converts a map with JSON-encoded string keys into a map
// whose keys are the decoded JSON values. This produces a map that
// json.Marshal will encode correctly for the caller's target type.
// decodeMapKeys converts a map whose keys are JSON-encoded strings into a map
// keyed by the decoded values' string forms, so that json.Marshal re-encodes
// the keys correctly for the caller's target type.
func decodeMapKeys(m map[string]any) map[string]any {
	out := make(map[string]any, len(m))
	for encoded, value := range m {
		var key any
		if json.Unmarshal([]byte(encoded), &key) != nil {
			// Not valid JSON — keep the raw key untouched.
			out[encoded] = value
			continue
		}
		// A decoded string is used verbatim; numbers and other scalars are
		// rendered with fmt.Sprint (e.g. 42 -> "42"), which json.Marshal then
		// emits as the map key string.
		out[fmt.Sprint(key)] = value
	}
	return out
}

// marshalKey renders v as its canonical JSON encoding for use as a map key.
// It panics when v cannot be marshalled: map keys and set members must always
// be JSON-encodable.
func marshalKey(v any) string {
	b, err := json.Marshal(v)
	if err == nil {
		return string(b)
	}
	panic(fmt.Sprintf("metadatadb: marshal key %T: %v", v, err))
}

// toInt64 coerces a stored numeric value to int64. Values that round-tripped
// through JSON arrive as float64; anything else (including absent values)
// coerces to zero.
func toInt64(v any) int64 {
	if n, ok := v.(int64); ok {
		return n
	}
	if f, ok := v.(float64); ok {
		return int64(f)
	}
	return 0
}

// getOrCreateSubmap returns the nested map stored under key, installing an
// empty one when the slot is missing or holds a non-map value.
func getOrCreateSubmap(state map[string]any, key string) map[string]any {
	if m, ok := state[key].(map[string]any); ok {
		return m
	}
	m := make(map[string]any)
	state[key] = m
	return m
}

// queryResult carries an optional value plus a presence flag out of queryState.
type queryResult struct {
	Value any
	OK    bool
}

// queryState executes a ReadOp against the in-memory state via exhaustive type switch.
func queryState(state map[string]any, q ReadOp) any { //nolint:funlen
	switch q := q.(type) {
	case ScalarGet:
		raw, ok := state[q.Key]
		if !ok {
			// NOTE(review): absence yields nil rather than
			// queryResult{OK: false}; presumably jsonUnmarshalInto maps a nil
			// result to the target's zero value — confirm, since the ScalarGet
			// doc promises (nil, false).
			return nil
		}
		return queryResult{Value: raw, OK: true}

	case IntGet:
		// Absent or non-numeric values coerce to zero via toInt64.
		return toInt64(state[q.Key])

	case SetContains:
		m, ok := state[q.Key].(map[string]any)
		if !ok {
			return false
		}
		// Membership is keyed by the member's JSON encoding (see marshalKey).
		_, ok = m[marshalKey(q.Member)]
		return ok
	case SetMembers:
		raw, ok := state[q.Key].(map[string]any)
		if !ok {
			return nil
		}
		// Keys are stored JSON-encoded, so hand them back as RawMessage for
		// the caller's unmarshal. Map iteration order is random, so member
		// order is unspecified.
		members := make([]any, 0, len(raw))
		for k := range raw {
			members = append(members, json.RawMessage(k))
		}
		return members

	case IntMapGet:
		m, ok := state[q.Key].(map[string]any)
		if !ok {
			return int64(0)
		}
		return toInt64(m[marshalKey(q.MapKey)])
	case IntMapKeys:
		raw, ok := state[q.Key].(map[string]any)
		if !ok {
			return nil
		}
		keys := make([]any, 0, len(raw))
		for k := range raw {
			keys = append(keys, json.RawMessage(k))
		}
		return keys
	case IntMapEntries:
		raw, ok := state[q.Key].(map[string]any)
		if !ok {
			return nil
		}
		// Decode the JSON-encoded keys so the result marshals with the
		// caller's expected key type.
		return decodeMapKeys(raw)

	case MapGet:
		m, ok := state[q.Key].(map[string]any)
		if !ok {
			return nil
		}
		v, ok := m[marshalKey(q.MapKey)]
		if !ok {
			// NOTE(review): as with ScalarGet, absence yields nil rather than
			// queryResult{OK: false} — confirm jsonUnmarshalInto's handling.
			return nil
		}
		return queryResult{Value: v, OK: true}
	case MapKeys:
		raw, ok := state[q.Key].(map[string]any)
		if !ok {
			return nil
		}
		keys := make([]any, 0, len(raw))
		for k := range raw {
			keys = append(keys, json.RawMessage(k))
		}
		return keys
	case MapEntries:
		raw, ok := state[q.Key].(map[string]any)
		if !ok {
			return nil
		}
		return decodeMapKeys(raw)

	case ListEntries:
		// Returns the stored slice without copying; the S3 backend JSON
		// round-trips query results (see Query), which copies for the caller.
		return state[q.Key]
	case ListLen:
		raw, ok := state[q.Key].([]any)
		if !ok {
			return 0
		}
		return len(raw)

	default:
		// Unreachable for the closed ReadOp sum type.
		panic(fmt.Sprintf("metadatadb: unhandled query type %T", q))
	}
}
import (
	// NOTE(review): this view of the import block may be truncated — the code
	// below also references bytes and context; confirm they are imported.
	"encoding/json"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/alecthomas/errors"
	"github.com/minio/minio-go/v7"
)

// S3Backend stores metadata state as JSON objects in S3 with periodic sync.
// Writes are applied to local state immediately and queued for the next flush.
// Locking uses a separate lock object with TTL-based expiry for stale lock
// recovery. The idempotence token maps to the S3 object ETag.
type S3Backend struct {
	client *minio.Client
	logger *slog.Logger
	// bucket and prefix locate the per-namespace state and lock objects
	// (see stateKey and lockKey).
	bucket string
	prefix string
	// lockTTL is how long a lock object is honoured before being treated as
	// stale and forcibly removed.
	lockTTL time.Duration
	// syncInterval is the tick period of each namespace's background sync loop.
	syncInterval time.Duration
	// mu guards ns.
	mu sync.Mutex
	ns map[string]*s3Namespace
	// ctx/cancel bound the lifetime of all background sync loops; cancel is
	// invoked by Close. Storing a context in a struct is normally avoided, but
	// here it deliberately scopes the goroutines started in namespace().
	ctx    context.Context
	cancel context.CancelFunc
}
type S3BackendConfig struct { - Client *minio.Client - Logger *slog.Logger - Bucket string - Prefix string - LockTTL time.Duration + Client *minio.Client + Logger *slog.Logger + Bucket string + Prefix string + LockTTL time.Duration + SyncInterval time.Duration } func NewS3Backend(ctx context.Context, config S3BackendConfig) (*S3Backend, error) { @@ -39,6 +47,9 @@ func NewS3Backend(ctx context.Context, config S3BackendConfig) (*S3Backend, erro if config.LockTTL == 0 { config.LockTTL = 30 * time.Second } + if config.SyncInterval == 0 { + config.SyncInterval = 30 * time.Second + } logger := config.Logger if logger == nil { logger = slog.Default() @@ -51,19 +62,78 @@ func NewS3Backend(ctx context.Context, config S3BackendConfig) (*S3Backend, erro return nil, errors.Errorf("bucket %s does not exist", config.Bucket) } + ctx, cancel := context.WithCancel(ctx) return &S3Backend{ - client: config.Client, - logger: logger, - bucket: config.Bucket, - prefix: config.Prefix, - lockTTL: config.LockTTL, + client: config.Client, + logger: logger, + bucket: config.Bucket, + prefix: config.Prefix, + lockTTL: config.LockTTL, + syncInterval: config.SyncInterval, + ns: make(map[string]*s3Namespace), + ctx: ctx, + cancel: cancel, }, nil } +func (s *S3Backend) namespace(name string) *s3Namespace { + s.mu.Lock() + defer s.mu.Unlock() + if ns, ok := s.ns[name]; ok { + return ns + } + ns := &s3Namespace{ + backend: s, + name: name, + state: make(map[string]any), + done: make(chan struct{}), + } + go ns.syncLoop() + s.ns[name] = ns + return ns +} + +func (s *S3Backend) Apply(_ context.Context, namespace string, ops ...Op) error { + ns := s.namespace(namespace) + ns.mu.Lock() + defer ns.mu.Unlock() + for _, o := range ops { + applyOp(ns.state, o) + } + ns.pending = append(ns.pending, ops...) 
+ return nil +} + +func (s *S3Backend) Query(_ context.Context, namespace string, q ReadOp, target any) error { + ns := s.namespace(namespace) + ns.mu.RLock() + defer ns.mu.RUnlock() + result := queryState(ns.state, q) + return jsonUnmarshalInto(result, target) +} + +func (s *S3Backend) Flush(ctx context.Context, namespace string) error { + return s.namespace(namespace).doSync(ctx) +} + +func (s *S3Backend) Close(_ context.Context) error { + s.cancel() + s.mu.Lock() + defer s.mu.Unlock() + for _, ns := range s.ns { + <-ns.done + } + return nil +} + +// S3 object key helpers + func (s *S3Backend) stateKey(namespace string) string { return s.prefix + "/" + namespace + ".json" } func (s *S3Backend) lockKey(namespace string) string { return s.prefix + "/" + namespace + ".lock" } -func (s *S3Backend) Load(ctx context.Context, namespace string) (json.RawMessage, string, error) { +// S3 load/store/lock/unlock + +func (s *S3Backend) load(ctx context.Context, namespace string) (json.RawMessage, string, error) { obj, err := s.client.GetObject(ctx, s.bucket, s.stateKey(namespace), minio.GetObjectOptions{}) if err != nil { return nil, "", errors.Wrap(err, "get object") @@ -87,10 +157,8 @@ func (s *S3Backend) Load(ctx context.Context, namespace string) (json.RawMessage return buf.Bytes(), info.ETag, nil } -func (s *S3Backend) Store(ctx context.Context, namespace string, data json.RawMessage, token string) error { - opts := minio.PutObjectOptions{ - ContentType: "application/json", - } +func (s *S3Backend) store(ctx context.Context, namespace string, data json.RawMessage, token string) error { + opts := minio.PutObjectOptions{ContentType: "application/json"} if token != "" { opts.SetMatchETag(token) } else { @@ -101,17 +169,16 @@ func (s *S3Backend) Store(ctx context.Context, namespace string, data json.RawMe bytes.NewReader(data), int64(len(data)), opts) if err != nil { if isPreconditionFailed(err) { - return ErrInvalidToken + return errInvalidToken } return errors.Wrap(err, 
"put object") } return nil } -func (s *S3Backend) Lock(ctx context.Context, namespace string) error { +func (s *S3Backend) lockNamespace(ctx context.Context, namespace string) error { key := s.lockKey(namespace) for { - // Try to acquire by writing a lock object with If-None-Match: * opts := minio.PutObjectOptions{ UserMetadata: map[string]string{ "Expires-At": time.Now().Add(s.lockTTL).Format(time.RFC3339), @@ -129,7 +196,6 @@ func (s *S3Backend) Lock(ctx context.Context, namespace string) error { return errors.Wrap(err, "acquire lock") } - // Lock exists — check if it's stale and remove it. if err := s.tryExpireStaleLock(ctx, key); err != nil { s.logger.WarnContext(ctx, "stale lock check failed", "key", key, "error", err) } else { @@ -144,12 +210,10 @@ func (s *S3Backend) Lock(ctx context.Context, namespace string) error { } } -func (s *S3Backend) Unlock(ctx context.Context, namespace string) error { - err := s.client.RemoveObject(ctx, s.bucket, s.lockKey(namespace), minio.RemoveObjectOptions{}) - if err != nil { - return errors.Wrap(err, "remove lock") - } - return nil +func (s *S3Backend) unlockNamespace(ctx context.Context, namespace string) error { + return errors.Wrap( + s.client.RemoveObject(ctx, s.bucket, s.lockKey(namespace), minio.RemoveObjectOptions{}), + "remove lock") } func (s *S3Backend) tryExpireStaleLock(ctx context.Context, key string) error { @@ -164,12 +228,126 @@ func (s *S3Backend) tryExpireStaleLock(ctx context.Context, key string) error { if time.Now().Before(expiresAt) { return errors.New("lock not expired") } - if err := s.client.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{}); err != nil { - return errors.Wrap(err, "remove stale lock") + return errors.Wrap( + s.client.RemoveObject(ctx, s.bucket, key, minio.RemoveObjectOptions{}), + "remove stale lock") +} + +// Per-namespace sync machinery + +type s3Namespace struct { + backend *S3Backend + name string + mu sync.RWMutex + state map[string]any + pending []Op + syncMu 
	sync.Mutex // syncMu serialises doSync calls (ticker vs. explicit Flush)
	done chan struct{} // closed when syncLoop exits, so Close can wait
}

// maxTokenRetries bounds how many times a sync retries the full
// load→replay→store cycle after losing an ETag race to a concurrent writer.
const maxTokenRetries = 3

// doSync performs one synchronisation cycle for the namespace:
//  1. snapshot and clear the pending op queue;
//  2. if ops are pending, take the namespace's S3 advisory lock (read-only
//     refreshes skip locking entirely);
//  3. load remote state, replay the snapshot onto it and store it back;
//  4. adopt the merged result locally, replaying any ops queued while the
//     sync was in flight.
// On failure the snapshot is re-queued for the next attempt.
func (n *s3Namespace) doSync(ctx context.Context) error {
	n.syncMu.Lock()
	defer n.syncMu.Unlock()

	// Snapshot pending ops; ops arriving after this point remain queued and
	// are re-applied onto the merged state below.
	n.mu.Lock()
	pending := n.pending
	n.pending = nil
	n.mu.Unlock()

	if len(pending) > 0 {
		if err := n.backend.lockNamespace(ctx, n.name); err != nil {
			n.restorePending(pending)
			return errors.Wrap(err, "lock namespace")
		}
		defer func() {
			// Unlock failures are logged, not returned: the TTL on the lock
			// object eventually expires it anyway.
			if err := n.backend.unlockNamespace(ctx, n.name); err != nil {
				n.backend.logger.WarnContext(ctx, "unlock failed", "namespace", n.name, "error", err)
			}
		}()
	}

	remote, err := n.loadReplayStore(ctx, pending)
	if err != nil {
		// Put the snapshot back at the front of the queue so op order is
		// preserved for the retry.
		n.restorePending(pending)
		return err
	}

	// Adopt the merged remote state, then re-apply ops queued during the sync.
	n.mu.Lock()
	n.state = remote
	for _, o := range n.pending {
		applyOp(n.state, o)
	}
	n.mu.Unlock()
	return nil
}

// loadReplayStore runs the load→replay→store cycle, restarting with a freshly
// loaded token each time a concurrent writer invalidates ours.
func (n *s3Namespace) loadReplayStore(ctx context.Context, pending []Op) (map[string]any, error) {
	for range maxTokenRetries {
		remote, err := n.tryLoadReplayStore(ctx, pending)
		if errors.Is(err, errInvalidToken) {
			continue
		}
		return remote, err
	}
	return nil, errors.New("max token retries exceeded")
}

// tryLoadReplayStore loads the remote state and its ETag token, replays the
// pending ops onto it, and — only when there were ops — stores the merged
// document back conditioned on the token. Read-only syncs never write.
func (n *s3Namespace) tryLoadReplayStore(ctx context.Context, pending []Op) (map[string]any, error) {
	data, token, err := n.backend.load(ctx, n.name)
	if err != nil {
		return nil, errors.Wrap(err, "load namespace")
	}

	// nil data means the object does not exist yet: a brand-new namespace
	// starts from empty state (and an empty token for a create-only store).
	remote := make(map[string]any)
	if data != nil {
		if err := json.Unmarshal(data, &remote); err != nil {
			return nil, errors.Wrap(err, "unmarshal state")
		}
	}

	for _, o := range pending {
		applyOp(remote, o)
	}

	if len(pending) > 0 {
		merged, err := json.Marshal(remote)
		if err != nil {
			return nil, errors.Wrap(err, "marshal state")
		}
		// store surfaces errInvalidToken (wrapped) when another writer won
		// the ETag race; loadReplayStore detects that with errors.Is.
		if err := n.backend.store(ctx, n.name, merged, token); err != nil {
			return nil, errors.Wrap(err, "store namespace")
		}
	}

	return remote, nil
}

// restorePending re-queues a failed snapshot ahead of any newer ops so replay
// order is preserved on the next sync attempt.
func (n *s3Namespace) restorePending(ops []Op) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.pending = append(ops, n.pending...)
}

// syncLoop periodically syncs the namespace until the backend's context is
// cancelled. Sync errors are logged and retried on the next tick; done is
// closed on exit so the backend's Close can wait for the loop.
func (n *s3Namespace) syncLoop() {
	defer close(n.done)
	logger := n.backend.logger.With("namespace", n.name)
	ticker := time.NewTicker(n.backend.syncInterval)
	defer ticker.Stop()
	for {
		select {
		case <-n.backend.ctx.Done():
			return
		case <-ticker.C:
			if err := n.doSync(n.backend.ctx); err != nil {
				logger.WarnContext(n.backend.ctx, "sync failed", "error", err)
			}
		}
	}
}

// isNotFound reports whether err is an S3 "object not found" error response.
func isNotFound(err error) bool {
	var resp minio.ErrorResponse
	return errors.As(err, &resp) && resp.StatusCode == http.StatusNotFound
}