From 8b5a8460a3b039f4b8adbe542540b20413f8f52d Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 23 Aug 2025 11:55:01 +0300 Subject: [PATCH 1/7] feat(backend): add hinted handoff, parallel reads, and gossip protocol to distributed cache - Add hinted handoff mechanism to queue writes for temporarily unavailable replicas * Queue hints with TTL and replay them when nodes recover * Add configuration options for hint TTL, replay interval, and max hints per node * Include metrics for queued, replayed, expired, and dropped hints - Implement parallel reads for improved performance * Enable concurrent fan-out to replica nodes for quorum/all consistency * Early termination when quorum is satisfied * Configurable via WithDistParallelReads option - Add simple gossip protocol for membership information sharing * Periodic random peer selection for gossip exchange * Automatic node state synchronization based on incarnation numbers * Configurable gossip interval - Enhance replica failure handling in replicateTo method - Add comprehensive metrics tracking for new distributed features - Update cspell configuration to include nosec directive This significantly improves the resilience and performance of the distributed cache system by handling temporary node failures gracefully and enabling more efficient read operations. 
--- cspell.config.yaml | 1 + pkg/backend/dist_memory.go | 344 ++++++++++++++++++++++++++++++++++++- 2 files changed, 342 insertions(+), 3 deletions(-) diff --git a/cspell.config.yaml b/cspell.config.yaml index 63dd475..b221d12 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -61,6 +61,7 @@ words: - Newf - nolint - nonamedreturns + - nosec - NOVENDOR - paralleltest - Pipeliner diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index f56a30a..dc25f7b 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -2,8 +2,11 @@ package backend import ( "context" + "crypto/rand" "errors" "hash/fnv" + "math/big" + "sync" "sync/atomic" "time" @@ -48,6 +51,27 @@ type DistMemory struct { readConsistency ConsistencyLevel writeConsistency ConsistencyLevel versionCounter uint64 // global monotonic for this node (lamport-like) + + // hinted handoff + hintTTL time.Duration + hintReplayInt time.Duration + hintMaxPerNode int + hintsMu sync.Mutex + hints map[string][]hintedEntry // nodeID -> queue + hintStopCh chan struct{} + + // parallel reads + parallelReads bool + + // simple gossip + gossipInterval time.Duration + gossipStopCh chan struct{} +} + +// hintedEntry represents a deferred replica write. +type hintedEntry struct { + item *cache.Item + expire time.Time } // ConsistencyLevel defines read/write consistency semantics. @@ -143,6 +167,35 @@ func WithDistVirtualNodes(n int) DistMemoryOption { } } +// WithDistHintTTL sets TTL for hinted handoff entries. +func WithDistHintTTL(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.hintTTL = d } +} + +// WithDistHintReplayInterval sets how often to attempt replay of hints. +func WithDistHintReplayInterval(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.hintReplayInt = d } +} + +// WithDistHintMaxPerNode caps number of queued hints per target node. 
+func WithDistHintMaxPerNode(n int) DistMemoryOption { + return func(dm *DistMemory) { + if n > 0 { + dm.hintMaxPerNode = n + } + } +} + +// WithDistParallelReads enables parallel quorum/all read fan-out. +func WithDistParallelReads(enable bool) DistMemoryOption { + return func(dm *DistMemory) { dm.parallelReads = enable } +} + +// WithDistGossipInterval enables simple membership gossip at provided interval. +func WithDistGossipInterval(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.gossipInterval = d } +} + // WithDistNode identity (id optional; derived from address if empty). Address used for future RPC. func WithDistNode(id, address string) DistMemoryOption { return func(dm *DistMemory) { @@ -177,6 +230,8 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist dm.initMembershipIfNeeded() dm.tryStartHTTP(ctx) dm.startHeartbeatIfEnabled(ctx) + dm.startHintReplayIfEnabled(ctx) + dm.startGossipIfEnabled() return dm, nil } @@ -227,6 +282,10 @@ func (dm *DistMemory) Get(ctx context.Context, key string) (*cache.Item, bool) { return dm.getOne(ctx, key, owners) } + if dm.parallelReads { + return dm.getWithConsistencyParallel(ctx, key, owners) + } + return dm.getWithConsistency(ctx, key, owners) } @@ -451,6 +510,10 @@ type distMetrics struct { versionConflicts int64 // times a newer version (or tie-broken origin) replaced previous candidate versionTieBreaks int64 // subset of conflicts decided by origin tie-break readPrimaryPromote int64 // times read path skipped unreachable primary and promoted next owner + hintedQueued int64 // hints queued + hintedReplayed int64 // hints successfully replayed + hintedExpired int64 // hints expired before delivery + hintedDropped int64 // hints dropped due to non-not-found transport errors } // DistMetrics snapshot. 
@@ -468,6 +531,10 @@ type DistMetrics struct { VersionConflicts int64 VersionTieBreaks int64 ReadPrimaryPromote int64 + HintedQueued int64 + HintedReplayed int64 + HintedExpired int64 + HintedDropped int64 } // Metrics returns a snapshot of distributed metrics. @@ -486,6 +553,10 @@ func (dm *DistMemory) Metrics() DistMetrics { VersionConflicts: atomic.LoadInt64(&dm.metrics.versionConflicts), VersionTieBreaks: atomic.LoadInt64(&dm.metrics.versionTieBreaks), ReadPrimaryPromote: atomic.LoadInt64(&dm.metrics.readPrimaryPromote), + HintedQueued: atomic.LoadInt64(&dm.metrics.hintedQueued), + HintedReplayed: atomic.LoadInt64(&dm.metrics.hintedReplayed), + HintedExpired: atomic.LoadInt64(&dm.metrics.hintedExpired), + HintedDropped: atomic.LoadInt64(&dm.metrics.hintedDropped), } } @@ -495,6 +566,14 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn close(dm.stopCh) } + if dm.hintStopCh != nil { + close(dm.hintStopCh) + } + + if dm.gossipStopCh != nil { + close(dm.gossipStopCh) + } + if dm.httpServer != nil { err := dm.httpServer.stop(ctx) // best-effort if err != nil { @@ -730,20 +809,279 @@ func (dm *DistMemory) fetchOwner(ctx context.Context, key string, idx int, oid c // replicateTo sends writes to replicas (best-effort) returning ack count. func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replicas []cluster.NodeID) int { //nolint:ireturn acks := 0 - for _, oid := range replicas { if oid == dm.localNode.ID { continue } - if dm.transport != nil && dm.transport.ForwardSet(ctx, string(oid), item, false) == nil { - acks++ + if dm.transport != nil { + err := dm.transport.ForwardSet(ctx, string(oid), item, false) + if err == nil { + acks++ + + continue + } + + if errors.Is(err, sentinel.ErrBackendNotFound) { // queue hint for unreachable replica + dm.queueHint(string(oid), item) + } } } return acks } +// getWithConsistencyParallel performs parallel owner fan-out until quorum/all reached. 
+func (dm *DistMemory) getWithConsistencyParallel(ctx context.Context, key string, owners []cluster.NodeID) (*cache.Item, bool) { //nolint:ireturn + needed := dm.requiredAcks(len(owners), dm.readConsistency) + + type res struct { + it *cache.Item + ok bool + } + + ch := make(chan res, len(owners)) + + ctxFetch, cancel := context.WithCancel(ctx) + defer cancel() + + for idx, oid := range owners { // launch all + go func() { + it, ok := dm.fetchOwner(ctxFetch, key, idx, oid) + ch <- res{it: it, ok: ok} + }() + } + + acks := 0 + + var chosen *cache.Item + for range owners { + r := <-ch + if r.ok { + chosen = dm.chooseNewer(chosen, r.it) + + acks++ + if acks >= needed && chosen != nil { // early satisfied + cancel() + + break + } + } + } + + if acks < needed || chosen == nil { + return nil, false + } + + dm.repairReplicas(ctx, key, chosen, owners) + + return chosen, true +} + +// --- Hinted handoff implementation ---. +func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { + if dm.hintTTL <= 0 { // disabled + return + } + + dm.hintsMu.Lock() + + if dm.hints == nil { + dm.hints = make(map[string][]hintedEntry) + } + + queueHints := dm.hints[nodeID] + if dm.hintMaxPerNode > 0 && len(queueHints) >= dm.hintMaxPerNode { // drop oldest + queueHints = queueHints[1:] + } + + cloned := *item + + queueHints = append(queueHints, hintedEntry{item: &cloned, expire: time.Now().Add(dm.hintTTL)}) + dm.hints[nodeID] = queueHints + dm.hintsMu.Unlock() + atomic.AddInt64(&dm.metrics.hintedQueued, 1) +} + +func (dm *DistMemory) startHintReplayIfEnabled(ctx context.Context) { + if dm.hintReplayInt <= 0 || dm.hintTTL <= 0 { + return + } + + dm.hintStopCh = make(chan struct{}) + go dm.hintReplayLoop(ctx) +} + +func (dm *DistMemory) hintReplayLoop(ctx context.Context) { //nolint:ireturn + ticker := time.NewTicker(dm.hintReplayInt) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dm.replayHints(ctx) + case <-dm.hintStopCh: + return + case <-ctx.Done(): + return + } + 
} +} + +func (dm *DistMemory) replayHints(ctx context.Context) { //nolint:ireturn + if dm.transport == nil { + return + } + + now := time.Now() + + dm.hintsMu.Lock() + + for nodeID, q := range dm.hints { + out := q[:0] + for _, hint := range q { + if now.After(hint.expire) { // expired + atomic.AddInt64(&dm.metrics.hintedExpired, 1) + + continue + } + + err := dm.transport.ForwardSet(ctx, nodeID, hint.item, false) // best-effort + if err == nil { // success + atomic.AddInt64(&dm.metrics.hintedReplayed, 1) + + continue + } + + if errors.Is(err, sentinel.ErrBackendNotFound) { // keep + out = append(out, hint) + + continue + } + + atomic.AddInt64(&dm.metrics.hintedDropped, 1) + } + + if len(out) == 0 { + delete(dm.hints, nodeID) + } else { + dm.hints[nodeID] = out + } + } + + dm.hintsMu.Unlock() +} + +// --- Simple gossip (in-process only) ---. +func (dm *DistMemory) startGossipIfEnabled() { //nolint:ireturn + if dm.gossipInterval <= 0 { + return + } + + dm.gossipStopCh = make(chan struct{}) + go dm.gossipLoop() +} + +func (dm *DistMemory) gossipLoop() { //nolint:ireturn + ticker := time.NewTicker(dm.gossipInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dm.runGossipTick() + case <-dm.gossipStopCh: + return + } + } +} + +func (dm *DistMemory) runGossipTick() { //nolint:ireturn + if dm.membership == nil || dm.transport == nil { + return + } + + peers := dm.membership.List() + if len(peers) <= 1 { + return + } + + var candidates []*cluster.Node + for _, n := range peers { + if n.ID != dm.localNode.ID { + candidates = append(candidates, n) + } + } + + if len(candidates) == 0 { + return + } + + // secure random selection (not strictly required but avoids G404 warning) + n := len(candidates) + + idxBig, err := rand.Int(rand.Reader, big.NewInt(int64(n))) // #nosec G404 - cryptographic randomness acceptable here + if err != nil { + return + } + + target := candidates[idxBig.Int64()] + + ip, ok := dm.transport.(*InProcessTransport) + if !ok { + return 
+ } + + remote, ok2 := ip.backends[string(target.ID)] + if !ok2 { + return + } + + snapshot := dm.membership.List() + remote.acceptGossip(snapshot) +} + +func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) { //nolint:ireturn + if dm.membership == nil { + return + } + + for _, node := range nodes { + if node.ID == dm.localNode.ID { + continue + } + + existing := false + for _, cur := range dm.membership.List() { + if cur.ID == node.ID { + existing = true + + if node.Incarnation > cur.Incarnation { + dm.membership.Upsert(&cluster.Node{ + ID: node.ID, + Address: node.Address, + State: node.State, + Incarnation: node.Incarnation, + LastSeen: time.Now(), + }) + } + + break + } + } + + if !existing { + dm.membership.Upsert(&cluster.Node{ + ID: node.ID, + Address: node.Address, + State: node.State, + Incarnation: node.Incarnation, + LastSeen: time.Now(), + }) + } + } +} + // chooseNewer picks the item with higher version; on version tie uses lexicographically smaller Origin as winner. func (dm *DistMemory) chooseNewer(itemA, itemB *cache.Item) *cache.Item { //nolint:ireturn if itemA == nil { From 1b0250ebbcf5e675b4eb2761a7866f645514cf0d Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 23 Aug 2025 14:37:27 +0300 Subject: [PATCH 2/7] feat: implement Merkle tree anti-entropy synchronization Add comprehensive Merkle tree-based anti-entropy mechanism for distributed cache synchronization: - Implement BuildMerkleTree() to create hash trees from cache data with configurable chunk sizes - Add SyncWith() method for comparing local/remote trees and pulling newer versions - Extend DistTransport interface with FetchMerkle() for remote tree retrieval - Add /internal/merkle HTTP endpoint for tree access over network - Include merkle sync metrics (operations count and keys pulled) - Add comprehensive test coverage for sync convergence scenarios - Support both in-process and HTTP transports (HTTP fetch merkle marked as unsupported) This enables efficient detection and 
repair of data inconsistencies between distributed cache nodes by comparing compact tree representations rather than full data sets. --- cspell.config.yaml | 3 + pkg/backend/dist_http_server.go | 13 + pkg/backend/dist_http_transport.go | 10 + pkg/backend/dist_memory.go | 291 ++++++++++++++++++ .../hypercache_distmemory_integration_test.go | 1 + ...cache_distmemory_remove_readrepair_test.go | 2 + .../hypercache_distmemory_versioning_test.go | 2 + tests/merkle_sync_test.go | 82 +++++ 8 files changed, 404 insertions(+) create mode 100644 tests/merkle_sync_test.go diff --git a/cspell.config.yaml b/cspell.config.yaml index b221d12..1a45473 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -33,6 +33,7 @@ words: - Fprintln - freqs - funlen + - gerr - gitversion - GITVERSION - goccy @@ -49,11 +50,13 @@ words: - ints - ireturn - Itemm + - keyf - lamport - LFUDA - localmodule - logrus - memprofile + - Merkle - Mgmt - msgpack - mvdan diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index ecfdf48..bfeb6ad 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -40,6 +40,7 @@ func (s *distHTTPServer) start(ctx context.Context, dm *DistMemory) error { //no s.registerGet(ctx, dm) s.registerRemove(ctx, dm) s.registerHealth() + s.registerMerkle(ctx, dm) return s.listen(ctx) } @@ -112,6 +113,18 @@ func (s *distHTTPServer) registerHealth() { //nolint:ireturn s.app.Get("/health", func(fctx fiber.Ctx) error { return fctx.SendString("ok") }) } +func (s *distHTTPServer) registerMerkle(_ context.Context, dm *DistMemory) { //nolint:ireturn + s.app.Get("/internal/merkle", func(fctx fiber.Ctx) error { + tree := dm.BuildMerkleTree() + + return fctx.JSON(fiber.Map{ + "root": tree.Root, + "leaf_hashes": tree.LeafHashes, + "chunk_size": tree.ChunkSize, + }) + }) +} + func (s *distHTTPServer) listen(ctx context.Context) error { //nolint:ireturn lc := net.ListenConfig{} diff --git a/pkg/backend/dist_http_transport.go 
b/pkg/backend/dist_http_transport.go index 2f9d0da..fa58112 100644 --- a/pkg/backend/dist_http_transport.go +++ b/pkg/backend/dist_http_transport.go @@ -258,3 +258,13 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { / return nil } + +// FetchMerkle currently unsupported over HTTP transport (would require new endpoint). +func (t *DistHTTPTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, error) { //nolint:ireturn + // reference receiver (t) for future extension and to satisfy linters about unused receiver + if t == nil { + return nil, errNoTransport + } + + return nil, ewrap.New("fetch merkle not implemented for http transport") +} diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index dc25f7b..46c3694 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -3,9 +3,12 @@ package backend import ( "context" "crypto/rand" + "crypto/sha256" "errors" + "hash" "hash/fnv" "math/big" + "sort" "sync" "sync/atomic" "time" @@ -21,6 +24,16 @@ const ( listPrealloc = 32 // pre-allocation size for List results ) +// Merkle internal tuning constants & errors. +const ( + defaultMerkleChunkSize = 128 + merklePreallocEntries = 256 + merkleVersionBytes = 8 + shiftPerByte = 8 // bit shift per byte when encoding uint64 +) + +var errNoTransport = errors.New("no transport") + // DistMemory is a sharded in-process distributed-like backend. It simulates // distribution by consistent hashing across a fixed set of in-memory shards. // It is intended for single-process multi-shard experimentation; NOT cross-process. @@ -66,6 +79,9 @@ type DistMemory struct { // simple gossip gossipInterval time.Duration gossipStopCh chan struct{} + + // anti-entropy + merkleChunkSize int // number of keys per leaf chunk (power-of-two recommended) } // hintedEntry represents a deferred replica write. 
@@ -118,6 +134,161 @@ func WithDistShardCount(n int) DistMemoryOption { } } +// WithDistMerkleChunkSize sets the number of keys per leaf hash chunk (default 128 if 0). +func WithDistMerkleChunkSize(n int) DistMemoryOption { + return func(dm *DistMemory) { + if n > 0 { + dm.merkleChunkSize = n + } + } +} + +// --- Merkle tree anti-entropy structures --- + +// MerkleTree represents a binary hash tree over key/version pairs. +type MerkleTree struct { // minimal representation + LeafHashes [][]byte // ordered leaf hashes + Root []byte + ChunkSize int +} + +// BuildMerkleTree constructs a Merkle tree snapshot of local data (best-effort, locks each shard briefly). +func (dm *DistMemory) BuildMerkleTree() *MerkleTree { //nolint:ireturn + chunkSize := dm.merkleChunkSize + if chunkSize <= 0 { + chunkSize = defaultMerkleChunkSize + } + + entries := dm.merkleEntries() + if len(entries) == 0 { + return &MerkleTree{ChunkSize: chunkSize} + } + + sort.Slice(entries, func(i, j int) bool { return entries[i].k < entries[j].k }) + + hasher := sha256.New() + buf := make([]byte, merkleVersionBytes) + leaves := make([][]byte, 0, (len(entries)+chunkSize-1)/chunkSize) + + for i := 0; i < len(entries); i += chunkSize { + end := i + chunkSize + if end > len(entries) { + end = len(entries) + } + + hasher.Reset() + + for _, e := range entries[i:end] { + _, _ = hasher.Write([]byte(e.k)) + encodeUint64BigEndian(buf, e.v) + + _, _ = hasher.Write(buf) + } + + leaves = append(leaves, append([]byte(nil), hasher.Sum(nil)...)) + } + + root := foldMerkle(leaves, hasher) + + return &MerkleTree{LeafHashes: leaves, Root: append([]byte(nil), root...), ChunkSize: chunkSize} +} + +// merkleKV is an internal pair used during tree construction & sync. +type merkleKV struct { + k string + v uint64 +} + +// DiffLeafRanges compares two trees and returns indexes of differing leaf chunks. 
+func (mt *MerkleTree) DiffLeafRanges(other *MerkleTree) []int { //nolint:ireturn + if mt == nil || other == nil { + return nil + } + + if len(mt.LeafHashes) != len(other.LeafHashes) { // size mismatch -> full resync + idxs := make([]int, len(mt.LeafHashes)) + for i := range idxs { + idxs[i] = i + } + + return idxs + } + + var diffs []int + for i := range mt.LeafHashes { + if !equalBytes(mt.LeafHashes[i], other.LeafHashes[i]) { + diffs = append(diffs, i) + } + } + + return diffs +} + +func equalBytes(a, b []byte) bool { // tiny helper + if len(a) != len(b) { + return false + } + + for i := range a { + if a[i] != b[i] { + return false + } + } + + return true +} + +// SyncWith performs Merkle anti-entropy against a remote node (pull newer versions for differing chunks). +func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { //nolint:ireturn + if dm.transport == nil { + return errNoTransport + } + + remoteTree, err := dm.transport.FetchMerkle(ctx, nodeID) + if err != nil { + return err + } + + localTree := dm.BuildMerkleTree() + + diffs := localTree.DiffLeafRanges(remoteTree) + if len(diffs) == 0 { // already consistent + return nil + } + + // Collect and sort local entries for chunk boundary mapping. + entries := dm.merkleEntries() + sort.Slice(entries, func(i, j int) bool { return entries[i].k < entries[j].k }) + + // Enumerate remote keys for missing detection when using in-process transport. 
+ missingKeys := dm.enumerateRemoteOnlyKeys(nodeID, entries) + + chunkSize := localTree.ChunkSize + for _, ci := range diffs { + start := ci * chunkSize + if start >= len(entries) { // diff chunk beyond local entries (only missing keys handled later) + continue + } + + end := start + chunkSize + if end > len(entries) { + end = len(entries) + } + + for _, e := range entries[start:end] { + dm.fetchAndAdopt(ctx, nodeID, e.k) + } + } + // fetch keys that exist only remotely + for k := range missingKeys { + dm.fetchAndAdopt(ctx, nodeID, k) + } + + atomic.AddInt64(&dm.metrics.merkleSyncs, 1) + + return nil +} + // WithDistCapacity sets logical capacity (not strictly enforced yet). func WithDistCapacity(capacity int) DistMemoryOption { return func(dm *DistMemory) { dm.capacity = capacity } @@ -139,6 +310,9 @@ func WithDistTransport(t DistTransport) DistMemoryOption { return func(dm *DistMemory) { dm.transport = t } } +// SetTransport sets the transport post-construction (testing helper). +func (dm *DistMemory) SetTransport(t DistTransport) { dm.transport = t } + // WithDistHeartbeat configures heartbeat interval and suspect/dead thresholds. // If interval <= 0 heartbeat is disabled. func WithDistHeartbeat(interval, suspectAfter, deadAfter time.Duration) DistMemoryOption { @@ -427,6 +601,7 @@ type DistTransport interface { ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error Health(ctx context.Context, nodeID string) error + FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) } // InProcessTransport implements DistTransport for multiple DistMemory instances in the same process. @@ -495,6 +670,16 @@ func (t *InProcessTransport) Health(_ context.Context, nodeID string) error { // return nil } +// FetchMerkle returns a snapshot Merkle tree of the target backend. 
+func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*MerkleTree, error) { //nolint:ireturn + b, ok := t.backends[nodeID] + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + return b.BuildMerkleTree(), nil +} + // distMetrics holds internal counters (best-effort, not atomic snapshot consistent). type distMetrics struct { forwardGet int64 @@ -514,6 +699,8 @@ type distMetrics struct { hintedReplayed int64 // hints successfully replayed hintedExpired int64 // hints expired before delivery hintedDropped int64 // hints dropped due to non-not-found transport errors + merkleSyncs int64 // merkle sync operations completed + merkleKeysPulled int64 // keys applied during sync } // DistMetrics snapshot. @@ -535,6 +722,8 @@ type DistMetrics struct { HintedReplayed int64 HintedExpired int64 HintedDropped int64 + MerkleSyncs int64 + MerkleKeysPulled int64 } // Metrics returns a snapshot of distributed metrics. @@ -557,6 +746,8 @@ func (dm *DistMemory) Metrics() DistMetrics { HintedReplayed: atomic.LoadInt64(&dm.metrics.hintedReplayed), HintedExpired: atomic.LoadInt64(&dm.metrics.hintedExpired), HintedDropped: atomic.LoadInt64(&dm.metrics.hintedDropped), + MerkleSyncs: atomic.LoadInt64(&dm.metrics.merkleSyncs), + MerkleKeysPulled: atomic.LoadInt64(&dm.metrics.merkleKeysPulled), } } @@ -584,6 +775,106 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn return nil } +// enumerateRemoteOnlyKeys returns keys present only on the remote side (best-effort, in-process only). 
+func (dm *DistMemory) enumerateRemoteOnlyKeys(nodeID string, local []merkleKV) map[string]struct{} { //nolint:ireturn + missing := make(map[string]struct{}) + + ip, ok := dm.transport.(*InProcessTransport) + if !ok { + return missing + } + + remote, ok := ip.backends[nodeID] + if !ok { + return missing + } + + for _, shard := range remote.shards { + if shard == nil { + continue + } + + rch := shard.items.IterBuffered() + for t := range rch { + missing[t.Key] = struct{}{} + } + } + + for _, e := range local { // remove any that we already have + delete(missing, e.k) + } + + return missing +} + +// fetchAndAdopt pulls a key from a remote node and adopts it if it's newer or absent locally. +func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { + it, ok, gerr := dm.transport.ForwardGet(ctx, nodeID, key) + if gerr != nil || !ok { + return + } + + if cur, okLocal := dm.shardFor(key).items.Get(key); !okLocal || it.Version > cur.Version { + dm.applySet(ctx, it, false) + atomic.AddInt64(&dm.metrics.merkleKeysPulled, 1) + } +} + +// merkleEntries gathers key/version pairs from all shards. +func (dm *DistMemory) merkleEntries() []merkleKV { + entries := make([]merkleKV, 0, merklePreallocEntries) + + for _, shard := range dm.shards { + if shard == nil { + continue + } + + ch := shard.items.IterBuffered() + for t := range ch { + entries = append(entries, merkleKV{k: t.Key, v: t.Val.Version}) + } + } + + return entries +} + +func encodeUint64BigEndian(buf []byte, v uint64) { + for i := merkleVersionBytes - 1; i >= 0; i-- { // big endian for deterministic hashing + buf[i] = byte(v) + + v >>= shiftPerByte + } +} + +// foldMerkle reduces leaf hashes into a single root using a binary tree. 
+func foldMerkle(leaves [][]byte, hasher hash.Hash) []byte { //nolint:ireturn + if len(leaves) == 0 { + return nil + } + + level := leaves + for len(level) > 1 { + next := make([][]byte, 0, (len(level)+1)/2) + for i := 0; i < len(level); i += 2 { + if i+1 == len(level) { // odd node promoted + next = append(next, append([]byte(nil), level[i]...)) + + break + } + + hasher.Reset() + + _, _ = hasher.Write(level[i]) + _, _ = hasher.Write(level[i+1]) + next = append(next, append([]byte(nil), hasher.Sum(nil)...)) + } + + level = next + } + + return level[0] +} + // ensureShardConfig initializes shards respecting configured shardCount. func (dm *DistMemory) ensureShardConfig() { //nolint:ireturn if dm.shardCount <= 0 { diff --git a/tests/hypercache_distmemory_integration_test.go b/tests/hypercache_distmemory_integration_test.go index 54e594c..abf5730 100644 --- a/tests/hypercache_distmemory_integration_test.go +++ b/tests/hypercache_distmemory_integration_test.go @@ -55,6 +55,7 @@ func TestDistMemoryForwardingReplication(t *testing.T) { } item := &cachev2.Item{Key: k, Value: k} + err := item.Valid() if err != nil { t.Fatalf("item valid %s: %v", k, err) diff --git a/tests/hypercache_distmemory_remove_readrepair_test.go b/tests/hypercache_distmemory_remove_readrepair_test.go index b536834..e998b84 100644 --- a/tests/hypercache_distmemory_remove_readrepair_test.go +++ b/tests/hypercache_distmemory_remove_readrepair_test.go @@ -47,6 +47,7 @@ func TestDistMemoryRemoveReplication(t *testing.T) { } item := &cachev2.Item{Key: key, Value: "val"} + err := item.Valid() if err != nil { t.Fatalf("valid: %v", err) @@ -104,6 +105,7 @@ func TestDistMemoryReadRepair(t *testing.T) { } item := &cachev2.Item{Key: key, Value: "val"} + err := item.Valid() if err != nil { t.Fatalf("valid: %v", err) diff --git a/tests/hypercache_distmemory_versioning_test.go b/tests/hypercache_distmemory_versioning_test.go index eca4875..49f08f2 100644 --- a/tests/hypercache_distmemory_versioning_test.go +++ 
b/tests/hypercache_distmemory_versioning_test.go @@ -65,6 +65,7 @@ func TestDistMemoryVersioningQuorum(t *testing.T) { //nolint:paralleltest // Write key via primary. item1 := &cachev2.Item{Key: key, Value: "v1"} + err := b1.Set(context.Background(), item1) if err != nil { t.Fatalf("initial set: %v", err) @@ -94,6 +95,7 @@ func TestDistMemoryVersioningQuorum(t *testing.T) { //nolint:paralleltest transport.Unregister(string(n3.ID)) item2 := &cachev2.Item{Key: key, Value: "v2"} + err = b1.Set(context.Background(), item2) if err != nil && !errors.Is(err, sentinel.ErrQuorumFailed) { t.Fatalf("unexpected error after replica loss: %v", err) diff --git a/tests/merkle_sync_test.go b/tests/merkle_sync_test.go new file mode 100644 index 0000000..71020d1 --- /dev/null +++ b/tests/merkle_sync_test.go @@ -0,0 +1,82 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleSyncConvergence ensures SyncWith pulls newer keys from remote. 
+func TestMerkleSyncConvergence(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + bA, err := backend.NewDistMemory(ctx, + backend.WithDistNode("A", "127.0.0.1:9101"), + backend.WithDistReplication(1), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("new dist memory A: %v", err) + } + + dmA := any(bA).(*backend.DistMemory) + + bB, err := backend.NewDistMemory(ctx, + backend.WithDistNode("B", "127.0.0.1:9102"), + backend.WithDistReplication(1), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("new dist memory B: %v", err) + } + + dmB := any(bB).(*backend.DistMemory) + + dmA.SetTransport(transport) + dmB.SetTransport(transport) + + // register for in-process lookups + transport.Register(dmA) + transport.Register(dmB) + + // inject divergent data (A has extra/newer) + for i := range 5 { + it := &cache.Item{Key: keyf("k", i), Value: []byte("vA"), Version: uint64(i + 1), Origin: "A", LastUpdated: time.Now()} + dmA.DebugInject(it) + } + + // B shares only first 2 keys older versions + for i := range 2 { + it := &cache.Item{Key: keyf("k", i), Value: []byte("old"), Version: uint64(i), Origin: "B", LastUpdated: time.Now()} + dmB.DebugInject(it) + } + + // Run sync B->A to pull newer + if err := dmB.SyncWith(ctx, string(dmA.LocalNodeID())); err != nil { + // HTTP transport fetch merkle unsupported; we rely on in-process + if testing.Verbose() { + t.Logf("sync error: %v", err) + } + } + + // Validate B now has all 5 keys with correct versions (>= A's) + for i := range 5 { + k := keyf("k", i) + itA, _ := dmA.Get(ctx, k) + + itB, _ := dmB.Get(ctx, k) + if itA == nil || itB == nil { + t.Fatalf("missing key %s after sync", k) + } + + if itB.Version < itA.Version { + t.Fatalf("expected B version >= A version for %s", k) + } + } +} + +func keyf(prefix string, i int) string { return prefix + ":" + string(rune('a'+i)) } From 9455492c1a81a7554147bf01717a8997fe6ce898 Mon Sep 17 00:00:00 2001 
From: Francesco Cosentino Date: Sat, 23 Aug 2025 17:03:53 +0300 Subject: [PATCH 3/7] feat: implement HTTP transport Merkle sync and auto-sync functionality - Add FetchMerkle() and ListKeys() methods to HTTP transport - Implement periodic auto-sync with configurable intervals and peer limits - Add /internal/keys endpoint for key enumeration - Refactor sync logic into modular helper methods - Add comprehensive HTTP Merkle sync test coverage - Enhance distributed metrics with auto-sync tracking - Clean up HTTP transport code and remove redundant comments This enables full anti-entropy synchronization over HTTP transport and provides automatic background sync capabilities for distributed cache consistency. --- pkg/backend/dist_http_server.go | 17 +++ pkg/backend/dist_http_transport.go | 149 ++++++++++++------ pkg/backend/dist_memory.go | 220 +++++++++++++++++++++++---- tests/hypercache_http_merkle_test.go | 88 +++++++++++ 4 files changed, 398 insertions(+), 76 deletions(-) create mode 100644 tests/hypercache_http_merkle_test.go diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index bfeb6ad..e6bd1b6 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -123,6 +123,23 @@ func (s *distHTTPServer) registerMerkle(_ context.Context, dm *DistMemory) { //n "chunk_size": tree.ChunkSize, }) }) + + // naive keys listing for anti-entropy (testing only). Not efficient for large datasets. 
+ s.app.Get("/internal/keys", func(fctx fiber.Ctx) error { + var keys []string + for _, shard := range dm.shards { + if shard == nil { + continue + } + + ch := shard.items.IterBuffered() + for t := range ch { + keys = append(keys, t.Key) + } + } + + return fctx.JSON(fiber.Map{"keys": keys}) + }) } func (s *distHTTPServer) listen(ctx context.Context) error { //nolint:ireturn diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go index fa58112..4e748dd 100644 --- a/pkg/backend/dist_http_transport.go +++ b/pkg/backend/dist_http_transport.go @@ -1,4 +1,3 @@ -// Package backend provides backend implementations including a distributed HTTP transport. package backend import ( @@ -18,41 +17,36 @@ import ( ) // DistHTTPTransport implements DistTransport over HTTP JSON. -type DistHTTPTransport struct { // minimal MVP +type DistHTTPTransport struct { client *http.Client - baseURLFn func(nodeID string) (string, bool) // resolves nodeID -> base URL (scheme+host) + baseURLFn func(nodeID string) (string, bool) } -// internal status code threshold for error classification. const statusThreshold = 300 -// NewDistHTTPTransport creates a new HTTP transport. -func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport { +// NewDistHTTPTransport constructs a DistHTTPTransport with the given timeout and +// nodeID->baseURL resolver. Timeout <=0 defaults to 2s. 
+func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport { //nolint:ireturn if timeout <= 0 { timeout = 2 * time.Second } - return &DistHTTPTransport{ - client: &http.Client{Timeout: timeout}, - baseURLFn: resolver, - } + return &DistHTTPTransport{client: &http.Client{Timeout: timeout}, baseURLFn: resolver} } -// request/response DTOs moved to dist_http_types.go - const ( errMsgNewRequest = "new request" errMsgDoRequest = "do request" ) -// ForwardSet forwards a set (or replication) request to a remote node. +// ForwardSet sends a Set/Replicate request to a remote node. func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return sentinel.ErrBackendNotFound } - reqBody := httpSetRequest{ // split for line length + reqBody := httpSetRequest{ Key: item.Key, Value: item.Value, Expiration: item.Expiration.Milliseconds(), @@ -66,9 +60,7 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return ewrap.Wrap(err, "marshal set request") } - url := base + "/internal/cache/set" - - hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payloadBytes)) + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, base+"/internal/cache/set", bytes.NewReader(payloadBytes)) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } @@ -80,15 +72,12 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return ewrap.Wrap(err, errMsgDoRequest) } - defer func() { - _ = resp.Body.Close() //nolint:errcheck // best-effort - }() + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound } - const statusThreshold = 300 // local redeclare for linter clarity if resp.StatusCode >= statusThreshold { body, rerr := io.ReadAll(resp.Body) if rerr != nil { @@ 
-101,18 +90,14 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return nil } -// type httpGetResponse formerly used for direct decoding; replaced by map-based decoding to satisfy linters. - -// ForwardGet fetches an item from a remote node. -func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) { //nolint:ireturn +// ForwardGet fetches a single item from a remote node. +func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error) { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return nil, false, sentinel.ErrBackendNotFound } - url := fmt.Sprintf("%s/internal/cache/get?key=%s", base, key) - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/internal/cache/get?key=%s", base, key), nil) if err != nil { return nil, false, ewrap.Wrap(err, errMsgNewRequest) } @@ -122,7 +107,7 @@ func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key s return nil, false, ewrap.Wrap(err, errMsgDoRequest) } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return nil, false, sentinel.ErrBackendNotFound @@ -144,7 +129,6 @@ func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key s return item, true, nil } -// decodeGetBody decodes a get response body into an item. 
func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn var raw map[string]json.RawMessage @@ -168,7 +152,6 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn } if ib, ok := raw["item"]; ok && len(ib) > 0 { - // define minimal mirror struct to ensure json tags present (satisfy musttag) var mirror struct { Key string `json:"key"` Value json.RawMessage `json:"value"` @@ -181,8 +164,8 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn if err != nil { return nil, false, ewrap.Wrap(err, "unmarshal mirror") } - // reconstruct cache.Item (we ignore expiration formatting difference vs ms) - return &cache.Item{ // multi-line for readability + + return &cache.Item{ Key: mirror.Key, Value: mirror.Value, Expiration: time.Duration(mirror.Expiration) * time.Millisecond, @@ -195,16 +178,14 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn return &cache.Item{}, true, nil } -// ForwardRemove forwards a remove operation. -func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error { //nolint:ireturn +// ForwardRemove propagates a delete operation to a remote node. 
+func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return sentinel.ErrBackendNotFound } - url := fmt.Sprintf("%s/internal/cache/remove?key=%s&replicate=%t", base, key, replicate) - - hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("%s/internal/cache/remove?key=%s&replicate=%t", base, key, replicate), nil) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } @@ -214,7 +195,7 @@ func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, ke return ewrap.Wrap(err, errMsgDoRequest) } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -227,16 +208,14 @@ func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, ke return nil } -// Health probes a remote node health endpoint. +// Health performs a health probe against a remote node. 
func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return sentinel.ErrBackendNotFound } - url := base + "/health" - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/health", nil) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } @@ -246,7 +225,7 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { / return ewrap.Wrap(err, errMsgDoRequest) } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -259,12 +238,86 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { / return nil } -// FetchMerkle currently unsupported over HTTP transport (would require new endpoint). -func (t *DistHTTPTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, error) { //nolint:ireturn - // reference receiver (t) for future extension and to satisfy linters about unused receiver +// FetchMerkle retrieves a Merkle tree snapshot from a remote node. 
+func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) { //nolint:ireturn if t == nil { return nil, errNoTransport } - return nil, ewrap.New("fetch merkle not implemented for http transport") + base, ok := t.baseURLFn(nodeID) + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/internal/merkle", nil) + if err != nil { + return nil, ewrap.Wrap(err, errMsgNewRequest) + } + + resp, err := t.client.Do(hreq) + if err != nil { + return nil, ewrap.Wrap(err, errMsgDoRequest) + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck + + if resp.StatusCode == http.StatusNotFound { + return nil, sentinel.ErrBackendNotFound + } + + if resp.StatusCode >= statusThreshold { + return nil, ewrap.Newf("fetch merkle status %d", resp.StatusCode) + } + + var body struct { + Root []byte `json:"root"` + LeafHashes [][]byte `json:"leaf_hashes"` + ChunkSize int `json:"chunk_size"` + } + + dec := json.NewDecoder(resp.Body) + + err = dec.Decode(&body) + if err != nil { + return nil, ewrap.Wrap(err, "decode merkle") + } + + return &MerkleTree{Root: body.Root, LeafHashes: body.LeafHashes, ChunkSize: body.ChunkSize}, nil +} + +// ListKeys returns all keys from a remote node (expensive; used for tests / anti-entropy fallback). 
+func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]string, error) { //nolint:ireturn + base, ok := t.baseURLFn(nodeID) + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/internal/keys", nil) + if err != nil { + return nil, ewrap.Wrap(err, errMsgNewRequest) + } + + resp, err := t.client.Do(hreq) + if err != nil { + return nil, ewrap.Wrap(err, errMsgDoRequest) + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck + + if resp.StatusCode >= statusThreshold { + return nil, ewrap.Newf("list keys status %d", resp.StatusCode) + } + + var body struct { + Keys []string `json:"keys"` + } + + dec := json.NewDecoder(resp.Body) + + err = dec.Decode(&body) + if err != nil { + return nil, ewrap.Wrap(err, "decode keys") + } + + return body.Keys, nil } diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 46c3694..3fdf4a2 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -82,6 +82,14 @@ type DistMemory struct { // anti-entropy merkleChunkSize int // number of keys per leaf chunk (power-of-two recommended) + + // periodic merkle auto-sync + autoSyncInterval time.Duration + autoSyncStopCh chan struct{} + autoSyncPeersPerInterval int // limit number of peers synced per tick (0=all) + + lastAutoSyncDuration atomic.Int64 // nanoseconds of last full loop + lastAutoSyncError atomic.Value // error string or nil } // hintedEntry represents a deferred replica write. @@ -143,6 +151,20 @@ func WithDistMerkleChunkSize(n int) DistMemoryOption { } } +// WithDistMerkleAutoSync enables periodic anti-entropy sync attempts. If interval <= 0 disables. +func WithDistMerkleAutoSync(interval time.Duration) DistMemoryOption { + return func(dm *DistMemory) { + dm.autoSyncInterval = interval + } +} + +// WithDistMerkleAutoSyncPeers limits number of peers synced per interval (0 or <0 = all). 
+func WithDistMerkleAutoSyncPeers(n int) DistMemoryOption { + return func(dm *DistMemory) { + dm.autoSyncPeersPerInterval = n + } +} + // --- Merkle tree anti-entropy structures --- // MerkleTree represents a binary hash tree over key/version pairs. @@ -250,40 +272,20 @@ func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { //nol } localTree := dm.BuildMerkleTree() - + entries := dm.sortedMerkleEntries() diffs := localTree.DiffLeafRanges(remoteTree) - if len(diffs) == 0 { // already consistent - return nil - } - - // Collect and sort local entries for chunk boundary mapping. - entries := dm.merkleEntries() - sort.Slice(entries, func(i, j int) bool { return entries[i].k < entries[j].k }) - - // Enumerate remote keys for missing detection when using in-process transport. - missingKeys := dm.enumerateRemoteOnlyKeys(nodeID, entries) + missing := dm.resolveMissingKeys(ctx, nodeID, entries) - chunkSize := localTree.ChunkSize - for _, ci := range diffs { - start := ci * chunkSize - if start >= len(entries) { // diff chunk beyond local entries (only missing keys handled later) - continue - } + dm.applyMerkleDiffs(ctx, nodeID, entries, diffs, localTree.ChunkSize) - end := start + chunkSize - if end > len(entries) { - end = len(entries) - } - - for _, e := range entries[start:end] { - dm.fetchAndAdopt(ctx, nodeID, e.k) - } - } - // fetch keys that exist only remotely - for k := range missingKeys { + for k := range missing { dm.fetchAndAdopt(ctx, nodeID, k) } + if len(diffs) == 0 && len(missing) == 0 { + return nil + } + atomic.AddInt64(&dm.metrics.merkleSyncs, 1) return nil @@ -406,6 +408,7 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist dm.startHeartbeatIfEnabled(ctx) dm.startHintReplayIfEnabled(ctx) dm.startGossipIfEnabled() + dm.startAutoSyncIfEnabled(ctx) return dm, nil } @@ -586,6 +589,23 @@ func (dm *DistMemory) DebugInject(it *cache.Item) { //nolint:ireturn // LocalNodeID returns this instance's node ID (testing 
helper). func (dm *DistMemory) LocalNodeID() cluster.NodeID { return dm.localNode.ID } +// LocalNodeAddr returns the configured node address (host:port) used by HTTP server. +func (dm *DistMemory) LocalNodeAddr() string { //nolint:ireturn + return dm.nodeAddr +} + +// SetLocalNode manually sets the local node (testing helper before starting HTTP). +func (dm *DistMemory) SetLocalNode(node *cluster.Node) { //nolint:ireturn + dm.localNode = node + if dm.nodeAddr == "" && node != nil { + dm.nodeAddr = node.Address + } + + if dm.membership != nil && node != nil { + dm.membership.Upsert(node) + } +} + // DebugOwners returns current owners slice for a key (for tests). func (dm *DistMemory) DebugOwners(key string) []cluster.NodeID { if dm.ring == nil { @@ -701,6 +721,7 @@ type distMetrics struct { hintedDropped int64 // hints dropped due to non-not-found transport errors merkleSyncs int64 // merkle sync operations completed merkleKeysPulled int64 // keys applied during sync + autoSyncLoops int64 // number of auto-sync ticks executed } // DistMetrics snapshot. @@ -724,10 +745,20 @@ type DistMetrics struct { HintedDropped int64 MerkleSyncs int64 MerkleKeysPulled int64 + AutoSyncLoops int64 + LastAutoSyncNanos int64 + LastAutoSyncError string } // Metrics returns a snapshot of distributed metrics. 
func (dm *DistMemory) Metrics() DistMetrics { + lastErr := "" + if v := dm.lastAutoSyncError.Load(); v != nil { + if s, ok := v.(string); ok { + lastErr = s + } + } + return DistMetrics{ ForwardGet: atomic.LoadInt64(&dm.metrics.forwardGet), ForwardSet: atomic.LoadInt64(&dm.metrics.forwardSet), @@ -748,6 +779,9 @@ func (dm *DistMemory) Metrics() DistMetrics { HintedDropped: atomic.LoadInt64(&dm.metrics.hintedDropped), MerkleSyncs: atomic.LoadInt64(&dm.metrics.merkleSyncs), MerkleKeysPulled: atomic.LoadInt64(&dm.metrics.merkleKeysPulled), + AutoSyncLoops: atomic.LoadInt64(&dm.metrics.autoSyncLoops), + LastAutoSyncNanos: dm.lastAutoSyncDuration.Load(), + LastAutoSyncError: lastErr, } } @@ -765,6 +799,10 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn close(dm.gossipStopCh) } + if dm.autoSyncStopCh != nil { + close(dm.autoSyncStopCh) + } + if dm.httpServer != nil { err := dm.httpServer.stop(ctx) // best-effort if err != nil { @@ -775,6 +813,64 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn return nil } +// --- Sync helper methods (placed after exported methods to satisfy ordering linter) --- + +// sortedMerkleEntries returns merkle entries sorted by key. +func (dm *DistMemory) sortedMerkleEntries() []merkleKV { //nolint:ireturn + entries := dm.merkleEntries() + sort.Slice(entries, func(i, j int) bool { return entries[i].k < entries[j].k }) + + return entries +} + +// resolveMissingKeys enumerates remote-only keys using in-process or HTTP listing. 
+func (dm *DistMemory) resolveMissingKeys(ctx context.Context, nodeID string, entries []merkleKV) map[string]struct{} { //nolint:ireturn + missing := dm.enumerateRemoteOnlyKeys(nodeID, entries) + if len(missing) != 0 { + return missing + } + + httpT, ok := dm.transport.(*DistHTTPTransport) + if !ok { + return missing + } + + keys, kerr := httpT.ListKeys(ctx, nodeID) + if kerr != nil || len(keys) == 0 { + return missing + } + + mset := make(map[string]struct{}, len(keys)) + for _, k := range keys { // populate + mset[k] = struct{}{} + } + + for _, e := range entries { // remove existing + delete(mset, e.k) + } + + return mset +} + +// applyMerkleDiffs fetches and adopts keys for differing Merkle chunks. +func (dm *DistMemory) applyMerkleDiffs(ctx context.Context, nodeID string, entries []merkleKV, diffs []int, chunkSize int) { //nolint:ireturn + for _, ci := range diffs { + start := ci * chunkSize + if start >= len(entries) { + continue + } + + end := start + chunkSize + if end > len(entries) { + end = len(entries) + } + + for _, e := range entries[start:end] { + dm.fetchAndAdopt(ctx, nodeID, e.k) + } + } +} + // enumerateRemoteOnlyKeys returns keys present only on the remote side (best-effort, in-process only). func (dm *DistMemory) enumerateRemoteOnlyKeys(nodeID string, local []merkleKV) map[string]struct{} { //nolint:ireturn missing := make(map[string]struct{}) @@ -1273,6 +1369,74 @@ func (dm *DistMemory) startGossipIfEnabled() { //nolint:ireturn go dm.gossipLoop() } +// startAutoSyncIfEnabled launches periodic merkle syncs to all other members. 
+func (dm *DistMemory) startAutoSyncIfEnabled(ctx context.Context) { //nolint:ireturn + if dm.autoSyncInterval <= 0 || dm.membership == nil { + return + } + + if dm.autoSyncStopCh != nil { // already running + return + } + + dm.autoSyncStopCh = make(chan struct{}) + + interval := dm.autoSyncInterval + go dm.autoSyncLoop(ctx, interval) +} + +func (dm *DistMemory) autoSyncLoop(ctx context.Context, interval time.Duration) { //nolint:ireturn + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-dm.autoSyncStopCh: + return + case <-ticker.C: + dm.runAutoSyncTick(ctx) + } + } +} + +// runAutoSyncTick performs one auto-sync cycle; separated for lower complexity. +func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { //nolint:ireturn + start := time.Now() + + var lastErr error + + members := dm.membership.List() + limit := dm.autoSyncPeersPerInterval + synced := 0 + + for _, member := range members { + if member == nil || string(member.ID) == dm.nodeID { // skip self + continue + } + + if limit > 0 && synced >= limit { + break + } + + err := dm.SyncWith(ctx, string(member.ID)) + if err != nil { // capture last error only + lastErr = err + } + + synced++ + } + + dm.lastAutoSyncDuration.Store(time.Since(start).Nanoseconds()) + + if lastErr != nil { + dm.lastAutoSyncError.Store(lastErr.Error()) + } else { + dm.lastAutoSyncError.Store("") + } + + atomic.AddInt64(&dm.metrics.autoSyncLoops, 1) +} + func (dm *DistMemory) gossipLoop() { //nolint:ireturn ticker := time.NewTicker(dm.gossipInterval) defer ticker.Stop() diff --git a/tests/hypercache_http_merkle_test.go b/tests/hypercache_http_merkle_test.go new file mode 100644 index 0000000..b16950b --- /dev/null +++ b/tests/hypercache_http_merkle_test.go @@ -0,0 +1,88 @@ +package tests + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/cluster" + "github.com/hyp3rd/hypercache/pkg/backend" + cachev2 "github.com/hyp3rd/hypercache/pkg/cache/v2" 
+) + +// TestHTTPFetchMerkle ensures HTTP transport can fetch a remote Merkle tree and SyncWith works over HTTP. +func TestHTTPFetchMerkle(t *testing.T) { + ctx := context.Background() + + // shared ring/membership + ring := cluster.NewRing(cluster.WithReplication(1)) + membership := cluster.NewMembership(ring) + + // create two nodes with HTTP server enabled (addresses) + n1 := cluster.NewNode("", "127.0.0.1:9201") + b1i, err := backend.NewDistMemory(ctx, + backend.WithDistMembership(membership, n1), + backend.WithDistNode("n1", "127.0.0.1:9201"), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("b1: %v", err) + } + n2 := cluster.NewNode("", "127.0.0.1:9202") + b2i, err := backend.NewDistMemory(ctx, + backend.WithDistMembership(membership, n2), + backend.WithDistNode("n2", "127.0.0.1:9202"), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("b2: %v", err) + } + + b1 := b1i.(*backend.DistMemory) //nolint:forcetypeassert + b2 := b2i.(*backend.DistMemory) //nolint:forcetypeassert + + // HTTP transport resolver maps node IDs to http base URLs. 
+ resolver := func(id string) (string, bool) { + switch id { // node IDs same as provided + case "n1": + return "http://" + b1.LocalNodeAddr(), true + case "n2": + return "http://" + b2.LocalNodeAddr(), true + } + return "", false + } + transport := backend.NewDistHTTPTransport(2*time.Second, resolver) + b1.SetTransport(transport) + b2.SetTransport(transport) + + // ensure membership has both before writes (already upserted in constructors) + // write some keys to b1 only + for i := range 5 { // direct inject to sidestep replication/forwarding complexity + item := &cachev2.Item{Key: httpKey(i), Value: []byte("v"), Version: uint64(i + 1), Origin: "n1", LastUpdated: time.Now()} + b1.DebugInject(item) + } + // ensure HTTP merkle endpoint reachable + resp, err := http.Get("http://" + b1.LocalNodeAddr() + "/internal/merkle") + if err != nil { + t.Fatalf("merkle http get: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("unexpected status %d", resp.StatusCode) + } + + // b2 sync from b1 via HTTP transport + if err := b2.SyncWith(ctx, "n1"); err != nil { + t.Fatalf("sync: %v", err) + } + + // validate keys present on b2 + for i := range 5 { + if _, ok := b2.Get(ctx, httpKey(i)); !ok { + t.Fatalf("missing key %d post-sync", i) + } + } +} + +func httpKey(i int) string { return "hkey:" + string(rune('a'+i)) } From 02fb8afef1229575ce9e0c81cd1c1eab0ae3040d Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 23 Aug 2025 18:09:02 +0300 Subject: [PATCH 4/7] feat: implement Merkle tree anti-entropy with tombstone-based delete semantics - Add Merkle tree synchronization with timing metrics (build, diff, fetch durations) - Implement tombstone versioning to prevent key resurrection during anti-entropy - Add new HTTP endpoints for Merkle tree inspection (/internal/merkle, /internal/keys) - Introduce configuration options for Merkle chunk size, auto-sync, and key enumeration caps - Enhance delete operations with versioned tombstones to maintain 
consistency - Add comprehensive test suite for Merkle sync edge cases (empty trees, no-diff, single missing keys, tombstone preservation) - Update documentation with new distributed memory capabilities and configuration options This enables robust distributed consistency by preventing stale data resurrection and providing efficient anti-entropy synchronization between cache nodes. --- .github/instructions/instructions.md | 16 ++++ README.md | 8 ++ pkg/backend/dist_memory.go | 115 ++++++++++++++++++++++-- tests/hypercache_http_merkle_test.go | 7 +- tests/merkle_delete_tombstone_test.go | 62 +++++++++++++ tests/merkle_empty_tree_test.go | 31 +++++++ tests/merkle_no_diff_test.go | 41 +++++++++ tests/merkle_single_missing_key_test.go | 42 +++++++++ 8 files changed, 314 insertions(+), 8 deletions(-) create mode 100644 .github/instructions/instructions.md create mode 100644 tests/merkle_delete_tombstone_test.go create mode 100644 tests/merkle_empty_tree_test.go create mode 100644 tests/merkle_no_diff_test.go create mode 100644 tests/merkle_single_missing_key_test.go diff --git a/.github/instructions/instructions.md b/.github/instructions/instructions.md new file mode 100644 index 0000000..5d24fb1 --- /dev/null +++ b/.github/instructions/instructions.md @@ -0,0 +1,16 @@ +--- +applyTo: '**' +--- + +# Basic instructions + +Always run `golangci-lint` and staticcheck after completing a Golang task. +Verify the quality of the code you provide, including repetitions, flaws, and ways to modernise the approach to ensure consistency. Adopt a development method and stick consistently to it. +Parse the Makefile in the project, and you will find the commands you need to lint, polish, and test the code. + +Always document the solutions we find, and where applicable, use the ./docs folder for extensive documentation. + +## Toolset + +- [Makefile](../../Makefile).
+- Always run: `make lint` diff --git a/README.md b/README.md index 9d3899d..d477fb0 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,8 @@ Endpoints (subject to change): - GET /config – sanitized runtime config (now includes replication + virtual node settings when using DistMemory) - GET /dist/metrics – distributed backend forwarding / replication counters (DistMemory only) - GET /dist/owners?key=K – current ring owners (IDs) for key K (DistMemory only, debug) +- GET /internal/merkle – Merkle tree snapshot (DistMemory experimental anti-entropy) +- GET /internal/keys – Full key enumeration (debug / anti-entropy fallback; expensive) - GET /cluster/members – membership snapshot (id, address, state, incarnation, replication factor, virtual nodes) - GET /cluster/ring – ring vnode hashes (debug / diagnostics) - POST /evict – trigger eviction cycle @@ -181,6 +183,10 @@ if err != nil { | `WithManagementHTTP` | Start optional management HTTP server. | | `WithDistReplication` | (DistMemory) Set replication factor (owners per key). | | `WithDistVirtualNodes` | (DistMemory) Virtual nodes per physical node for consistent hashing. | +| `WithDistMerkleChunkSize` | (DistMemory) Keys per Merkle leaf chunk (power-of-two recommended). | +| `WithDistMerkleAutoSync` | (DistMemory) Interval for background Merkle sync (<=0 disables). | +| `WithDistMerkleAutoSyncPeers` | (DistMemory) Limit peers synced per auto-sync tick (0=all). | +| `WithDistListKeysCap` | (DistMemory) Cap number of keys fetched via fallback enumeration. | | `WithDistNode` | (DistMemory) Explicit node identity (id/address). | | `WithDistSeeds` | (DistMemory) Static seed addresses to pre-populate membership. | @@ -204,6 +210,8 @@ Current capabilities: - Ownership enforcement (non‑owners forward to primary). - Replica fan‑out on writes (best‑effort) & replica removals. - Read‑repair when a local owner misses but another replica has the key. 
+- Basic delete semantics with tombstones: deletions propagate as versioned tombstones preventing + resurrection during anti-entropy (tombstone retention is in‑memory, no persistence yet). - Metrics exposed via management endpoints (`/dist/metrics`, `/dist/owners`, `/cluster/members`, `/cluster/ring`). Planned next steps (roadmap excerpts): network transport abstraction, quorum reads/writes, versioning (vector clocks or lamport), failure detection / node states, rebalancing & anti‑entropy sync. diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 3fdf4a2..d8161f2 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -82,6 +82,7 @@ type DistMemory struct { // anti-entropy merkleChunkSize int // number of keys per leaf chunk (power-of-two recommended) + listKeysMax int // cap for fallback ListKeys pulls (0 = unlimited) // periodic merkle auto-sync autoSyncInterval time.Duration @@ -90,6 +91,9 @@ type DistMemory struct { lastAutoSyncDuration atomic.Int64 // nanoseconds of last full loop lastAutoSyncError atomic.Value // error string or nil + + // tombstone version source when no prior item exists (monotonic per process) + tombVersionCounter atomic.Uint64 } // hintedEntry represents a deferred replica write. @@ -98,6 +102,13 @@ type hintedEntry struct { expire time.Time } +// tombstone marks a delete intent with version ordering to prevent resurrection. +type tombstone struct { + version uint64 + origin string + at time.Time +} + // ConsistencyLevel defines read/write consistency semantics. type ConsistencyLevel int @@ -128,6 +139,7 @@ func (dm *DistMemory) Ring() *cluster.Ring { return dm.ring } type distShard struct { items cache.ConcurrentMap + tombs map[string]tombstone // per-key tombstones } // DistMemoryOption configures DistMemory backend. @@ -165,6 +177,15 @@ func WithDistMerkleAutoSyncPeers(n int) DistMemoryOption { } } +// WithDistListKeysCap caps number of keys fetched via fallback ListKeys (0 = unlimited). 
+func WithDistListKeysCap(n int) DistMemoryOption { + return func(dm *DistMemory) { + if n >= 0 { + dm.listKeysMax = n + } + } +} + // --- Merkle tree anti-entropy structures --- // MerkleTree represents a binary hash tree over key/version pairs. @@ -266,19 +287,32 @@ func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { //nol return errNoTransport } + startFetch := time.Now() + remoteTree, err := dm.transport.FetchMerkle(ctx, nodeID) if err != nil { return err } + fetchDur := time.Since(startFetch) + atomic.StoreInt64(&dm.metrics.merkleFetchNanos, fetchDur.Nanoseconds()) + + startBuild := time.Now() localTree := dm.BuildMerkleTree() + buildDur := time.Since(startBuild) + atomic.StoreInt64(&dm.metrics.merkleBuildNanos, buildDur.Nanoseconds()) + entries := dm.sortedMerkleEntries() + startDiff := time.Now() diffs := localTree.DiffLeafRanges(remoteTree) + diffDur := time.Since(startDiff) + atomic.StoreInt64(&dm.metrics.merkleDiffNanos, diffDur.Nanoseconds()) + missing := dm.resolveMissingKeys(ctx, nodeID, entries) dm.applyMerkleDiffs(ctx, nodeID, entries, diffs, localTree.ChunkSize) - for k := range missing { + for k := range missing { // missing = remote-only keys dm.fetchAndAdopt(ctx, nodeID, k) } @@ -583,7 +617,10 @@ func (dm *DistMemory) DebugInject(it *cache.Item) { //nolint:ireturn return } - dm.shardFor(it.Key).items.Set(it.Key, it) + sh := dm.shardFor(it.Key) + // test helper: injecting a concrete item implies intent to resurrect; clear any tombstone so normal version comparison applies + delete(sh.tombs, it.Key) + sh.items.Set(it.Key, it) } // LocalNodeID returns this instance's node ID (testing helper). 
@@ -721,6 +758,9 @@ type distMetrics struct { hintedDropped int64 // hints dropped due to non-not-found transport errors merkleSyncs int64 // merkle sync operations completed merkleKeysPulled int64 // keys applied during sync + merkleBuildNanos int64 // last build duration (ns) + merkleDiffNanos int64 // last diff duration (ns) + merkleFetchNanos int64 // last remote fetch duration (ns) autoSyncLoops int64 // number of auto-sync ticks executed } @@ -745,6 +785,9 @@ type DistMetrics struct { HintedDropped int64 MerkleSyncs int64 MerkleKeysPulled int64 + MerkleBuildNanos int64 + MerkleDiffNanos int64 + MerkleFetchNanos int64 AutoSyncLoops int64 LastAutoSyncNanos int64 LastAutoSyncError string @@ -779,6 +822,9 @@ func (dm *DistMemory) Metrics() DistMetrics { HintedDropped: atomic.LoadInt64(&dm.metrics.hintedDropped), MerkleSyncs: atomic.LoadInt64(&dm.metrics.merkleSyncs), MerkleKeysPulled: atomic.LoadInt64(&dm.metrics.merkleKeysPulled), + MerkleBuildNanos: atomic.LoadInt64(&dm.metrics.merkleBuildNanos), + MerkleDiffNanos: atomic.LoadInt64(&dm.metrics.merkleDiffNanos), + MerkleFetchNanos: atomic.LoadInt64(&dm.metrics.merkleFetchNanos), AutoSyncLoops: atomic.LoadInt64(&dm.metrics.autoSyncLoops), LastAutoSyncNanos: dm.lastAutoSyncDuration.Load(), LastAutoSyncError: lastErr, @@ -840,6 +886,10 @@ func (dm *DistMemory) resolveMissingKeys(ctx context.Context, nodeID string, ent return missing } + if dm.listKeysMax > 0 && len(keys) > dm.listKeysMax { // cap enforcement + keys = keys[:dm.listKeysMax] + } + mset := make(map[string]struct{}, len(keys)) for _, k := range keys { // populate mset[k] = struct{}{} @@ -849,6 +899,11 @@ func (dm *DistMemory) resolveMissingKeys(ctx context.Context, nodeID string, ent delete(mset, e.k) } + // track number of remote-only keys discovered via fallback + if len(mset) > 0 { + atomic.AddInt64(&dm.metrics.merkleKeysPulled, int64(len(mset))) + } + return mset } @@ -906,11 +961,38 @@ func (dm *DistMemory) enumerateRemoteOnlyKeys(nodeID string, 
local []merkleKV) m // fetchAndAdopt pulls a key from a remote node and adopts it if it's newer or absent locally. func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { it, ok, gerr := dm.transport.ForwardGet(ctx, nodeID, key) - if gerr != nil || !ok { + if gerr != nil { // remote failure: ignore return } - if cur, okLocal := dm.shardFor(key).items.Get(key); !okLocal || it.Version > cur.Version { + sh := dm.shardFor(key) + + if !ok { // remote missing key (could be delete) -> adopt tombstone locally if we still have it + if _, hasTomb := sh.tombs[key]; hasTomb { // already have tombstone + return + } + + if cur, okLocal := sh.items.Get(key); okLocal { // create tombstone advancing version + sh.items.Remove(key) + + nextVer := cur.Version + 1 + + sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} + } + + return + } + + if tomb, hasTomb := sh.tombs[key]; hasTomb { + // If tombstone version newer or equal, do not resurrect + if tomb.version >= it.Version { + return + } + // remote has newer version; clear tombstone (key resurrected intentionally) + delete(sh.tombs, key) + } + + if cur, okLocal := sh.items.Get(key); !okLocal || it.Version > cur.Version { dm.applySet(ctx, it, false) atomic.AddInt64(&dm.metrics.merkleKeysPulled, 1) } @@ -929,6 +1011,10 @@ func (dm *DistMemory) merkleEntries() []merkleKV { for t := range ch { entries = append(entries, merkleKV{k: t.Key, v: t.Val.Version}) } + + for k, ts := range shard.tombs { // include tombstones + entries = append(entries, merkleKV{k: k, v: ts.version}) + } } return entries @@ -978,7 +1064,7 @@ func (dm *DistMemory) ensureShardConfig() { //nolint:ireturn } for range dm.shardCount { - dm.shards = append(dm.shards, &distShard{items: cache.New()}) + dm.shards = append(dm.shards, &distShard{items: cache.New(), tombs: make(map[string]tombstone)}) } } @@ -1760,7 +1846,22 @@ func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate // 
applyRemove deletes locally and optionally fan-outs removal to replicas. func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate bool) { - dm.shardFor(key).items.Remove(key) + sh := dm.shardFor(key) + // capture version from existing item (if any) and increment for tombstone + var nextVer uint64 + if it, ok := sh.items.Get(key); ok && it != nil { + ci := it // already *cache.Item (ConcurrentMap stores *cache.Item) + + nextVer = ci.Version + 1 + } + + if nextVer == 0 { // no prior item seen; allocate monotonic tomb version + nextVer = dm.tombVersionCounter.Add(1) + } + + sh.items.Remove(key) + + sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} if !replicate || dm.ring == nil || dm.transport == nil { return @@ -1778,7 +1879,7 @@ func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate boo continue } - _ = dm.transport.ForwardRemove(ctx, string(oid), key, false) //nolint:errcheck // best-effort + _ = dm.transport.ForwardRemove(ctx, string(oid), key, false) //nolint:errcheck // best-effort (tombstone inferred remotely) } } diff --git a/tests/hypercache_http_merkle_test.go b/tests/hypercache_http_merkle_test.go index b16950b..a259f3c 100644 --- a/tests/hypercache_http_merkle_test.go +++ b/tests/hypercache_http_merkle_test.go @@ -21,6 +21,7 @@ func TestHTTPFetchMerkle(t *testing.T) { // create two nodes with HTTP server enabled (addresses) n1 := cluster.NewNode("", "127.0.0.1:9201") + b1i, err := backend.NewDistMemory(ctx, backend.WithDistMembership(membership, n1), backend.WithDistNode("n1", "127.0.0.1:9201"), @@ -29,7 +30,9 @@ func TestHTTPFetchMerkle(t *testing.T) { if err != nil { t.Fatalf("b1: %v", err) } + n2 := cluster.NewNode("", "127.0.0.1:9202") + b2i, err := backend.NewDistMemory(ctx, backend.WithDistMembership(membership, n2), backend.WithDistNode("n2", "127.0.0.1:9202"), @@ -50,6 +53,7 @@ func TestHTTPFetchMerkle(t *testing.T) { case "n2": return "http://" + 
b2.LocalNodeAddr(), true } + return "", false } transport := backend.NewDistHTTPTransport(2*time.Second, resolver) @@ -67,8 +71,9 @@ func TestHTTPFetchMerkle(t *testing.T) { if err != nil { t.Fatalf("merkle http get: %v", err) } + _ = resp.Body.Close() - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { t.Fatalf("unexpected status %d", resp.StatusCode) } diff --git a/tests/merkle_delete_tombstone_test.go b/tests/merkle_delete_tombstone_test.go new file mode 100644 index 0000000..2934977 --- /dev/null +++ b/tests/merkle_delete_tombstone_test.go @@ -0,0 +1,62 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleDeleteTombstone ensures a deleted key does not resurrect via sync. +func TestMerkleDeleteTombstone(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9501"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9502"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + it := &cache.Item{Key: "del", Value: []byte("v"), Version: 1, Origin: "A", LastUpdated: time.Now()} + da.DebugInject(it) + if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + t.Fatalf("initial sync: %v", err) + } + + // Now delete on A + _ = da.Remove(ctx, "del") + + // Ensure local A removed + if val, _ := da.Get(ctx, "del"); val != nil { + t.Fatalf("expected A delete") + } + + // Remote (B) pulls from A to learn about deletion (pull-based anti-entropy) + if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + 
t.Fatalf("tomb sync pull: %v", err) + } + + // Ensure B removed or will not resurrect on next sync + if val, _ := db.Get(ctx, "del"); val != nil { + t.Fatalf("expected B delete after tombstone") + } + + // Re-add older version on B (simulate stale write) + stale := &cache.Item{Key: "del", Value: []byte("stale"), Version: 1, Origin: "B", LastUpdated: time.Now()} + db.DebugInject(stale) + + // Sync B with A again; B should keep deletion (not resurrect) + if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + t.Fatalf("resync: %v", err) + } + if val, _ := db.Get(ctx, "del"); val != nil { + t.Fatalf("tombstone failed; key resurrected") + } +} diff --git a/tests/merkle_empty_tree_test.go b/tests/merkle_empty_tree_test.go new file mode 100644 index 0000000..4ce8993 --- /dev/null +++ b/tests/merkle_empty_tree_test.go @@ -0,0 +1,31 @@ +package tests + +import ( + "context" + "testing" + + backend "github.com/hyp3rd/hypercache/pkg/backend" +) + +// TestMerkleEmptyTrees ensures diff between two empty trees is empty and SyncWith is no-op. 
+func TestMerkleEmptyTrees(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9201"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9202"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + if err := da.SyncWith(ctx, string(db.LocalNodeID())); err != nil { + t.Fatalf("sync empty: %v", err) + } + if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + t.Fatalf("sync empty reverse: %v", err) + } +} diff --git a/tests/merkle_no_diff_test.go b/tests/merkle_no_diff_test.go new file mode 100644 index 0000000..ba9a2f7 --- /dev/null +++ b/tests/merkle_no_diff_test.go @@ -0,0 +1,41 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleNoDiff ensures SyncWith returns quickly when trees are identical. 
+func TestMerkleNoDiff(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9401"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(4)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9402"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(4)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + // Inject identical data + for i := range 10 { + itA := &cache.Item{Key: keyf("nd", i), Value: []byte("v"), Version: uint64(i + 1), Origin: "A", LastUpdated: time.Now()} + itB := &cache.Item{Key: keyf("nd", i), Value: []byte("v"), Version: uint64(i + 1), Origin: "B", LastUpdated: time.Now()} + da.DebugInject(itA) + db.DebugInject(itB) + } + + if err := da.SyncWith(ctx, string(db.LocalNodeID())); err != nil { + t.Fatalf("sync: %v", err) + } + if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + t.Fatalf("sync2: %v", err) + } +} diff --git a/tests/merkle_single_missing_key_test.go b/tests/merkle_single_missing_key_test.go new file mode 100644 index 0000000..9d391fb --- /dev/null +++ b/tests/merkle_single_missing_key_test.go @@ -0,0 +1,42 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleSingleMissingKey ensures a single remote-only key is detected and pulled. 
+func TestMerkleSingleMissingKey(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9301"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9302"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + // Inject one key only into A + it := &cache.Item{Key: "k1", Value: []byte("v1"), Version: 1, Origin: "A", LastUpdated: time.Now()} + da.DebugInject(it) + + if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + t.Fatalf("sync single: %v", err) + } + + got, _ := db.Get(ctx, "k1") + if got == nil { + t.Fatalf("expected key pulled") + } + if bs, ok := got.Value.([]byte); !ok || string(bs) != "v1" { + t.Fatalf("unexpected value %v", got.Value) + } +} From 3b495fa86c81d5f92db415b6994742ec6609d975 Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 23 Aug 2025 18:16:23 +0300 Subject: [PATCH 5/7] docs: enhance README with detailed tombstone mechanics and roadmap progress table - Add comprehensive tombstone versioning and anti-resurrection guard details - Document Merkle phase timing metrics and anti-entropy pull counters - Include roadmap progress table showing current implementation status - Expand descriptions of delete semantics and remote sync behavior - Clarify DebugInject tombstone clearing functionality for testing This update provides much clearer documentation for users and contributors about the current state of distributed cache features and deletion handling. 
--- README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/README.md b/README.md index d477fb0..1c4ca7a 100644 --- a/README.md +++ b/README.md @@ -212,10 +212,36 @@ Current capabilities: - Read‑repair when a local owner misses but another replica has the key. - Basic delete semantics with tombstones: deletions propagate as versioned tombstones preventing resurrection during anti-entropy (tombstone retention is in‑memory, no persistence yet). + - Tombstone versioning uses a per-process monotonic counter when no prior item version exists (avoids time-based unsigned casts). + - Remote pull sync will infer a tombstone when a key present locally is absent remotely and no local tomb exists (anti-resurrection guard). + - DebugInject intentionally clears any existing tombstone for that key (test helper / simulating authoritative resurrection with higher version). + - Planned: configurable tombstone TTL + periodic compaction to reclaim memory; metrics for active tombstones and purges. - Metrics exposed via management endpoints (`/dist/metrics`, `/dist/owners`, `/cluster/members`, `/cluster/ring`). + - Includes Merkle phase timings (fetch/build/diff nanos) and counters for keys pulled during anti-entropy. Planned next steps (roadmap excerpts): network transport abstraction, quorum reads/writes, versioning (vector clocks or lamport), failure detection / node states, rebalancing & anti‑entropy sync. 
+### Roadmap / PRD Progress Snapshot + +| Area | Status | +|------|--------| +| Core in-process sharding | Complete (static ring) | +| Replication fan-out | Implemented (best-effort) | +| Read-repair | Implemented | +| Merkle anti-entropy | Implemented (pull-based) | +| Merkle performance metrics | Implemented (fetch/build/diff nanos) | +| Remote-only key enumeration fallback | Implemented with optional cap (`WithDistListKeysCap`) | +| Delete semantics (tombstones) | Implemented (no compaction yet) | +| Tombstone compaction / TTL | Planned | +| Quorum read/write consistency | Partially scaffolded (consistency levels enum) | +| Failure detection / heartbeat | Experimental heartbeat present | +| Membership changes / dynamic rebalancing | Not yet | +| Network transport (HTTP partial) | Basic HTTP management + fetch merkle/keys; full RPC TBD | +| Tracing spans (distributed ops) | Planned | +| Metrics exposure | Basic + Merkle phase metrics | +| Persistence | Not in scope yet | +| Benchmarks & tests | Extensive unit + benchmark coverage | + Example minimal setup: ```go From 7036fc7c5b3ead21c96997066bdcf3333002a458 Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 23 Aug 2025 18:43:25 +0300 Subject: [PATCH 6/7] feat: add tombstone TTL and compaction with enhanced read consistency - Add configurable tombstone TTL and periodic compaction to reclaim memory - Implement WithDistTombstoneTTL and WithDistTombstoneSweep options - Add tombstone metrics tracking (TombstonesActive, TombstonesPurged) - Enhance quorum reads with targeted stale owner repair - Refactor consistency logic with collectQuorum helper method - Update README with new configuration options and metrics - Add comprehensive test coverage for stale quorum scenarios - Improve error handling and code formatting in existing tests This addresses memory management concerns with tombstone accumulation while improving distributed consistency guarantees through better read repair mechanisms. 
--- README.md | 7 +- pkg/backend/dist_memory.go | 164 +++++++++++++++++- ...hypercache_distmemory_stale_quorum_test.go | 102 +++++++++++ tests/merkle_delete_tombstone_test.go | 11 +- tests/merkle_empty_tree_test.go | 7 +- tests/merkle_no_diff_test.go | 8 +- tests/merkle_single_missing_key_test.go | 5 +- 7 files changed, 287 insertions(+), 17 deletions(-) create mode 100644 tests/hypercache_distmemory_stale_quorum_test.go diff --git a/README.md b/README.md index 1c4ca7a..767f66f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ It is optimized for performance and flexibility: - Tunable expiration and eviction intervals (or fully proactive eviction when the eviction interval is set to `0`). - Debounced & coalesced expiration trigger channel to avoid thrashing. -- Non-blocking manual `TriggerEviction()` signal. +- Non-blocking manual `TriggerEviction(context.Context)` signal. - Serializer‑aware memory accounting (item size reflects the backend serialization format when available). - Multiple eviction algorithms with the ability to register custom ones. - Multiple stats collectors (default histogram) and middleware hooks. @@ -189,6 +189,8 @@ if err != nil { | `WithDistListKeysCap` | (DistMemory) Cap number of keys fetched via fallback enumeration. | | `WithDistNode` | (DistMemory) Explicit node identity (id/address). | | `WithDistSeeds` | (DistMemory) Static seed addresses to pre-populate membership. | +| `WithDistTombstoneTTL` | (DistMemory) Retain delete tombstones for this duration before compaction (<=0 = infinite). | +| `WithDistTombstoneSweep` | (DistMemory) Interval to run tombstone compaction (<=0 disables). | *ARC is experimental (not registered by default). @@ -215,9 +217,10 @@ Current capabilities: - Tombstone versioning uses a per-process monotonic counter when no prior item version exists (avoids time-based unsigned casts). 
- Remote pull sync will infer a tombstone when a key present locally is absent remotely and no local tomb exists (anti-resurrection guard). - DebugInject intentionally clears any existing tombstone for that key (test helper / simulating authoritative resurrection with higher version). - - Planned: configurable tombstone TTL + periodic compaction to reclaim memory; metrics for active tombstones and purges. + - Tombstone TTL + periodic compaction: configure with `WithDistTombstoneTTL` / `WithDistTombstoneSweep`; metrics track active & purged counts. - Metrics exposed via management endpoints (`/dist/metrics`, `/dist/owners`, `/cluster/members`, `/cluster/ring`). - Includes Merkle phase timings (fetch/build/diff nanos) and counters for keys pulled during anti-entropy. + - Tombstone metrics: `TombstonesActive`, `TombstonesPurged`. Planned next steps (roadmap excerpts): network transport abstraction, quorum reads/writes, versioning (vector clocks or lamport), failure detection / node states, rebalancing & anti‑entropy sync. diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index d8161f2..b964b86 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -94,6 +94,11 @@ type DistMemory struct { // tombstone version source when no prior item exists (monotonic per process) tombVersionCounter atomic.Uint64 + + // tombstone retention / compaction + tombstoneTTL time.Duration + tombstoneSweepInt time.Duration + tombStopCh chan struct{} } // hintedEntry represents a deferred replica write. @@ -131,6 +136,16 @@ func WithDistWriteConsistency(l ConsistencyLevel) DistMemoryOption { return func(dm *DistMemory) { dm.writeConsistency = l } } +// WithDistTombstoneTTL configures how long tombstones are retained before subject to compaction (<=0 keeps indefinitely). 
+func WithDistTombstoneTTL(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.tombstoneTTL = d } +} + +// WithDistTombstoneSweep sets sweep interval for tombstone compaction (<=0 disables automatic sweeps). +func WithDistTombstoneSweep(interval time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.tombstoneSweepInt = interval } +} + // Membership returns current membership reference (read-only usage). func (dm *DistMemory) Membership() *cluster.Membership { return dm.membership } @@ -443,6 +458,7 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist dm.startHintReplayIfEnabled(ctx) dm.startGossipIfEnabled() dm.startAutoSyncIfEnabled(ctx) + dm.startTombstoneSweeper() return dm, nil } @@ -762,6 +778,8 @@ type distMetrics struct { merkleDiffNanos int64 // last diff duration (ns) merkleFetchNanos int64 // last remote fetch duration (ns) autoSyncLoops int64 // number of auto-sync ticks executed + tombstonesActive int64 // approximate active tombstones + tombstonesPurged int64 // cumulative purged tombstones } // DistMetrics snapshot. @@ -791,6 +809,8 @@ type DistMetrics struct { AutoSyncLoops int64 LastAutoSyncNanos int64 LastAutoSyncError string + TombstonesActive int64 + TombstonesPurged int64 } // Metrics returns a snapshot of distributed metrics. 
@@ -828,6 +848,8 @@ func (dm *DistMemory) Metrics() DistMetrics { AutoSyncLoops: atomic.LoadInt64(&dm.metrics.autoSyncLoops), LastAutoSyncNanos: dm.lastAutoSyncDuration.Load(), LastAutoSyncError: lastErr, + TombstonesActive: atomic.LoadInt64(&dm.metrics.tombstonesActive), + TombstonesPurged: atomic.LoadInt64(&dm.metrics.tombstonesPurged), } } @@ -856,6 +878,10 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn } } + if dm.tombStopCh != nil { // stop tomb sweeper + close(dm.tombStopCh) + } + return nil } @@ -978,6 +1004,7 @@ func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { nextVer := cur.Version + 1 sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) } return @@ -990,6 +1017,7 @@ func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { } // remote has newer version; clear tombstone (key resurrected intentionally) delete(sh.tombs, key) + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) } if cur, okLocal := sh.items.Get(key); !okLocal || it.Version > cur.Version { @@ -1020,6 +1048,74 @@ func (dm *DistMemory) merkleEntries() []merkleKV { return entries } +// startTombstoneSweeper launches periodic compaction if configured. +func (dm *DistMemory) startTombstoneSweeper() { //nolint:ireturn + if dm.tombstoneTTL <= 0 || dm.tombstoneSweepInt <= 0 { + return + } + + dm.tombStopCh = make(chan struct{}) + go func() { + ticker := time.NewTicker(dm.tombstoneSweepInt) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + purged := dm.compactTombstones() + if purged > 0 { + atomic.AddInt64(&dm.metrics.tombstonesPurged, purged) + } + + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + + case <-dm.tombStopCh: + return + } + } + }() +} + +// compactTombstones removes expired tombstones based on TTL, returns count purged. 
+func (dm *DistMemory) compactTombstones() int64 { //nolint:ireturn + if dm.tombstoneTTL <= 0 { + return 0 + } + + now := time.Now() + + var purged int64 + for _, sh := range dm.shards { + if sh == nil { + continue + } + + for k, ts := range sh.tombs { + if now.Sub(ts.at) >= dm.tombstoneTTL { + delete(sh.tombs, k) + + purged++ + } + } + } + + return purged +} + +// countTombstones returns approximate current count. +func (dm *DistMemory) countTombstones() int64 { //nolint:ireturn + var total int64 + for _, sh := range dm.shards { + if sh == nil { + continue + } + + total += int64(len(sh.tombs)) + } + + return total +} + func encodeUint64BigEndian(buf []byte, v uint64) { for i := merkleVersionBytes - 1; i >= 0; i-- { // big endian for deterministic hashing buf[i] = byte(v) @@ -1229,24 +1325,77 @@ func (dm *DistMemory) getWithConsistency(ctx context.Context, key string, owners var chosen *cache.Item + // gather results sequentially until quorum reached, tracking stale owners + staleOwners := dm.collectQuorum(ctx, key, owners, needed, &chosen, &acks) + if acks < needed || chosen == nil { + return nil, false + } + + dm.repairStaleOwners(ctx, key, chosen, staleOwners) + dm.repairReplicas(ctx, key, chosen, owners) + + return chosen, true +} + +// collectQuorum iterates owners, updates chosen item and acks, returns owners needing repair. 
+func (dm *DistMemory) collectQuorum( + ctx context.Context, + key string, + owners []cluster.NodeID, + needed int, + chosen **cache.Item, + acks *int, +) []cluster.NodeID { //nolint:ireturn + stale := make([]cluster.NodeID, 0, len(owners)) for idx, oid := range owners { it, ok := dm.fetchOwner(ctx, key, idx, oid) if !ok { continue } - chosen = dm.chooseNewer(chosen, it) - acks++ + prev := *chosen + + *chosen = dm.chooseNewer(*chosen, it) + *acks++ + + if prev != nil && *chosen != prev { + stale = append(stale, oid) + } + + if *acks >= needed && *chosen != nil { // early break + break + } } - if acks < needed || chosen == nil { - return nil, false + return stale +} + +// repairStaleOwners issues best-effort targeted repairs for owners identified as stale. +func (dm *DistMemory) repairStaleOwners( + ctx context.Context, + key string, + chosen *cache.Item, + staleOwners []cluster.NodeID, +) { //nolint:ireturn + if dm.transport == nil || chosen == nil { + return } - // version-based read repair across all owners if stale/missing - dm.repairReplicas(ctx, key, chosen, owners) + for _, oid := range staleOwners { + if oid == dm.localNode.ID { // local handled in repairReplicas + continue + } - return chosen, true + it, ok, err := dm.transport.ForwardGet(ctx, string(oid), key) + if err != nil { // skip unreachable + continue + } + + if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { + _ = dm.transport.ForwardSet(ctx, string(oid), chosen, false) //nolint:errcheck + atomic.AddInt64(&dm.metrics.readRepair, 1) + } + } } // fetchOwner attempts to fetch item from given owner (local or remote) updating metrics. 
@@ -1862,6 +2011,7 @@ func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate boo sh.items.Remove(key) sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) if !replicate || dm.ring == nil || dm.transport == nil { return diff --git a/tests/hypercache_distmemory_stale_quorum_test.go b/tests/hypercache_distmemory_stale_quorum_test.go new file mode 100644 index 0000000..8a82a6f --- /dev/null +++ b/tests/hypercache_distmemory_stale_quorum_test.go @@ -0,0 +1,102 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/cluster" + "github.com/hyp3rd/hypercache/pkg/backend" + cachev2 "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestDistMemoryStaleQuorum ensures quorum read returns newest version and repairs stale replicas. +func TestDistMemoryStaleQuorum(t *testing.T) { + ring := cluster.NewRing(cluster.WithReplication(3)) + membership := cluster.NewMembership(ring) + transport := backend.NewInProcessTransport() + + n1 := cluster.NewNode("", "n1:0") + n2 := cluster.NewNode("", "n2:0") + n3 := cluster.NewNode("", "n3:0") + + b1i, _ := backend.NewDistMemory(context.TODO(), backend.WithDistMembership(membership, n1), backend.WithDistTransport(transport), backend.WithDistReadConsistency(backend.ConsistencyQuorum)) + b2i, _ := backend.NewDistMemory(context.TODO(), backend.WithDistMembership(membership, n2), backend.WithDistTransport(transport), backend.WithDistReadConsistency(backend.ConsistencyQuorum)) + b3i, _ := backend.NewDistMemory(context.TODO(), backend.WithDistMembership(membership, n3), backend.WithDistTransport(transport), backend.WithDistReadConsistency(backend.ConsistencyQuorum)) + + b1 := b1i.(*backend.DistMemory) //nolint:forcetypeassert + b2 := b2i.(*backend.DistMemory) //nolint:forcetypeassert + b3 := b3i.(*backend.DistMemory) //nolint:forcetypeassert + + 
transport.Register(b1) + transport.Register(b2) + transport.Register(b3) + + key := "sq-key" + + owners := ring.Lookup(key) + if len(owners) != 3 { + t.Skip("replication factor !=3") + } + + // Write initial version via primary + primary := owners[0] + item := &cachev2.Item{Key: key, Value: "v1"} + + _ = item.Valid() + if primary == b1.LocalNodeID() { + _ = b1.Set(context.Background(), item) + } else if primary == b2.LocalNodeID() { + _ = b2.Set(context.Background(), item) + } else { + _ = b3.Set(context.Background(), item) + } + + // Manually bump version on one replica to simulate a newer write that others missed + // Pick owners[1] as ahead replica + aheadID := owners[1] + ahead := map[cluster.NodeID]*backend.DistMemory{b1.LocalNodeID(): b1, b2.LocalNodeID(): b2, b3.LocalNodeID(): b3}[aheadID] + ahead.DebugInject(&cachev2.Item{Key: key, Value: "v2", Version: 5, Origin: string(ahead.LocalNodeID()), LastUpdated: time.Now()}) + + // Drop local copy on owners[2] to simulate stale/missing + lagID := owners[2] + lag := map[cluster.NodeID]*backend.DistMemory{b1.LocalNodeID(): b1, b2.LocalNodeID(): b2, b3.LocalNodeID(): b3}[lagID] + lag.DebugDropLocal(key) + + // Issue quorum read from a non-ahead node (choose primary if not ahead, else third) + requester := b1 + if requester.LocalNodeID() == aheadID { + requester = b2 + } + + if requester.LocalNodeID() == aheadID { + requester = b3 + } + + got, ok := requester.Get(context.Background(), key) + if !ok { + t.Fatalf("quorum get failed") + } + // Value stored as interface{} may be string (not []byte) in this test + if sval, okCast := got.Value.(string); !okCast || sval != "v2" { + t.Fatalf("expected quorum to return newer version v2, got=%v (type %T)", got.Value, got.Value) + } + + // Allow brief repair propagation + time.Sleep(50 * time.Millisecond) + + // All owners should now have v2 (version 5) + for _, oid := range owners { + inst := map[cluster.NodeID]*backend.DistMemory{b1.LocalNodeID(): b1, b2.LocalNodeID(): b2, 
b3.LocalNodeID(): b3}[oid] + + it, ok2 := inst.Get(context.Background(), key) + if !ok2 || it.Version != 5 { + t.Fatalf("owner %s not repaired to v2 (v5) -> (%v,%v)", oid, ok2, it) + } + } + + // ReadRepair metric should have incremented somewhere + if b1.Metrics().ReadRepair+b2.Metrics().ReadRepair+b3.Metrics().ReadRepair == 0 { + t.Fatalf("expected read repair metric >0") + } +} diff --git a/tests/merkle_delete_tombstone_test.go b/tests/merkle_delete_tombstone_test.go index 2934977..d9707a6 100644 --- a/tests/merkle_delete_tombstone_test.go +++ b/tests/merkle_delete_tombstone_test.go @@ -19,6 +19,7 @@ func TestMerkleDeleteTombstone(t *testing.T) { da := any(a).(*backend.DistMemory) db := any(b).(*backend.DistMemory) + da.SetTransport(transport) db.SetTransport(transport) transport.Register(da) @@ -26,7 +27,8 @@ func TestMerkleDeleteTombstone(t *testing.T) { it := &cache.Item{Key: "del", Value: []byte("v"), Version: 1, Origin: "A", LastUpdated: time.Now()} da.DebugInject(it) - if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + err := db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { t.Fatalf("initial sync: %v", err) } @@ -39,7 +41,8 @@ func TestMerkleDeleteTombstone(t *testing.T) { } // Remote (B) pulls from A to learn about deletion (pull-based anti-entropy) - if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { t.Fatalf("tomb sync pull: %v", err) } @@ -53,9 +56,11 @@ func TestMerkleDeleteTombstone(t *testing.T) { db.DebugInject(stale) // Sync B with A again; B should keep deletion (not resurrect) - if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { t.Fatalf("resync: %v", err) } + if val, _ := db.Get(ctx, "del"); val != nil { t.Fatalf("tombstone failed; key resurrected") } diff --git a/tests/merkle_empty_tree_test.go b/tests/merkle_empty_tree_test.go index 
4ce8993..3ac3aab 100644 --- a/tests/merkle_empty_tree_test.go +++ b/tests/merkle_empty_tree_test.go @@ -17,15 +17,18 @@ func TestMerkleEmptyTrees(t *testing.T) { da := any(a).(*backend.DistMemory) db := any(b).(*backend.DistMemory) + da.SetTransport(transport) db.SetTransport(transport) transport.Register(da) transport.Register(db) - if err := da.SyncWith(ctx, string(db.LocalNodeID())); err != nil { + err := da.SyncWith(ctx, string(db.LocalNodeID())) + if err != nil { t.Fatalf("sync empty: %v", err) } - if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { t.Fatalf("sync empty reverse: %v", err) } } diff --git a/tests/merkle_no_diff_test.go b/tests/merkle_no_diff_test.go index ba9a2f7..2b839aa 100644 --- a/tests/merkle_no_diff_test.go +++ b/tests/merkle_no_diff_test.go @@ -19,6 +19,7 @@ func TestMerkleNoDiff(t *testing.T) { da := any(a).(*backend.DistMemory) db := any(b).(*backend.DistMemory) + da.SetTransport(transport) db.SetTransport(transport) transport.Register(da) @@ -28,14 +29,17 @@ func TestMerkleNoDiff(t *testing.T) { for i := range 10 { itA := &cache.Item{Key: keyf("nd", i), Value: []byte("v"), Version: uint64(i + 1), Origin: "A", LastUpdated: time.Now()} itB := &cache.Item{Key: keyf("nd", i), Value: []byte("v"), Version: uint64(i + 1), Origin: "B", LastUpdated: time.Now()} + da.DebugInject(itA) db.DebugInject(itB) } - if err := da.SyncWith(ctx, string(db.LocalNodeID())); err != nil { + err := da.SyncWith(ctx, string(db.LocalNodeID())) + if err != nil { t.Fatalf("sync: %v", err) } - if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { t.Fatalf("sync2: %v", err) } } diff --git a/tests/merkle_single_missing_key_test.go b/tests/merkle_single_missing_key_test.go index 9d391fb..7d3e2ba 100644 --- a/tests/merkle_single_missing_key_test.go +++ b/tests/merkle_single_missing_key_test.go @@ -19,6 +19,7 
@@ func TestMerkleSingleMissingKey(t *testing.T) { da := any(a).(*backend.DistMemory) db := any(b).(*backend.DistMemory) + da.SetTransport(transport) db.SetTransport(transport) transport.Register(da) @@ -28,7 +29,8 @@ func TestMerkleSingleMissingKey(t *testing.T) { it := &cache.Item{Key: "k1", Value: []byte("v1"), Version: 1, Origin: "A", LastUpdated: time.Now()} da.DebugInject(it) - if err := db.SyncWith(ctx, string(da.LocalNodeID())); err != nil { + err := db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { t.Fatalf("sync single: %v", err) } @@ -36,6 +38,7 @@ func TestMerkleSingleMissingKey(t *testing.T) { if got == nil { t.Fatalf("expected key pulled") } + if bs, ok := got.Value.([]byte); !ok || string(bs) != "v1" { t.Fatalf("unexpected value %v", got.Value) } From af59f5c0e6385e1d43b2b36f3230f09c650e1112 Mon Sep 17 00:00:00 2001 From: Francesco Cosentino Date: Sat, 23 Aug 2025 18:50:57 +0300 Subject: [PATCH 7/7] fix: improve parallel quorum reads with proper goroutine capture and stale tracking - Fix variable capture in goroutines for Go <1.22 compatibility - Add owner tracking to parallel fetch results to enable targeted repairs - Implement stale owner detection during parallel consensus building - Add targeted repair mechanism before full replica repair - Improve code structure and comments for better maintainability This ensures parallel quorum reads correctly identify and repair stale replicas while maintaining compatibility with older Go versions that require explicit variable capture in goroutine closures. 
--- pkg/backend/dist_memory.go | 47 ++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index b964b86..cc8c320 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -1458,8 +1458,9 @@ func (dm *DistMemory) getWithConsistencyParallel(ctx context.Context, key string needed := dm.requiredAcks(len(owners), dm.readConsistency) type res struct { - it *cache.Item - ok bool + owner cluster.NodeID + it *cache.Item + ok bool } ch := make(chan res, len(owners)) @@ -1467,27 +1468,39 @@ func (dm *DistMemory) getWithConsistencyParallel(ctx context.Context, key string ctxFetch, cancel := context.WithCancel(ctx) defer cancel() - for idx, oid := range owners { // launch all - go func() { - it, ok := dm.fetchOwner(ctxFetch, key, idx, oid) - ch <- res{it: it, ok: ok} - }() + for idx, oid := range owners { // launch all with proper capture (Go <1.22 style) + idxLocal, oidLocal := idx, oid + go func(i int, o cluster.NodeID) { + it, ok := dm.fetchOwner(ctxFetch, key, i, o) + ch <- res{owner: o, it: it, ok: ok} + }(idxLocal, oidLocal) } acks := 0 var chosen *cache.Item + + staleOwners := make([]cluster.NodeID, 0, len(owners)) + for range owners { - r := <-ch - if r.ok { - chosen = dm.chooseNewer(chosen, r.it) + resp := <-ch + if !resp.ok { + continue + } - acks++ - if acks >= needed && chosen != nil { // early satisfied - cancel() + prev := chosen - break - } + chosen = dm.chooseNewer(chosen, resp.it) + acks++ + + if prev != nil && chosen != prev { // newer version found, mark new owner for targeted check (mirrors sequential path semantics) + staleOwners = append(staleOwners, resp.owner) + } + + if acks >= needed && chosen != nil { // early satisfied + cancel() + + break } } @@ -1495,6 +1508,10 @@ func (dm *DistMemory) getWithConsistencyParallel(ctx context.Context, key string return nil, false } + // targeted repairs for owners involved in version advancement 
(best-effort) + dm.repairStaleOwners(ctx, key, chosen, staleOwners) + + // full repair across all owners to ensure convergence dm.repairReplicas(ctx, key, chosen, owners) return chosen, true