diff --git a/.github/instructions/instructions.md b/.github/instructions/instructions.md new file mode 100644 index 0000000..5d24fb1 --- /dev/null +++ b/.github/instructions/instructions.md @@ -0,0 +1,16 @@ +--- +applyTo: '**' +--- + +# Basic instructions + +Always run `golangci-lint` and staticcheck after completing a Golang task. +Verify the quality of the code you provide, including repetitions, flaws, and ways to modernise the approach to ensure consistency. Adopt a development method and stick consistently to it. +Parse the Makefile in the project, and you will find the commands you need to lint, polish, and test the code. + +Always document the solutions we find, and where applicable, use the ./docs folder for extensive documentation. + +## Toolset + +- [Makefile](../../Makefile). +- Always run: `make lint` diff --git a/README.md b/README.md index 9d3899d..767f66f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ It is optimized for performance and flexibility: - Tunable expiration and eviction intervals (or fully proactive eviction when the eviction interval is set to `0`). - Debounced & coalesced expiration trigger channel to avoid thrashing. -- Non-blocking manual `TriggerEviction()` signal. +- Non-blocking manual `TriggerEviction(context.Context)` signal. - Serializer‑aware memory accounting (item size reflects the backend serialization format when available). - Multiple eviction algorithms with the ability to register custom ones. - Multiple stats collectors (default histogram) and middleware hooks. 
@@ -61,6 +61,8 @@ Endpoints (subject to change): - GET /config – sanitized runtime config (now includes replication + virtual node settings when using DistMemory) - GET /dist/metrics – distributed backend forwarding / replication counters (DistMemory only) - GET /dist/owners?key=K – current ring owners (IDs) for key K (DistMemory only, debug) +- GET /internal/merkle – Merkle tree snapshot (DistMemory experimental anti-entropy) +- GET /internal/keys – Full key enumeration (debug / anti-entropy fallback; expensive) - GET /cluster/members – membership snapshot (id, address, state, incarnation, replication factor, virtual nodes) - GET /cluster/ring – ring vnode hashes (debug / diagnostics) - POST /evict – trigger eviction cycle @@ -181,8 +183,14 @@ if err != nil { | `WithManagementHTTP` | Start optional management HTTP server. | | `WithDistReplication` | (DistMemory) Set replication factor (owners per key). | | `WithDistVirtualNodes` | (DistMemory) Virtual nodes per physical node for consistent hashing. | +| `WithDistMerkleChunkSize` | (DistMemory) Keys per Merkle leaf chunk (power-of-two recommended). | +| `WithDistMerkleAutoSync` | (DistMemory) Interval for background Merkle sync (<=0 disables). | +| `WithDistMerkleAutoSyncPeers` | (DistMemory) Limit peers synced per auto-sync tick (0=all). | +| `WithDistListKeysCap` | (DistMemory) Cap number of keys fetched via fallback enumeration. | | `WithDistNode` | (DistMemory) Explicit node identity (id/address). | | `WithDistSeeds` | (DistMemory) Static seed addresses to pre-populate membership. | +| `WithDistTombstoneTTL` | (DistMemory) Retain delete tombstones for this duration before compaction (<=0 = infinite). | +| `WithDistTombstoneSweep` | (DistMemory) Interval to run tombstone compaction (<=0 disables). | *ARC is experimental (not registered by default). @@ -204,10 +212,39 @@ Current capabilities: - Ownership enforcement (non‑owners forward to primary). - Replica fan‑out on writes (best‑effort) & replica removals. 
- Read‑repair when a local owner misses but another replica has the key. +- Basic delete semantics with tombstones: deletions propagate as versioned tombstones preventing + resurrection during anti-entropy (tombstone retention is in‑memory, no persistence yet). + - Tombstone versioning uses a per-process monotonic counter when no prior item version exists (avoids time-based unsigned casts). + - Remote pull sync will infer a tombstone when a key present locally is absent remotely and no local tomb exists (anti-resurrection guard). + - DebugInject intentionally clears any existing tombstone for that key (test helper / simulating authoritative resurrection with higher version). + - Tombstone TTL + periodic compaction: configure with `WithDistTombstoneTTL` / `WithDistTombstoneSweep`; metrics track active & purged counts. - Metrics exposed via management endpoints (`/dist/metrics`, `/dist/owners`, `/cluster/members`, `/cluster/ring`). + - Includes Merkle phase timings (fetch/build/diff nanos) and counters for keys pulled during anti-entropy. + - Tombstone metrics: `TombstonesActive`, `TombstonesPurged`. Planned next steps (roadmap excerpts): network transport abstraction, quorum reads/writes, versioning (vector clocks or lamport), failure detection / node states, rebalancing & anti‑entropy sync. 
+### Roadmap / PRD Progress Snapshot + +| Area | Status | +|------|--------| +| Core in-process sharding | Complete (static ring) | +| Replication fan-out | Implemented (best-effort) | +| Read-repair | Implemented | +| Merkle anti-entropy | Implemented (pull-based) | +| Merkle performance metrics | Implemented (fetch/build/diff nanos) | +| Remote-only key enumeration fallback | Implemented with optional cap (`WithDistListKeysCap`) | +| Delete semantics (tombstones) | Implemented (versioned tombstones) | +| Tombstone compaction / TTL | Implemented (`WithDistTombstoneTTL` / `WithDistTombstoneSweep`) | +| Quorum read/write consistency | Partially scaffolded (consistency levels enum) | +| Failure detection / heartbeat | Experimental heartbeat present | +| Membership changes / dynamic rebalancing | Not yet | +| Network transport (HTTP partial) | Basic HTTP management + fetch merkle/keys; full RPC TBD | +| Tracing spans (distributed ops) | Planned | +| Metrics exposure | Basic + Merkle phase metrics | +| Persistence | Not in scope yet | +| Benchmarks & tests | Extensive unit + benchmark coverage | + Example minimal setup: ```go diff --git a/cspell.config.yaml b/cspell.config.yaml index 63dd475..1a45473 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -33,6 +33,7 @@ words: - Fprintln - freqs - funlen + - gerr - gitversion - GITVERSION - goccy @@ -49,11 +50,13 @@ words: - ints - ireturn - Itemm + - keyf - lamport - LFUDA - localmodule - logrus - memprofile + - Merkle - Mgmt - msgpack - mvdan @@ -61,6 +64,7 @@ words: - Newf - nolint - nonamedreturns + - nosec - NOVENDOR - paralleltest - Pipeliner diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index ecfdf48..e6bd1b6 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -40,6 +40,7 @@ func (s *distHTTPServer) start(ctx context.Context, dm *DistMemory) error { //no s.registerGet(ctx, dm) s.registerRemove(ctx, dm) s.registerHealth() + s.registerMerkle(ctx, dm) return s.listen(ctx) } @@ -112,6 +113,35 @@ func (s
*distHTTPServer) registerHealth() { //nolint:ireturn s.app.Get("/health", func(fctx fiber.Ctx) error { return fctx.SendString("ok") }) } +func (s *distHTTPServer) registerMerkle(_ context.Context, dm *DistMemory) { //nolint:ireturn + s.app.Get("/internal/merkle", func(fctx fiber.Ctx) error { + tree := dm.BuildMerkleTree() + + return fctx.JSON(fiber.Map{ + "root": tree.Root, + "leaf_hashes": tree.LeafHashes, + "chunk_size": tree.ChunkSize, + }) + }) + + // naive keys listing for anti-entropy (testing only). Not efficient for large datasets. + s.app.Get("/internal/keys", func(fctx fiber.Ctx) error { + var keys []string + for _, shard := range dm.shards { + if shard == nil { + continue + } + + ch := shard.items.IterBuffered() + for t := range ch { + keys = append(keys, t.Key) + } + } + + return fctx.JSON(fiber.Map{"keys": keys}) + }) +} + func (s *distHTTPServer) listen(ctx context.Context) error { //nolint:ireturn lc := net.ListenConfig{} diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go index 2f9d0da..4e748dd 100644 --- a/pkg/backend/dist_http_transport.go +++ b/pkg/backend/dist_http_transport.go @@ -1,4 +1,3 @@ -// Package backend provides backend implementations including a distributed HTTP transport. package backend import ( @@ -18,41 +17,36 @@ import ( ) // DistHTTPTransport implements DistTransport over HTTP JSON. -type DistHTTPTransport struct { // minimal MVP +type DistHTTPTransport struct { client *http.Client - baseURLFn func(nodeID string) (string, bool) // resolves nodeID -> base URL (scheme+host) + baseURLFn func(nodeID string) (string, bool) } -// internal status code threshold for error classification. const statusThreshold = 300 -// NewDistHTTPTransport creates a new HTTP transport. -func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport { +// NewDistHTTPTransport constructs a DistHTTPTransport with the given timeout and +// nodeID->baseURL resolver. 
Timeout <=0 defaults to 2s. +func NewDistHTTPTransport(timeout time.Duration, resolver func(string) (string, bool)) *DistHTTPTransport { //nolint:ireturn if timeout <= 0 { timeout = 2 * time.Second } - return &DistHTTPTransport{ - client: &http.Client{Timeout: timeout}, - baseURLFn: resolver, - } + return &DistHTTPTransport{client: &http.Client{Timeout: timeout}, baseURLFn: resolver} } -// request/response DTOs moved to dist_http_types.go - const ( errMsgNewRequest = "new request" errMsgDoRequest = "do request" ) -// ForwardSet forwards a set (or replication) request to a remote node. +// ForwardSet sends a Set/Replicate request to a remote node. func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return sentinel.ErrBackendNotFound } - reqBody := httpSetRequest{ // split for line length + reqBody := httpSetRequest{ Key: item.Key, Value: item.Value, Expiration: item.Expiration.Milliseconds(), @@ -66,9 +60,7 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return ewrap.Wrap(err, "marshal set request") } - url := base + "/internal/cache/set" - - hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payloadBytes)) + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, base+"/internal/cache/set", bytes.NewReader(payloadBytes)) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } @@ -80,15 +72,12 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return ewrap.Wrap(err, errMsgDoRequest) } - defer func() { - _ = resp.Body.Close() //nolint:errcheck // best-effort - }() + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound } - const statusThreshold = 300 // local redeclare for linter clarity if resp.StatusCode >= statusThreshold { body, rerr := 
io.ReadAll(resp.Body) if rerr != nil { @@ -101,18 +90,14 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return nil } -// type httpGetResponse formerly used for direct decoding; replaced by map-based decoding to satisfy linters. - -// ForwardGet fetches an item from a remote node. -func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) { //nolint:ireturn +// ForwardGet fetches a single item from a remote node. +func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error) { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return nil, false, sentinel.ErrBackendNotFound } - url := fmt.Sprintf("%s/internal/cache/get?key=%s", base, key) - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/internal/cache/get?key=%s", base, key), nil) if err != nil { return nil, false, ewrap.Wrap(err, errMsgNewRequest) } @@ -122,7 +107,7 @@ func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key s return nil, false, ewrap.Wrap(err, errMsgDoRequest) } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return nil, false, sentinel.ErrBackendNotFound @@ -144,7 +129,6 @@ func (t *DistHTTPTransport) ForwardGet(ctx context.Context, nodeID string, key s return item, true, nil } -// decodeGetBody decodes a get response body into an item. 
func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn var raw map[string]json.RawMessage @@ -168,7 +152,6 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn } if ib, ok := raw["item"]; ok && len(ib) > 0 { - // define minimal mirror struct to ensure json tags present (satisfy musttag) var mirror struct { Key string `json:"key"` Value json.RawMessage `json:"value"` @@ -181,8 +164,8 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn if err != nil { return nil, false, ewrap.Wrap(err, "unmarshal mirror") } - // reconstruct cache.Item (we ignore expiration formatting difference vs ms) - return &cache.Item{ // multi-line for readability + + return &cache.Item{ Key: mirror.Key, Value: mirror.Value, Expiration: time.Duration(mirror.Expiration) * time.Millisecond, @@ -195,16 +178,14 @@ func decodeGetBody(r io.Reader) (*cache.Item, bool, error) { //nolint:ireturn return &cache.Item{}, true, nil } -// ForwardRemove forwards a remove operation. -func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error { //nolint:ireturn +// ForwardRemove propagates a delete operation to a remote node. 
+func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return sentinel.ErrBackendNotFound } - url := fmt.Sprintf("%s/internal/cache/remove?key=%s&replicate=%t", base, key, replicate) - - hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("%s/internal/cache/remove?key=%s&replicate=%t", base, key, replicate), nil) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } @@ -214,7 +195,7 @@ func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, ke return ewrap.Wrap(err, errMsgDoRequest) } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -227,16 +208,14 @@ func (t *DistHTTPTransport) ForwardRemove(ctx context.Context, nodeID string, ke return nil } -// Health probes a remote node health endpoint. +// Health performs a health probe against a remote node. 
func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { //nolint:ireturn base, ok := t.baseURLFn(nodeID) if !ok { return sentinel.ErrBackendNotFound } - url := base + "/health" - - hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/health", nil) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } @@ -246,7 +225,7 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { / return ewrap.Wrap(err, errMsgDoRequest) } - defer func() { _ = resp.Body.Close() }() //nolint:errcheck // best-effort + defer func() { _ = resp.Body.Close() }() //nolint:errcheck if resp.StatusCode == http.StatusNotFound { return sentinel.ErrBackendNotFound @@ -258,3 +237,87 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { / return nil } + +// FetchMerkle retrieves a Merkle tree snapshot from a remote node. +func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) { //nolint:ireturn + if t == nil { + return nil, errNoTransport + } + + base, ok := t.baseURLFn(nodeID) + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/internal/merkle", nil) + if err != nil { + return nil, ewrap.Wrap(err, errMsgNewRequest) + } + + resp, err := t.client.Do(hreq) + if err != nil { + return nil, ewrap.Wrap(err, errMsgDoRequest) + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck + + if resp.StatusCode == http.StatusNotFound { + return nil, sentinel.ErrBackendNotFound + } + + if resp.StatusCode >= statusThreshold { + return nil, ewrap.Newf("fetch merkle status %d", resp.StatusCode) + } + + var body struct { + Root []byte `json:"root"` + LeafHashes [][]byte `json:"leaf_hashes"` + ChunkSize int `json:"chunk_size"` + } + + dec := json.NewDecoder(resp.Body) + + err = dec.Decode(&body) + if err != nil { + return nil, 
ewrap.Wrap(err, "decode merkle") + } + + return &MerkleTree{Root: body.Root, LeafHashes: body.LeafHashes, ChunkSize: body.ChunkSize}, nil +} + +// ListKeys returns all keys from a remote node (expensive; used for tests / anti-entropy fallback). +func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]string, error) { //nolint:ireturn + base, ok := t.baseURLFn(nodeID) + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + hreq, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/internal/keys", nil) + if err != nil { + return nil, ewrap.Wrap(err, errMsgNewRequest) + } + + resp, err := t.client.Do(hreq) + if err != nil { + return nil, ewrap.Wrap(err, errMsgDoRequest) + } + + defer func() { _ = resp.Body.Close() }() //nolint:errcheck + + if resp.StatusCode >= statusThreshold { + return nil, ewrap.Newf("list keys status %d", resp.StatusCode) + } + + var body struct { + Keys []string `json:"keys"` + } + + dec := json.NewDecoder(resp.Body) + + err = dec.Decode(&body) + if err != nil { + return nil, ewrap.Wrap(err, "decode keys") + } + + return body.Keys, nil +} diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index f56a30a..cc8c320 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -2,8 +2,14 @@ package backend import ( "context" + "crypto/rand" + "crypto/sha256" "errors" + "hash" "hash/fnv" + "math/big" + "sort" + "sync" "sync/atomic" "time" @@ -18,6 +24,16 @@ const ( listPrealloc = 32 // pre-allocation size for List results ) +// Merkle internal tuning constants & errors. +const ( + defaultMerkleChunkSize = 128 + merklePreallocEntries = 256 + merkleVersionBytes = 8 + shiftPerByte = 8 // bit shift per byte when encoding uint64 +) + +var errNoTransport = errors.New("no transport") + // DistMemory is a sharded in-process distributed-like backend. It simulates // distribution by consistent hashing across a fixed set of in-memory shards. 
// It is intended for single-process multi-shard experimentation; NOT cross-process. @@ -48,6 +64,54 @@ type DistMemory struct { readConsistency ConsistencyLevel writeConsistency ConsistencyLevel versionCounter uint64 // global monotonic for this node (lamport-like) + + // hinted handoff + hintTTL time.Duration + hintReplayInt time.Duration + hintMaxPerNode int + hintsMu sync.Mutex + hints map[string][]hintedEntry // nodeID -> queue + hintStopCh chan struct{} + + // parallel reads + parallelReads bool + + // simple gossip + gossipInterval time.Duration + gossipStopCh chan struct{} + + // anti-entropy + merkleChunkSize int // number of keys per leaf chunk (power-of-two recommended) + listKeysMax int // cap for fallback ListKeys pulls (0 = unlimited) + + // periodic merkle auto-sync + autoSyncInterval time.Duration + autoSyncStopCh chan struct{} + autoSyncPeersPerInterval int // limit number of peers synced per tick (0=all) + + lastAutoSyncDuration atomic.Int64 // nanoseconds of last full loop + lastAutoSyncError atomic.Value // error string or nil + + // tombstone version source when no prior item exists (monotonic per process) + tombVersionCounter atomic.Uint64 + + // tombstone retention / compaction + tombstoneTTL time.Duration + tombstoneSweepInt time.Duration + tombStopCh chan struct{} +} + +// hintedEntry represents a deferred replica write. +type hintedEntry struct { + item *cache.Item + expire time.Time +} + +// tombstone marks a delete intent with version ordering to prevent resurrection. +type tombstone struct { + version uint64 + origin string + at time.Time } // ConsistencyLevel defines read/write consistency semantics. @@ -72,6 +136,16 @@ func WithDistWriteConsistency(l ConsistencyLevel) DistMemoryOption { return func(dm *DistMemory) { dm.writeConsistency = l } } +// WithDistTombstoneTTL configures how long tombstones are retained before subject to compaction (<=0 keeps indefinitely). 
+func WithDistTombstoneTTL(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.tombstoneTTL = d } +} + +// WithDistTombstoneSweep sets sweep interval for tombstone compaction (<=0 disables automatic sweeps). +func WithDistTombstoneSweep(interval time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.tombstoneSweepInt = interval } +} + // Membership returns current membership reference (read-only usage). func (dm *DistMemory) Membership() *cluster.Membership { return dm.membership } @@ -80,6 +154,7 @@ func (dm *DistMemory) Ring() *cluster.Ring { return dm.ring } type distShard struct { items cache.ConcurrentMap + tombs map[string]tombstone // per-key tombstones } // DistMemoryOption configures DistMemory backend. @@ -94,6 +169,177 @@ func WithDistShardCount(n int) DistMemoryOption { } } +// WithDistMerkleChunkSize sets the number of keys per leaf hash chunk (default 128 if 0). +func WithDistMerkleChunkSize(n int) DistMemoryOption { + return func(dm *DistMemory) { + if n > 0 { + dm.merkleChunkSize = n + } + } +} + +// WithDistMerkleAutoSync enables periodic anti-entropy sync attempts. If interval <= 0 disables. +func WithDistMerkleAutoSync(interval time.Duration) DistMemoryOption { + return func(dm *DistMemory) { + dm.autoSyncInterval = interval + } +} + +// WithDistMerkleAutoSyncPeers limits number of peers synced per interval (0 or <0 = all). +func WithDistMerkleAutoSyncPeers(n int) DistMemoryOption { + return func(dm *DistMemory) { + dm.autoSyncPeersPerInterval = n + } +} + +// WithDistListKeysCap caps number of keys fetched via fallback ListKeys (0 = unlimited). +func WithDistListKeysCap(n int) DistMemoryOption { + return func(dm *DistMemory) { + if n >= 0 { + dm.listKeysMax = n + } + } +} + +// --- Merkle tree anti-entropy structures --- + +// MerkleTree represents a binary hash tree over key/version pairs. 
+type MerkleTree struct { // minimal representation + LeafHashes [][]byte // ordered leaf hashes + Root []byte + ChunkSize int +} + +// BuildMerkleTree constructs a Merkle tree snapshot of local data (best-effort, locks each shard briefly). +func (dm *DistMemory) BuildMerkleTree() *MerkleTree { //nolint:ireturn + chunkSize := dm.merkleChunkSize + if chunkSize <= 0 { + chunkSize = defaultMerkleChunkSize + } + + entries := dm.merkleEntries() + if len(entries) == 0 { + return &MerkleTree{ChunkSize: chunkSize} + } + + sort.Slice(entries, func(i, j int) bool { return entries[i].k < entries[j].k }) + + hasher := sha256.New() + buf := make([]byte, merkleVersionBytes) + leaves := make([][]byte, 0, (len(entries)+chunkSize-1)/chunkSize) + + for i := 0; i < len(entries); i += chunkSize { + end := i + chunkSize + if end > len(entries) { + end = len(entries) + } + + hasher.Reset() + + for _, e := range entries[i:end] { + _, _ = hasher.Write([]byte(e.k)) + encodeUint64BigEndian(buf, e.v) + + _, _ = hasher.Write(buf) + } + + leaves = append(leaves, append([]byte(nil), hasher.Sum(nil)...)) + } + + root := foldMerkle(leaves, hasher) + + return &MerkleTree{LeafHashes: leaves, Root: append([]byte(nil), root...), ChunkSize: chunkSize} +} + +// merkleKV is an internal pair used during tree construction & sync. +type merkleKV struct { + k string + v uint64 +} + +// DiffLeafRanges compares two trees and returns indexes of differing leaf chunks. 
+func (mt *MerkleTree) DiffLeafRanges(other *MerkleTree) []int { //nolint:ireturn + if mt == nil || other == nil { + return nil + } + + if len(mt.LeafHashes) != len(other.LeafHashes) { // size mismatch -> full resync + idxs := make([]int, len(mt.LeafHashes)) + for i := range idxs { + idxs[i] = i + } + + return idxs + } + + var diffs []int + for i := range mt.LeafHashes { + if !equalBytes(mt.LeafHashes[i], other.LeafHashes[i]) { + diffs = append(diffs, i) + } + } + + return diffs +} + +func equalBytes(a, b []byte) bool { // tiny helper + if len(a) != len(b) { + return false + } + + for i := range a { + if a[i] != b[i] { + return false + } + } + + return true +} + +// SyncWith performs Merkle anti-entropy against a remote node (pull newer versions for differing chunks). +func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { //nolint:ireturn + if dm.transport == nil { + return errNoTransport + } + + startFetch := time.Now() + + remoteTree, err := dm.transport.FetchMerkle(ctx, nodeID) + if err != nil { + return err + } + + fetchDur := time.Since(startFetch) + atomic.StoreInt64(&dm.metrics.merkleFetchNanos, fetchDur.Nanoseconds()) + + startBuild := time.Now() + localTree := dm.BuildMerkleTree() + buildDur := time.Since(startBuild) + atomic.StoreInt64(&dm.metrics.merkleBuildNanos, buildDur.Nanoseconds()) + + entries := dm.sortedMerkleEntries() + startDiff := time.Now() + diffs := localTree.DiffLeafRanges(remoteTree) + diffDur := time.Since(startDiff) + atomic.StoreInt64(&dm.metrics.merkleDiffNanos, diffDur.Nanoseconds()) + + missing := dm.resolveMissingKeys(ctx, nodeID, entries) + + dm.applyMerkleDiffs(ctx, nodeID, entries, diffs, localTree.ChunkSize) + + for k := range missing { // missing = remote-only keys + dm.fetchAndAdopt(ctx, nodeID, k) + } + + if len(diffs) == 0 && len(missing) == 0 { + return nil + } + + atomic.AddInt64(&dm.metrics.merkleSyncs, 1) + + return nil +} + // WithDistCapacity sets logical capacity (not strictly enforced yet). 
func WithDistCapacity(capacity int) DistMemoryOption { return func(dm *DistMemory) { dm.capacity = capacity } @@ -115,6 +361,9 @@ func WithDistTransport(t DistTransport) DistMemoryOption { return func(dm *DistMemory) { dm.transport = t } } +// SetTransport sets the transport post-construction (testing helper). +func (dm *DistMemory) SetTransport(t DistTransport) { dm.transport = t } + // WithDistHeartbeat configures heartbeat interval and suspect/dead thresholds. // If interval <= 0 heartbeat is disabled. func WithDistHeartbeat(interval, suspectAfter, deadAfter time.Duration) DistMemoryOption { @@ -143,6 +392,35 @@ func WithDistVirtualNodes(n int) DistMemoryOption { } } +// WithDistHintTTL sets TTL for hinted handoff entries. +func WithDistHintTTL(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.hintTTL = d } +} + +// WithDistHintReplayInterval sets how often to attempt replay of hints. +func WithDistHintReplayInterval(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.hintReplayInt = d } +} + +// WithDistHintMaxPerNode caps number of queued hints per target node. +func WithDistHintMaxPerNode(n int) DistMemoryOption { + return func(dm *DistMemory) { + if n > 0 { + dm.hintMaxPerNode = n + } + } +} + +// WithDistParallelReads enables parallel quorum/all read fan-out. +func WithDistParallelReads(enable bool) DistMemoryOption { + return func(dm *DistMemory) { dm.parallelReads = enable } +} + +// WithDistGossipInterval enables simple membership gossip at provided interval. +func WithDistGossipInterval(d time.Duration) DistMemoryOption { + return func(dm *DistMemory) { dm.gossipInterval = d } +} + // WithDistNode identity (id optional; derived from address if empty). Address used for future RPC. 
func WithDistNode(id, address string) DistMemoryOption { return func(dm *DistMemory) { @@ -177,6 +455,10 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist dm.initMembershipIfNeeded() dm.tryStartHTTP(ctx) dm.startHeartbeatIfEnabled(ctx) + dm.startHintReplayIfEnabled(ctx) + dm.startGossipIfEnabled() + dm.startAutoSyncIfEnabled(ctx) + dm.startTombstoneSweeper() return dm, nil } @@ -227,6 +509,10 @@ func (dm *DistMemory) Get(ctx context.Context, key string) (*cache.Item, bool) { return dm.getOne(ctx, key, owners) } + if dm.parallelReads { + return dm.getWithConsistencyParallel(ctx, key, owners) + } + return dm.getWithConsistency(ctx, key, owners) } @@ -347,12 +633,32 @@ func (dm *DistMemory) DebugInject(it *cache.Item) { //nolint:ireturn return } - dm.shardFor(it.Key).items.Set(it.Key, it) + sh := dm.shardFor(it.Key) + // test helper: injecting a concrete item implies intent to resurrect; clear any tombstone so normal version comparison applies + delete(sh.tombs, it.Key) + sh.items.Set(it.Key, it) } // LocalNodeID returns this instance's node ID (testing helper). func (dm *DistMemory) LocalNodeID() cluster.NodeID { return dm.localNode.ID } +// LocalNodeAddr returns the configured node address (host:port) used by HTTP server. +func (dm *DistMemory) LocalNodeAddr() string { //nolint:ireturn + return dm.nodeAddr +} + +// SetLocalNode manually sets the local node (testing helper before starting HTTP). +func (dm *DistMemory) SetLocalNode(node *cluster.Node) { //nolint:ireturn + dm.localNode = node + if dm.nodeAddr == "" && node != nil { + dm.nodeAddr = node.Address + } + + if dm.membership != nil && node != nil { + dm.membership.Upsert(node) + } +} + // DebugOwners returns current owners slice for a key (for tests). 
func (dm *DistMemory) DebugOwners(key string) []cluster.NodeID { if dm.ring == nil { @@ -368,6 +674,7 @@ type DistTransport interface { ForwardGet(ctx context.Context, nodeID string, key string) (*cache.Item, bool, error) ForwardRemove(ctx context.Context, nodeID string, key string, replicate bool) error Health(ctx context.Context, nodeID string) error + FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) } // InProcessTransport implements DistTransport for multiple DistMemory instances in the same process. @@ -436,6 +743,16 @@ func (t *InProcessTransport) Health(_ context.Context, nodeID string) error { // return nil } +// FetchMerkle returns a snapshot Merkle tree of the target backend. +func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*MerkleTree, error) { //nolint:ireturn + b, ok := t.backends[nodeID] + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + return b.BuildMerkleTree(), nil +} + // distMetrics holds internal counters (best-effort, not atomic snapshot consistent). 
type distMetrics struct { forwardGet int64 @@ -451,6 +768,18 @@ type distMetrics struct { versionConflicts int64 // times a newer version (or tie-broken origin) replaced previous candidate versionTieBreaks int64 // subset of conflicts decided by origin tie-break readPrimaryPromote int64 // times read path skipped unreachable primary and promoted next owner + hintedQueued int64 // hints queued + hintedReplayed int64 // hints successfully replayed + hintedExpired int64 // hints expired before delivery + hintedDropped int64 // hints dropped due to non-not-found transport errors + merkleSyncs int64 // merkle sync operations completed + merkleKeysPulled int64 // keys applied during sync + merkleBuildNanos int64 // last build duration (ns) + merkleDiffNanos int64 // last diff duration (ns) + merkleFetchNanos int64 // last remote fetch duration (ns) + autoSyncLoops int64 // number of auto-sync ticks executed + tombstonesActive int64 // approximate active tombstones + tombstonesPurged int64 // cumulative purged tombstones } // DistMetrics snapshot. @@ -468,10 +797,31 @@ type DistMetrics struct { VersionConflicts int64 VersionTieBreaks int64 ReadPrimaryPromote int64 + HintedQueued int64 + HintedReplayed int64 + HintedExpired int64 + HintedDropped int64 + MerkleSyncs int64 + MerkleKeysPulled int64 + MerkleBuildNanos int64 + MerkleDiffNanos int64 + MerkleFetchNanos int64 + AutoSyncLoops int64 + LastAutoSyncNanos int64 + LastAutoSyncError string + TombstonesActive int64 + TombstonesPurged int64 } // Metrics returns a snapshot of distributed metrics. 
func (dm *DistMemory) Metrics() DistMetrics { + lastErr := "" + if v := dm.lastAutoSyncError.Load(); v != nil { + if s, ok := v.(string); ok { + lastErr = s + } + } + return DistMetrics{ ForwardGet: atomic.LoadInt64(&dm.metrics.forwardGet), ForwardSet: atomic.LoadInt64(&dm.metrics.forwardSet), @@ -486,6 +836,20 @@ func (dm *DistMemory) Metrics() DistMetrics { VersionConflicts: atomic.LoadInt64(&dm.metrics.versionConflicts), VersionTieBreaks: atomic.LoadInt64(&dm.metrics.versionTieBreaks), ReadPrimaryPromote: atomic.LoadInt64(&dm.metrics.readPrimaryPromote), + HintedQueued: atomic.LoadInt64(&dm.metrics.hintedQueued), + HintedReplayed: atomic.LoadInt64(&dm.metrics.hintedReplayed), + HintedExpired: atomic.LoadInt64(&dm.metrics.hintedExpired), + HintedDropped: atomic.LoadInt64(&dm.metrics.hintedDropped), + MerkleSyncs: atomic.LoadInt64(&dm.metrics.merkleSyncs), + MerkleKeysPulled: atomic.LoadInt64(&dm.metrics.merkleKeysPulled), + MerkleBuildNanos: atomic.LoadInt64(&dm.metrics.merkleBuildNanos), + MerkleDiffNanos: atomic.LoadInt64(&dm.metrics.merkleDiffNanos), + MerkleFetchNanos: atomic.LoadInt64(&dm.metrics.merkleFetchNanos), + AutoSyncLoops: atomic.LoadInt64(&dm.metrics.autoSyncLoops), + LastAutoSyncNanos: dm.lastAutoSyncDuration.Load(), + LastAutoSyncError: lastErr, + TombstonesActive: atomic.LoadInt64(&dm.metrics.tombstonesActive), + TombstonesPurged: atomic.LoadInt64(&dm.metrics.tombstonesPurged), } } @@ -495,6 +859,18 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn close(dm.stopCh) } + if dm.hintStopCh != nil { + close(dm.hintStopCh) + } + + if dm.gossipStopCh != nil { + close(dm.gossipStopCh) + } + + if dm.autoSyncStopCh != nil { + close(dm.autoSyncStopCh) + } + if dm.httpServer != nil { err := dm.httpServer.stop(ctx) // best-effort if err != nil { @@ -502,9 +878,281 @@ func (dm *DistMemory) Stop(ctx context.Context) error { //nolint:ireturn } } + if dm.tombStopCh != nil { // stop tomb sweeper + close(dm.tombStopCh) + } + return nil } 
+// --- Sync helper methods (placed after exported methods to satisfy ordering linter) --- + +// sortedMerkleEntries returns merkle entries sorted by key. +func (dm *DistMemory) sortedMerkleEntries() []merkleKV { //nolint:ireturn + entries := dm.merkleEntries() + sort.Slice(entries, func(i, j int) bool { return entries[i].k < entries[j].k }) + + return entries +} + +// resolveMissingKeys enumerates remote-only keys using in-process or HTTP listing. +func (dm *DistMemory) resolveMissingKeys(ctx context.Context, nodeID string, entries []merkleKV) map[string]struct{} { //nolint:ireturn + missing := dm.enumerateRemoteOnlyKeys(nodeID, entries) + if len(missing) != 0 { + return missing + } + + httpT, ok := dm.transport.(*DistHTTPTransport) + if !ok { + return missing + } + + keys, kerr := httpT.ListKeys(ctx, nodeID) + if kerr != nil || len(keys) == 0 { + return missing + } + + if dm.listKeysMax > 0 && len(keys) > dm.listKeysMax { // cap enforcement + keys = keys[:dm.listKeysMax] + } + + mset := make(map[string]struct{}, len(keys)) + for _, k := range keys { // populate + mset[k] = struct{}{} + } + + for _, e := range entries { // remove existing + delete(mset, e.k) + } + + // track number of remote-only keys discovered via fallback + if len(mset) > 0 { + atomic.AddInt64(&dm.metrics.merkleKeysPulled, int64(len(mset))) + } + + return mset +} + +// applyMerkleDiffs fetches and adopts keys for differing Merkle chunks. +func (dm *DistMemory) applyMerkleDiffs(ctx context.Context, nodeID string, entries []merkleKV, diffs []int, chunkSize int) { //nolint:ireturn + for _, ci := range diffs { + start := ci * chunkSize + if start >= len(entries) { + continue + } + + end := start + chunkSize + if end > len(entries) { + end = len(entries) + } + + for _, e := range entries[start:end] { + dm.fetchAndAdopt(ctx, nodeID, e.k) + } + } +} + +// enumerateRemoteOnlyKeys returns keys present only on the remote side (best-effort, in-process only). 
+func (dm *DistMemory) enumerateRemoteOnlyKeys(nodeID string, local []merkleKV) map[string]struct{} { //nolint:ireturn + missing := make(map[string]struct{}) + + ip, ok := dm.transport.(*InProcessTransport) + if !ok { + return missing + } + + remote, ok := ip.backends[nodeID] + if !ok { + return missing + } + + for _, shard := range remote.shards { + if shard == nil { + continue + } + + rch := shard.items.IterBuffered() + for t := range rch { + missing[t.Key] = struct{}{} + } + } + + for _, e := range local { // remove any that we already have + delete(missing, e.k) + } + + return missing +} + +// fetchAndAdopt pulls a key from a remote node and adopts it if it's newer or absent locally. +func (dm *DistMemory) fetchAndAdopt(ctx context.Context, nodeID, key string) { + it, ok, gerr := dm.transport.ForwardGet(ctx, nodeID, key) + if gerr != nil { // remote failure: ignore + return + } + + sh := dm.shardFor(key) + + if !ok { // remote missing key (could be delete) -> adopt tombstone locally if we still have it + if _, hasTomb := sh.tombs[key]; hasTomb { // already have tombstone + return + } + + if cur, okLocal := sh.items.Get(key); okLocal { // create tombstone advancing version + sh.items.Remove(key) + + nextVer := cur.Version + 1 + + sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + } + + return + } + + if tomb, hasTomb := sh.tombs[key]; hasTomb { + // If tombstone version newer or equal, do not resurrect + if tomb.version >= it.Version { + return + } + // remote has newer version; clear tombstone (key resurrected intentionally) + delete(sh.tombs, key) + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + } + + if cur, okLocal := sh.items.Get(key); !okLocal || it.Version > cur.Version { + dm.applySet(ctx, it, false) + atomic.AddInt64(&dm.metrics.merkleKeysPulled, 1) + } +} + +// merkleEntries gathers key/version pairs from all 
shards. +func (dm *DistMemory) merkleEntries() []merkleKV { + entries := make([]merkleKV, 0, merklePreallocEntries) + + for _, shard := range dm.shards { + if shard == nil { + continue + } + + ch := shard.items.IterBuffered() + for t := range ch { + entries = append(entries, merkleKV{k: t.Key, v: t.Val.Version}) + } + + for k, ts := range shard.tombs { // include tombstones + entries = append(entries, merkleKV{k: k, v: ts.version}) + } + } + + return entries +} + +// startTombstoneSweeper launches periodic compaction if configured. +func (dm *DistMemory) startTombstoneSweeper() { //nolint:ireturn + if dm.tombstoneTTL <= 0 || dm.tombstoneSweepInt <= 0 { + return + } + + dm.tombStopCh = make(chan struct{}) + go func() { + ticker := time.NewTicker(dm.tombstoneSweepInt) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + purged := dm.compactTombstones() + if purged > 0 { + atomic.AddInt64(&dm.metrics.tombstonesPurged, purged) + } + + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) + + case <-dm.tombStopCh: + return + } + } + }() +} + +// compactTombstones removes expired tombstones based on TTL, returns count purged. +func (dm *DistMemory) compactTombstones() int64 { //nolint:ireturn + if dm.tombstoneTTL <= 0 { + return 0 + } + + now := time.Now() + + var purged int64 + for _, sh := range dm.shards { + if sh == nil { + continue + } + + for k, ts := range sh.tombs { + if now.Sub(ts.at) >= dm.tombstoneTTL { + delete(sh.tombs, k) + + purged++ + } + } + } + + return purged +} + +// countTombstones returns approximate current count. 
+func (dm *DistMemory) countTombstones() int64 { //nolint:ireturn + var total int64 + for _, sh := range dm.shards { + if sh == nil { + continue + } + + total += int64(len(sh.tombs)) + } + + return total +} + +func encodeUint64BigEndian(buf []byte, v uint64) { + for i := merkleVersionBytes - 1; i >= 0; i-- { // big endian for deterministic hashing + buf[i] = byte(v) + + v >>= shiftPerByte + } +} + +// foldMerkle reduces leaf hashes into a single root using a binary tree. +func foldMerkle(leaves [][]byte, hasher hash.Hash) []byte { //nolint:ireturn + if len(leaves) == 0 { + return nil + } + + level := leaves + for len(level) > 1 { + next := make([][]byte, 0, (len(level)+1)/2) + for i := 0; i < len(level); i += 2 { + if i+1 == len(level) { // odd node promoted + next = append(next, append([]byte(nil), level[i]...)) + + break + } + + hasher.Reset() + + _, _ = hasher.Write(level[i]) + _, _ = hasher.Write(level[i+1]) + next = append(next, append([]byte(nil), hasher.Sum(nil)...)) + } + + level = next + } + + return level[0] +} + // ensureShardConfig initializes shards respecting configured shardCount. 
func (dm *DistMemory) ensureShardConfig() { //nolint:ireturn if dm.shardCount <= 0 { @@ -512,7 +1160,7 @@ func (dm *DistMemory) ensureShardConfig() { //nolint:ireturn } for range dm.shardCount { - dm.shards = append(dm.shards, &distShard{items: cache.New()}) + dm.shards = append(dm.shards, &distShard{items: cache.New(), tombs: make(map[string]tombstone)}) } } @@ -677,24 +1325,77 @@ func (dm *DistMemory) getWithConsistency(ctx context.Context, key string, owners var chosen *cache.Item + // gather results sequentially until quorum reached, tracking stale owners + staleOwners := dm.collectQuorum(ctx, key, owners, needed, &chosen, &acks) + if acks < needed || chosen == nil { + return nil, false + } + + dm.repairStaleOwners(ctx, key, chosen, staleOwners) + dm.repairReplicas(ctx, key, chosen, owners) + + return chosen, true +} + +// collectQuorum iterates owners, updates chosen item and acks, returns owners needing repair. +func (dm *DistMemory) collectQuorum( + ctx context.Context, + key string, + owners []cluster.NodeID, + needed int, + chosen **cache.Item, + acks *int, +) []cluster.NodeID { //nolint:ireturn + stale := make([]cluster.NodeID, 0, len(owners)) for idx, oid := range owners { it, ok := dm.fetchOwner(ctx, key, idx, oid) if !ok { continue } - chosen = dm.chooseNewer(chosen, it) - acks++ + prev := *chosen + + *chosen = dm.chooseNewer(*chosen, it) + *acks++ + + if prev != nil && *chosen != prev { + stale = append(stale, oid) + } + + if *acks >= needed && *chosen != nil { // early break + break + } } - if acks < needed || chosen == nil { - return nil, false + return stale +} + +// repairStaleOwners issues best-effort targeted repairs for owners identified as stale. 
+func (dm *DistMemory) repairStaleOwners( + ctx context.Context, + key string, + chosen *cache.Item, + staleOwners []cluster.NodeID, +) { //nolint:ireturn + if dm.transport == nil || chosen == nil { + return } - // version-based read repair across all owners if stale/missing - dm.repairReplicas(ctx, key, chosen, owners) + for _, oid := range staleOwners { + if oid == dm.localNode.ID { // local handled in repairReplicas + continue + } - return chosen, true + it, ok, err := dm.transport.ForwardGet(ctx, string(oid), key) + if err != nil { // skip unreachable + continue + } + + if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { + _ = dm.transport.ForwardSet(ctx, string(oid), chosen, false) //nolint:errcheck + atomic.AddInt64(&dm.metrics.readRepair, 1) + } + } } // fetchOwner attempts to fetch item from given owner (local or remote) updating metrics. @@ -730,20 +1431,364 @@ func (dm *DistMemory) fetchOwner(ctx context.Context, key string, idx int, oid c // replicateTo sends writes to replicas (best-effort) returning ack count. func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replicas []cluster.NodeID) int { //nolint:ireturn acks := 0 - for _, oid := range replicas { if oid == dm.localNode.ID { continue } - if dm.transport != nil && dm.transport.ForwardSet(ctx, string(oid), item, false) == nil { - acks++ + if dm.transport != nil { + err := dm.transport.ForwardSet(ctx, string(oid), item, false) + if err == nil { + acks++ + + continue + } + + if errors.Is(err, sentinel.ErrBackendNotFound) { // queue hint for unreachable replica + dm.queueHint(string(oid), item) + } } } return acks } +// getWithConsistencyParallel performs parallel owner fan-out until quorum/all reached. 
+func (dm *DistMemory) getWithConsistencyParallel(ctx context.Context, key string, owners []cluster.NodeID) (*cache.Item, bool) { //nolint:ireturn + needed := dm.requiredAcks(len(owners), dm.readConsistency) + + type res struct { + owner cluster.NodeID + it *cache.Item + ok bool + } + + ch := make(chan res, len(owners)) + + ctxFetch, cancel := context.WithCancel(ctx) + defer cancel() + + for idx, oid := range owners { // launch all with proper capture (Go <1.22 style) + idxLocal, oidLocal := idx, oid + go func(i int, o cluster.NodeID) { + it, ok := dm.fetchOwner(ctxFetch, key, i, o) + ch <- res{owner: o, it: it, ok: ok} + }(idxLocal, oidLocal) + } + + acks := 0 + + var chosen *cache.Item + + staleOwners := make([]cluster.NodeID, 0, len(owners)) + + for range owners { + resp := <-ch + if !resp.ok { + continue + } + + prev := chosen + + chosen = dm.chooseNewer(chosen, resp.it) + acks++ + + if prev != nil && chosen != prev { // newer version found, mark new owner for targeted check (mirrors sequential path semantics) + staleOwners = append(staleOwners, resp.owner) + } + + if acks >= needed && chosen != nil { // early satisfied + cancel() + + break + } + } + + if acks < needed || chosen == nil { + return nil, false + } + + // targeted repairs for owners involved in version advancement (best-effort) + dm.repairStaleOwners(ctx, key, chosen, staleOwners) + + // full repair across all owners to ensure convergence + dm.repairReplicas(ctx, key, chosen, owners) + + return chosen, true +} + +// --- Hinted handoff implementation ---. 
+func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { + if dm.hintTTL <= 0 { // disabled + return + } + + dm.hintsMu.Lock() + + if dm.hints == nil { + dm.hints = make(map[string][]hintedEntry) + } + + queueHints := dm.hints[nodeID] + if dm.hintMaxPerNode > 0 && len(queueHints) >= dm.hintMaxPerNode { // drop oldest + queueHints = queueHints[1:] + } + + cloned := *item + + queueHints = append(queueHints, hintedEntry{item: &cloned, expire: time.Now().Add(dm.hintTTL)}) + dm.hints[nodeID] = queueHints + dm.hintsMu.Unlock() + atomic.AddInt64(&dm.metrics.hintedQueued, 1) +} + +func (dm *DistMemory) startHintReplayIfEnabled(ctx context.Context) { + if dm.hintReplayInt <= 0 || dm.hintTTL <= 0 { + return + } + + dm.hintStopCh = make(chan struct{}) + go dm.hintReplayLoop(ctx) +} + +func (dm *DistMemory) hintReplayLoop(ctx context.Context) { //nolint:ireturn + ticker := time.NewTicker(dm.hintReplayInt) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dm.replayHints(ctx) + case <-dm.hintStopCh: + return + case <-ctx.Done(): + return + } + } +} + +func (dm *DistMemory) replayHints(ctx context.Context) { //nolint:ireturn + if dm.transport == nil { + return + } + + now := time.Now() + + dm.hintsMu.Lock() + + for nodeID, q := range dm.hints { + out := q[:0] + for _, hint := range q { + if now.After(hint.expire) { // expired + atomic.AddInt64(&dm.metrics.hintedExpired, 1) + + continue + } + + err := dm.transport.ForwardSet(ctx, nodeID, hint.item, false) // best-effort + if err == nil { // success + atomic.AddInt64(&dm.metrics.hintedReplayed, 1) + + continue + } + + if errors.Is(err, sentinel.ErrBackendNotFound) { // keep + out = append(out, hint) + + continue + } + + atomic.AddInt64(&dm.metrics.hintedDropped, 1) + } + + if len(out) == 0 { + delete(dm.hints, nodeID) + } else { + dm.hints[nodeID] = out + } + } + + dm.hintsMu.Unlock() +} + +// --- Simple gossip (in-process only) ---. 
+func (dm *DistMemory) startGossipIfEnabled() { //nolint:ireturn + if dm.gossipInterval <= 0 { + return + } + + dm.gossipStopCh = make(chan struct{}) + go dm.gossipLoop() +} + +// startAutoSyncIfEnabled launches periodic merkle syncs to all other members. +func (dm *DistMemory) startAutoSyncIfEnabled(ctx context.Context) { //nolint:ireturn + if dm.autoSyncInterval <= 0 || dm.membership == nil { + return + } + + if dm.autoSyncStopCh != nil { // already running + return + } + + dm.autoSyncStopCh = make(chan struct{}) + + interval := dm.autoSyncInterval + go dm.autoSyncLoop(ctx, interval) +} + +func (dm *DistMemory) autoSyncLoop(ctx context.Context, interval time.Duration) { //nolint:ireturn + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-dm.autoSyncStopCh: + return + case <-ticker.C: + dm.runAutoSyncTick(ctx) + } + } +} + +// runAutoSyncTick performs one auto-sync cycle; separated for lower complexity. +func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { //nolint:ireturn + start := time.Now() + + var lastErr error + + members := dm.membership.List() + limit := dm.autoSyncPeersPerInterval + synced := 0 + + for _, member := range members { + if member == nil || string(member.ID) == dm.nodeID { // skip self + continue + } + + if limit > 0 && synced >= limit { + break + } + + err := dm.SyncWith(ctx, string(member.ID)) + if err != nil { // capture last error only + lastErr = err + } + + synced++ + } + + dm.lastAutoSyncDuration.Store(time.Since(start).Nanoseconds()) + + if lastErr != nil { + dm.lastAutoSyncError.Store(lastErr.Error()) + } else { + dm.lastAutoSyncError.Store("") + } + + atomic.AddInt64(&dm.metrics.autoSyncLoops, 1) +} + +func (dm *DistMemory) gossipLoop() { //nolint:ireturn + ticker := time.NewTicker(dm.gossipInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dm.runGossipTick() + case <-dm.gossipStopCh: + return + } + } +} + +func (dm *DistMemory) runGossipTick() { //nolint:ireturn + 
if dm.membership == nil || dm.transport == nil { + return + } + + peers := dm.membership.List() + if len(peers) <= 1 { + return + } + + var candidates []*cluster.Node + for _, n := range peers { + if n.ID != dm.localNode.ID { + candidates = append(candidates, n) + } + } + + if len(candidates) == 0 { + return + } + + // secure random selection (not strictly required but avoids G404 warning) + n := len(candidates) + + idxBig, err := rand.Int(rand.Reader, big.NewInt(int64(n))) // #nosec G404 - cryptographic randomness acceptable here + if err != nil { + return + } + + target := candidates[idxBig.Int64()] + + ip, ok := dm.transport.(*InProcessTransport) + if !ok { + return + } + + remote, ok2 := ip.backends[string(target.ID)] + if !ok2 { + return + } + + snapshot := dm.membership.List() + remote.acceptGossip(snapshot) +} + +func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) { //nolint:ireturn + if dm.membership == nil { + return + } + + for _, node := range nodes { + if node.ID == dm.localNode.ID { + continue + } + + existing := false + for _, cur := range dm.membership.List() { + if cur.ID == node.ID { + existing = true + + if node.Incarnation > cur.Incarnation { + dm.membership.Upsert(&cluster.Node{ + ID: node.ID, + Address: node.Address, + State: node.State, + Incarnation: node.Incarnation, + LastSeen: time.Now(), + }) + } + + break + } + } + + if !existing { + dm.membership.Upsert(&cluster.Node{ + ID: node.ID, + Address: node.Address, + State: node.State, + Incarnation: node.Incarnation, + LastSeen: time.Now(), + }) + } + } +} + // chooseNewer picks the item with higher version; on version tie uses lexicographically smaller Origin as winner. func (dm *DistMemory) chooseNewer(itemA, itemB *cache.Item) *cache.Item { //nolint:ireturn if itemA == nil { @@ -967,7 +2012,23 @@ func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate // applyRemove deletes locally and optionally fan-outs removal to replicas. 
func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate bool) { - dm.shardFor(key).items.Remove(key) + sh := dm.shardFor(key) + // capture version from existing item (if any) and increment for tombstone + var nextVer uint64 + if it, ok := sh.items.Get(key); ok && it != nil { + ci := it // already *cache.Item (ConcurrentMap stores *cache.Item) + + nextVer = ci.Version + 1 + } + + if nextVer == 0 { // no prior item seen; allocate monotonic tomb version + nextVer = dm.tombVersionCounter.Add(1) + } + + sh.items.Remove(key) + + sh.tombs[key] = tombstone{version: nextVer, origin: string(dm.localNode.ID), at: time.Now()} + atomic.StoreInt64(&dm.metrics.tombstonesActive, dm.countTombstones()) if !replicate || dm.ring == nil || dm.transport == nil { return @@ -985,7 +2046,7 @@ func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate boo continue } - _ = dm.transport.ForwardRemove(ctx, string(oid), key, false) //nolint:errcheck // best-effort + _ = dm.transport.ForwardRemove(ctx, string(oid), key, false) //nolint:errcheck // best-effort (tombstone inferred remotely) } } diff --git a/tests/hypercache_distmemory_integration_test.go b/tests/hypercache_distmemory_integration_test.go index 54e594c..abf5730 100644 --- a/tests/hypercache_distmemory_integration_test.go +++ b/tests/hypercache_distmemory_integration_test.go @@ -55,6 +55,7 @@ func TestDistMemoryForwardingReplication(t *testing.T) { } item := &cachev2.Item{Key: k, Value: k} + err := item.Valid() if err != nil { t.Fatalf("item valid %s: %v", k, err) diff --git a/tests/hypercache_distmemory_remove_readrepair_test.go b/tests/hypercache_distmemory_remove_readrepair_test.go index b536834..e998b84 100644 --- a/tests/hypercache_distmemory_remove_readrepair_test.go +++ b/tests/hypercache_distmemory_remove_readrepair_test.go @@ -47,6 +47,7 @@ func TestDistMemoryRemoveReplication(t *testing.T) { } item := &cachev2.Item{Key: key, Value: "val"} + err := item.Valid() if err != nil { 
t.Fatalf("valid: %v", err) @@ -104,6 +105,7 @@ func TestDistMemoryReadRepair(t *testing.T) { } item := &cachev2.Item{Key: key, Value: "val"} + err := item.Valid() if err != nil { t.Fatalf("valid: %v", err) diff --git a/tests/hypercache_distmemory_stale_quorum_test.go b/tests/hypercache_distmemory_stale_quorum_test.go new file mode 100644 index 0000000..8a82a6f --- /dev/null +++ b/tests/hypercache_distmemory_stale_quorum_test.go @@ -0,0 +1,102 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/cluster" + "github.com/hyp3rd/hypercache/pkg/backend" + cachev2 "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestDistMemoryStaleQuorum ensures quorum read returns newest version and repairs stale replicas. +func TestDistMemoryStaleQuorum(t *testing.T) { + ring := cluster.NewRing(cluster.WithReplication(3)) + membership := cluster.NewMembership(ring) + transport := backend.NewInProcessTransport() + + n1 := cluster.NewNode("", "n1:0") + n2 := cluster.NewNode("", "n2:0") + n3 := cluster.NewNode("", "n3:0") + + b1i, _ := backend.NewDistMemory(context.TODO(), backend.WithDistMembership(membership, n1), backend.WithDistTransport(transport), backend.WithDistReadConsistency(backend.ConsistencyQuorum)) + b2i, _ := backend.NewDistMemory(context.TODO(), backend.WithDistMembership(membership, n2), backend.WithDistTransport(transport), backend.WithDistReadConsistency(backend.ConsistencyQuorum)) + b3i, _ := backend.NewDistMemory(context.TODO(), backend.WithDistMembership(membership, n3), backend.WithDistTransport(transport), backend.WithDistReadConsistency(backend.ConsistencyQuorum)) + + b1 := b1i.(*backend.DistMemory) //nolint:forcetypeassert + b2 := b2i.(*backend.DistMemory) //nolint:forcetypeassert + b3 := b3i.(*backend.DistMemory) //nolint:forcetypeassert + + transport.Register(b1) + transport.Register(b2) + transport.Register(b3) + + key := "sq-key" + + owners := ring.Lookup(key) + if len(owners) != 3 { + 
t.Skip("replication factor !=3") + } + + // Write initial version via primary + primary := owners[0] + item := &cachev2.Item{Key: key, Value: "v1"} + + _ = item.Valid() + if primary == b1.LocalNodeID() { + _ = b1.Set(context.Background(), item) + } else if primary == b2.LocalNodeID() { + _ = b2.Set(context.Background(), item) + } else { + _ = b3.Set(context.Background(), item) + } + + // Manually bump version on one replica to simulate a newer write that others missed + // Pick owners[1] as ahead replica + aheadID := owners[1] + ahead := map[cluster.NodeID]*backend.DistMemory{b1.LocalNodeID(): b1, b2.LocalNodeID(): b2, b3.LocalNodeID(): b3}[aheadID] + ahead.DebugInject(&cachev2.Item{Key: key, Value: "v2", Version: 5, Origin: string(ahead.LocalNodeID()), LastUpdated: time.Now()}) + + // Drop local copy on owners[2] to simulate stale/missing + lagID := owners[2] + lag := map[cluster.NodeID]*backend.DistMemory{b1.LocalNodeID(): b1, b2.LocalNodeID(): b2, b3.LocalNodeID(): b3}[lagID] + lag.DebugDropLocal(key) + + // Issue quorum read from a non-ahead node (choose primary if not ahead, else third) + requester := b1 + if requester.LocalNodeID() == aheadID { + requester = b2 + } + + if requester.LocalNodeID() == aheadID { + requester = b3 + } + + got, ok := requester.Get(context.Background(), key) + if !ok { + t.Fatalf("quorum get failed") + } + // Value stored as interface{} may be string (not []byte) in this test + if sval, okCast := got.Value.(string); !okCast || sval != "v2" { + t.Fatalf("expected quorum to return newer version v2, got=%v (type %T)", got.Value, got.Value) + } + + // Allow brief repair propagation + time.Sleep(50 * time.Millisecond) + + // All owners should now have v2 (version 5) + for _, oid := range owners { + inst := map[cluster.NodeID]*backend.DistMemory{b1.LocalNodeID(): b1, b2.LocalNodeID(): b2, b3.LocalNodeID(): b3}[oid] + + it, ok2 := inst.Get(context.Background(), key) + if !ok2 || it.Version != 5 { + t.Fatalf("owner %s not repaired to v2 (v5) 
-> (%v,%v)", oid, ok2, it) + } + } + + // ReadRepair metric should have incremented somewhere + if b1.Metrics().ReadRepair+b2.Metrics().ReadRepair+b3.Metrics().ReadRepair == 0 { + t.Fatalf("expected read repair metric >0") + } +} diff --git a/tests/hypercache_distmemory_versioning_test.go b/tests/hypercache_distmemory_versioning_test.go index eca4875..49f08f2 100644 --- a/tests/hypercache_distmemory_versioning_test.go +++ b/tests/hypercache_distmemory_versioning_test.go @@ -65,6 +65,7 @@ func TestDistMemoryVersioningQuorum(t *testing.T) { //nolint:paralleltest // Write key via primary. item1 := &cachev2.Item{Key: key, Value: "v1"} + err := b1.Set(context.Background(), item1) if err != nil { t.Fatalf("initial set: %v", err) @@ -94,6 +95,7 @@ func TestDistMemoryVersioningQuorum(t *testing.T) { //nolint:paralleltest transport.Unregister(string(n3.ID)) item2 := &cachev2.Item{Key: key, Value: "v2"} + err = b1.Set(context.Background(), item2) if err != nil && !errors.Is(err, sentinel.ErrQuorumFailed) { t.Fatalf("unexpected error after replica loss: %v", err) diff --git a/tests/hypercache_http_merkle_test.go b/tests/hypercache_http_merkle_test.go new file mode 100644 index 0000000..a259f3c --- /dev/null +++ b/tests/hypercache_http_merkle_test.go @@ -0,0 +1,93 @@ +package tests + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/cluster" + "github.com/hyp3rd/hypercache/pkg/backend" + cachev2 "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestHTTPFetchMerkle ensures HTTP transport can fetch a remote Merkle tree and SyncWith works over HTTP. 
+func TestHTTPFetchMerkle(t *testing.T) { + ctx := context.Background() + + // shared ring/membership + ring := cluster.NewRing(cluster.WithReplication(1)) + membership := cluster.NewMembership(ring) + + // create two nodes with HTTP server enabled (addresses) + n1 := cluster.NewNode("", "127.0.0.1:9201") + + b1i, err := backend.NewDistMemory(ctx, + backend.WithDistMembership(membership, n1), + backend.WithDistNode("n1", "127.0.0.1:9201"), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("b1: %v", err) + } + + n2 := cluster.NewNode("", "127.0.0.1:9202") + + b2i, err := backend.NewDistMemory(ctx, + backend.WithDistMembership(membership, n2), + backend.WithDistNode("n2", "127.0.0.1:9202"), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("b2: %v", err) + } + + b1 := b1i.(*backend.DistMemory) //nolint:forcetypeassert + b2 := b2i.(*backend.DistMemory) //nolint:forcetypeassert + + // HTTP transport resolver maps node IDs to http base URLs. + resolver := func(id string) (string, bool) { + switch id { // node IDs same as provided + case "n1": + return "http://" + b1.LocalNodeAddr(), true + case "n2": + return "http://" + b2.LocalNodeAddr(), true + } + + return "", false + } + transport := backend.NewDistHTTPTransport(2*time.Second, resolver) + b1.SetTransport(transport) + b2.SetTransport(transport) + + // ensure membership has both before writes (already upserted in constructors) + // write some keys to b1 only + for i := range 5 { // direct inject to sidestep replication/forwarding complexity + item := &cachev2.Item{Key: httpKey(i), Value: []byte("v"), Version: uint64(i + 1), Origin: "n1", LastUpdated: time.Now()} + b1.DebugInject(item) + } + // ensure HTTP merkle endpoint reachable + resp, err := http.Get("http://" + b1.LocalNodeAddr() + "/internal/merkle") + if err != nil { + t.Fatalf("merkle http get: %v", err) + } + + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status %d", 
resp.StatusCode) + } + + // b2 sync from b1 via HTTP transport + if err := b2.SyncWith(ctx, "n1"); err != nil { + t.Fatalf("sync: %v", err) + } + + // validate keys present on b2 + for i := range 5 { + if _, ok := b2.Get(ctx, httpKey(i)); !ok { + t.Fatalf("missing key %d post-sync", i) + } + } +} + +func httpKey(i int) string { return "hkey:" + string(rune('a'+i)) } diff --git a/tests/merkle_delete_tombstone_test.go b/tests/merkle_delete_tombstone_test.go new file mode 100644 index 0000000..d9707a6 --- /dev/null +++ b/tests/merkle_delete_tombstone_test.go @@ -0,0 +1,67 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleDeleteTombstone ensures a deleted key does not resurrect via sync. +func TestMerkleDeleteTombstone(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9501"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9502"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + it := &cache.Item{Key: "del", Value: []byte("v"), Version: 1, Origin: "A", LastUpdated: time.Now()} + da.DebugInject(it) + err := db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { + t.Fatalf("initial sync: %v", err) + } + + // Now delete on A + _ = da.Remove(ctx, "del") + + // Ensure local A removed + if val, _ := da.Get(ctx, "del"); val != nil { + t.Fatalf("expected A delete") + } + + // Remote (B) pulls from A to learn about deletion (pull-based anti-entropy) + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { + 
t.Fatalf("tomb sync pull: %v", err) + } + + // Ensure B removed or will not resurrect on next sync + if val, _ := db.Get(ctx, "del"); val != nil { + t.Fatalf("expected B delete after tombstone") + } + + // Re-add older version on B (simulate stale write) + stale := &cache.Item{Key: "del", Value: []byte("stale"), Version: 1, Origin: "B", LastUpdated: time.Now()} + db.DebugInject(stale) + + // Sync B with A again; B should keep deletion (not resurrect) + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { + t.Fatalf("resync: %v", err) + } + + if val, _ := db.Get(ctx, "del"); val != nil { + t.Fatalf("tombstone failed; key resurrected") + } +} diff --git a/tests/merkle_empty_tree_test.go b/tests/merkle_empty_tree_test.go new file mode 100644 index 0000000..3ac3aab --- /dev/null +++ b/tests/merkle_empty_tree_test.go @@ -0,0 +1,34 @@ +package tests + +import ( + "context" + "testing" + + backend "github.com/hyp3rd/hypercache/pkg/backend" +) + +// TestMerkleEmptyTrees ensures diff between two empty trees is empty and SyncWith is no-op. 
+func TestMerkleEmptyTrees(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9201"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9202"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + err := da.SyncWith(ctx, string(db.LocalNodeID())) + if err != nil { + t.Fatalf("sync empty: %v", err) + } + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { + t.Fatalf("sync empty reverse: %v", err) + } +} diff --git a/tests/merkle_no_diff_test.go b/tests/merkle_no_diff_test.go new file mode 100644 index 0000000..2b839aa --- /dev/null +++ b/tests/merkle_no_diff_test.go @@ -0,0 +1,45 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleNoDiff ensures SyncWith returns quickly when trees are identical. 
+func TestMerkleNoDiff(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9401"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(4)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9402"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(4)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + // Inject identical data + for i := range 10 { + itA := &cache.Item{Key: keyf("nd", i), Value: []byte("v"), Version: uint64(i + 1), Origin: "A", LastUpdated: time.Now()} + itB := &cache.Item{Key: keyf("nd", i), Value: []byte("v"), Version: uint64(i + 1), Origin: "B", LastUpdated: time.Now()} + + da.DebugInject(itA) + db.DebugInject(itB) + } + + err := da.SyncWith(ctx, string(db.LocalNodeID())) + if err != nil { + t.Fatalf("sync: %v", err) + } + err = db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { + t.Fatalf("sync2: %v", err) + } +} diff --git a/tests/merkle_single_missing_key_test.go b/tests/merkle_single_missing_key_test.go new file mode 100644 index 0000000..7d3e2ba --- /dev/null +++ b/tests/merkle_single_missing_key_test.go @@ -0,0 +1,45 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleSingleMissingKey ensures a single remote-only key is detected and pulled. 
+func TestMerkleSingleMissingKey(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + a, _ := backend.NewDistMemory(ctx, backend.WithDistNode("A", "127.0.0.1:9301"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + b, _ := backend.NewDistMemory(ctx, backend.WithDistNode("B", "127.0.0.1:9302"), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2)) + + da := any(a).(*backend.DistMemory) + db := any(b).(*backend.DistMemory) + + da.SetTransport(transport) + db.SetTransport(transport) + transport.Register(da) + transport.Register(db) + + // Inject one key only into A + it := &cache.Item{Key: "k1", Value: []byte("v1"), Version: 1, Origin: "A", LastUpdated: time.Now()} + da.DebugInject(it) + + err := db.SyncWith(ctx, string(da.LocalNodeID())) + if err != nil { + t.Fatalf("sync single: %v", err) + } + + got, _ := db.Get(ctx, "k1") + if got == nil { + t.Fatalf("expected key pulled") + } + + if bs, ok := got.Value.([]byte); !ok || string(bs) != "v1" { + t.Fatalf("unexpected value %v", got.Value) + } +} diff --git a/tests/merkle_sync_test.go b/tests/merkle_sync_test.go new file mode 100644 index 0000000..71020d1 --- /dev/null +++ b/tests/merkle_sync_test.go @@ -0,0 +1,82 @@ +package tests + +import ( + "context" + "testing" + "time" + + backend "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestMerkleSyncConvergence ensures SyncWith pulls newer keys from remote. 
+func TestMerkleSyncConvergence(t *testing.T) { + ctx := context.Background() + transport := backend.NewInProcessTransport() + + bA, err := backend.NewDistMemory(ctx, + backend.WithDistNode("A", "127.0.0.1:9101"), + backend.WithDistReplication(1), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("new dist memory A: %v", err) + } + + dmA := any(bA).(*backend.DistMemory) + + bB, err := backend.NewDistMemory(ctx, + backend.WithDistNode("B", "127.0.0.1:9102"), + backend.WithDistReplication(1), + backend.WithDistMerkleChunkSize(2), + ) + if err != nil { + t.Fatalf("new dist memory B: %v", err) + } + + dmB := any(bB).(*backend.DistMemory) + + dmA.SetTransport(transport) + dmB.SetTransport(transport) + + // register for in-process lookups + transport.Register(dmA) + transport.Register(dmB) + + // inject divergent data (A has extra/newer) + for i := range 5 { + it := &cache.Item{Key: keyf("k", i), Value: []byte("vA"), Version: uint64(i + 1), Origin: "A", LastUpdated: time.Now()} + dmA.DebugInject(it) + } + + // B holds only the first 2 keys, at older versions + for i := range 2 { + it := &cache.Item{Key: keyf("k", i), Value: []byte("old"), Version: uint64(i), Origin: "B", LastUpdated: time.Now()} + dmB.DebugInject(it) + } + + // Run sync B->A to pull newer + if err := dmB.SyncWith(ctx, string(dmA.LocalNodeID())); err != nil { + // the HTTP transport does not support Merkle fetch; we rely on the in-process transport + if testing.Verbose() { + t.Logf("sync error: %v", err) + } + } + + // Validate B now has all 5 keys with correct versions (>= A's) + for i := range 5 { + k := keyf("k", i) + itA, _ := dmA.Get(ctx, k) + + itB, _ := dmB.Get(ctx, k) + if itA == nil || itB == nil { + t.Fatalf("missing key %s after sync", k) + } + + if itB.Version < itA.Version { + t.Fatalf("expected B version >= A version for %s", k) + } + } +} + +func keyf(prefix string, i int) string { return prefix + ":" + string(rune('a'+i)) }