From 2047a64eadf56a151284844033d9072fee4c2657 Mon Sep 17 00:00:00 2001
From: Francesco Cosentino
Date: Sun, 24 Aug 2025 23:08:40 +0300
Subject: [PATCH] feat(dist): implement automatic rebalancing system (Phase 3)

- Add rebalancing configuration options (interval, batch size, concurrency)
- Implement periodic ownership rebalancing with configurable intervals
- Add concurrent batch migration with throttling controls
- Track rebalancing metrics (migrated keys, batches, throttle events)
- Expose membership state metrics (alive/suspect/dead members)
- Start rebalancer automatically when enabled in NewDistMemory

This enables automatic data migration when cluster membership changes,
improving load distribution and handling node additions/removals.
---
 pkg/backend/dist_memory.go | 232 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 232 insertions(+)

diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go
index 187a1ab..2a94da3 100644
--- a/pkg/backend/dist_memory.go
+++ b/pkg/backend/dist_memory.go
@@ -115,8 +115,20 @@ type DistMemory struct {
 	// latency histograms for core ops (Phase 1)
 	latency *distLatencyCollector
+
+	// rebalancing (Phase 3)
+	rebalanceInterval      time.Duration
+	rebalanceBatchSize     int
+	rebalanceMaxConcurrent int
+	rebalanceStopCh        chan struct{}
+	lastRebalanceVersion   atomic.Uint64
 }
 
+const (
+	defaultRebalanceBatchSize     = 128
+	defaultRebalanceMaxConcurrent = 2
+)
+
 var errUnexpectedBackendType = errors.New("backend: unexpected backend type") // stable error (no dynamic wrapping needed)
 
 // hintedEntry represents a deferred replica write.
@@ -220,6 +232,31 @@ func WithDistListKeysCap(n int) DistMemoryOption {
 	}
 }
 
+// WithDistRebalanceInterval enables periodic ownership rebalancing checks (<=0 disables).
+func WithDistRebalanceInterval(d time.Duration) DistMemoryOption {
+	return func(dm *DistMemory) {
+		dm.rebalanceInterval = d
+	}
+}
+
+// WithDistRebalanceBatchSize sets the maximum number of keys per transfer batch.
+func WithDistRebalanceBatchSize(n int) DistMemoryOption {
+	return func(dm *DistMemory) {
+		if n > 0 {
+			dm.rebalanceBatchSize = n
+		}
+	}
+}
+
+// WithDistRebalanceMaxConcurrent limits the number of concurrent batch transfers.
+func WithDistRebalanceMaxConcurrent(n int) DistMemoryOption {
+	return func(dm *DistMemory) {
+		if n > 0 {
+			dm.rebalanceMaxConcurrent = n
+		}
+	}
+}
+
 // --- Merkle tree anti-entropy structures ---
 
 // MerkleTree represents a binary hash tree over key/version pairs.
@@ -507,6 +544,7 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist
 	dm.startGossipIfEnabled()
 	dm.startAutoSyncIfEnabled(ctx)
 	dm.startTombstoneSweeper()
+	dm.startRebalancerIfEnabled(ctx)
 
 	return dm, nil
 }
@@ -860,6 +898,10 @@ type distMetrics struct {
 	writeQuorumFailures int64 // number of write operations that failed quorum
 	writeAcks           int64 // cumulative replica write acks (includes primary)
 	writeAttempts       int64 // total write operations attempted (Set)
+	rebalancedKeys      int64 // keys migrated during rebalancing
+	rebalanceBatches    int64 // number of batches processed
+	rebalanceThrottle   int64 // times rebalance was throttled due to concurrency limits
+	rebalanceLastNanos  int64 // duration of last full rebalance scan (ns)
 }
 
 // DistMetrics snapshot.
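For illustration, a minimal usage sketch of the options above. NewDistMemory and the With* options come from this patch; the module path, option values, and surrounding setup are assumptions, and per startRebalancerIfEnabled below, the rebalancer only starts when membership and ring are also configured:

	package main

	import (
		"context"
		"log"
		"time"

		"github.com/hyp3rd/hypercache/pkg/backend" // hypothetical module path
	)

	func main() {
		ctx := context.Background()

		// Enable the Phase 3 rebalancer: re-check ownership every 30s, move
		// at most 256 keys per batch, with up to 4 batches in flight.
		dm, err := backend.NewDistMemory(ctx,
			backend.WithDistRebalanceInterval(30*time.Second),
			backend.WithDistRebalanceBatchSize(256),
			backend.WithDistRebalanceMaxConcurrent(4),
		)
		if err != nil {
			log.Fatal(err)
		}

		_ = dm // use the backend as usual; the rebalancer runs until ctx is canceled
	}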
@@ -898,6 +940,14 @@ type DistMetrics struct {
 	WriteQuorumFailures int64
 	WriteAcks           int64
 	WriteAttempts       int64
+	RebalancedKeys      int64
+	RebalanceBatches    int64
+	RebalanceThrottle   int64
+	RebalanceLastNanos  int64
+	MembershipVersion   uint64 // current membership version (incremented on changes)
+	MembersAlive        int64  // current alive members
+	MembersSuspect      int64  // current suspect members
+	MembersDead         int64  // current dead members
 }
 
 // Metrics returns a snapshot of distributed metrics.
@@ -909,6 +959,25 @@ func (dm *DistMemory) Metrics() DistMetrics {
 		}
 	}
 
+	var mv uint64
+
+	var alive, suspect, dead int64
+
+	if dm.membership != nil {
+		mv = dm.membership.Version()
+		for _, n := range dm.membership.List() {
+			switch n.State.String() {
+			case "alive":
+				alive++
+			case "suspect":
+				suspect++
+			case "dead":
+				dead++
+			default: // ignore future states
+			}
+		}
+	}
+
 	return DistMetrics{
 		ForwardGet: atomic.LoadInt64(&dm.metrics.forwardGet),
 		ForwardSet: atomic.LoadInt64(&dm.metrics.forwardSet),
@@ -944,6 +1013,14 @@ func (dm *DistMemory) Metrics() DistMetrics {
 		WriteQuorumFailures: atomic.LoadInt64(&dm.metrics.writeQuorumFailures),
 		WriteAcks:           atomic.LoadInt64(&dm.metrics.writeAcks),
 		WriteAttempts:       atomic.LoadInt64(&dm.metrics.writeAttempts),
+		RebalancedKeys:      atomic.LoadInt64(&dm.metrics.rebalancedKeys),
+		RebalanceBatches:    atomic.LoadInt64(&dm.metrics.rebalanceBatches),
+		RebalanceThrottle:   atomic.LoadInt64(&dm.metrics.rebalanceThrottle),
+		RebalanceLastNanos:  atomic.LoadInt64(&dm.metrics.rebalanceLastNanos),
+		MembershipVersion:   mv,
+		MembersAlive:        alive,
+		MembersSuspect:      suspect,
+		MembersDead:         dead,
 	}
 }
 
@@ -1244,6 +1321,161 @@ func (dm *DistMemory) countTombstones() int64 { //nolint:ireturn
 	return total
 }
 
+// --- Rebalancing (Phase 3 initial implementation) ---
+
+func (dm *DistMemory) startRebalancerIfEnabled(ctx context.Context) { //nolint:ireturn
+	if dm.rebalanceInterval <= 0 || dm.membership == nil || dm.ring == nil {
+		return
+	}
+
+	if dm.rebalanceBatchSize <= 0 {
+		dm.rebalanceBatchSize = defaultRebalanceBatchSize
+	}
+
+	if dm.rebalanceMaxConcurrent <= 0 {
+		dm.rebalanceMaxConcurrent = defaultRebalanceMaxConcurrent
+	}
+
+	dm.rebalanceStopCh = make(chan struct{})
+
+	go dm.rebalanceLoop(ctx)
+}
+
+func (dm *DistMemory) rebalanceLoop(ctx context.Context) { //nolint:ireturn
+	ticker := time.NewTicker(dm.rebalanceInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			dm.runRebalanceTick(ctx)
+		case <-dm.rebalanceStopCh:
+			return
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// runRebalanceTick performs a lightweight ownership diff and migrates keys best-effort.
+func (dm *DistMemory) runRebalanceTick(ctx context.Context) { //nolint:ireturn
+	mv := uint64(0)
+
+	if dm.membership != nil {
+		mv = dm.membership.Version()
+	}
+
+	if mv == dm.lastRebalanceVersion.Load() {
+		return
+	}
+
+	start := time.Now()
+
+	candidates := dm.collectRebalanceCandidates()
+
+	if len(candidates) > 0 {
+		dm.migrateItems(ctx, candidates)
+	}
+
+	atomic.StoreInt64(&dm.metrics.rebalanceLastNanos, time.Since(start).Nanoseconds())
+	dm.lastRebalanceVersion.Store(mv)
+}
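As an illustrative aside, a monitoring goroutine built on the Metrics snapshot above. The field names are from this patch; the cadence, logger, and how dm is obtained are assumptions:

	// Illustrative only: dm is assumed to be the concrete *DistMemory (the
	// constructor returns an IBackend, so a type assertion may be needed to
	// reach Metrics()). Poll the rebalancing counters once a minute.
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()

		for range ticker.C {
			m := dm.Metrics()
			log.Printf("rebalance: keys=%d batches=%d throttled=%d last=%s members a/s/d=%d/%d/%d",
				m.RebalancedKeys, m.RebalanceBatches, m.RebalanceThrottle,
				time.Duration(m.RebalanceLastNanos),
				m.MembersAlive, m.MembersSuspect, m.MembersDead)
		}
	}()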
+// collectRebalanceCandidates scans shards for items whose primary ownership changed.
+// Note: we copy items (not pointers) to avoid pointer-to-loop-variable issues.
+func (dm *DistMemory) collectRebalanceCandidates() []cache.Item { //nolint:ireturn
+	if len(dm.shards) == 0 {
+		return nil
+	}
+
+	const initialCandidateCap = 1024 // heuristic; amortizes growth
+
+	candidates := make([]cache.Item, 0, initialCandidateCap)
+	for _, shard := range dm.shards {
+		if shard == nil {
+			continue
+		}
+
+		for kv := range shard.items.IterBuffered() {
+			it := kv.Val
+			if dm.isOwner(it.Key) { // still owned locally
+				continue
+			}
+
+			candidates = append(candidates, it)
+		}
+	}
+
+	return candidates
+}
+
+// migrateItems concurrently migrates items in batches, respecting configured limits.
+func (dm *DistMemory) migrateItems(ctx context.Context, items []cache.Item) { //nolint:ireturn
+	if len(items) == 0 {
+		return
+	}
+
+	sem := make(chan struct{}, dm.rebalanceMaxConcurrent)
+
+	var wg sync.WaitGroup
+
+	for start := 0; start < len(items); {
+		end := start + dm.rebalanceBatchSize
+		if end > len(items) {
+			end = len(items)
+		}
+
+		batch := items[start:end]
+
+		start = end
+
+		// Acquire a concurrency slot, counting a throttle event when full.
+		select {
+		case sem <- struct{}{}:
+		default:
+			atomic.AddInt64(&dm.metrics.rebalanceThrottle, 1)
+			sem <- struct{}{}
+		}
+
+		wg.Add(1)
+
+		go func(batchItems []cache.Item) {
+			defer wg.Done()
+			defer func() { <-sem }()
+
+			atomic.AddInt64(&dm.metrics.rebalanceBatches, 1)
+
+			for i := range batchItems {
+				itm := batchItems[i] // value copy
+				dm.migrateIfNeeded(ctx, &itm)
+			}
+		}(batch)
+	}
+
+	wg.Wait()
+}
+
+// migrateIfNeeded forwards an item to its new primary if this node no longer owns it.
+func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { //nolint:ireturn
+	owners := dm.lookupOwners(item.Key)
+	if len(owners) == 0 || owners[0] == dm.localNode.ID {
+		return
+	}
+
+	if dm.transport == nil {
+		return
+	}
+
+	if dm.isOwner(item.Key) { // double-check (race window)
+		return
+	}
+
+	err := dm.transport.ForwardSet(ctx, string(owners[0]), item, true) // replica=true best-effort
+	if err == nil {
+		atomic.AddInt64(&dm.metrics.rebalancedKeys, 1)
+	}
+}
+
 func encodeUint64BigEndian(buf []byte, v uint64) {
 	for i := merkleVersionBytes - 1; i >= 0; i-- { // big endian for deterministic hashing
 		buf[i] = byte(v)
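A standalone sketch of the batching-plus-semaphore pattern migrateItems uses above, with placeholder item and worker types so it runs on its own (the cache types and migration call are not reproduced):

	package main

	import (
		"fmt"
		"sync"
	)

	// processInBatches mirrors migrateItems' control flow: fixed-size
	// batches, a buffered channel as a concurrency semaphore, and a
	// WaitGroup join before returning.
	func processInBatches(items []int, batchSize, maxConcurrent int) {
		sem := make(chan struct{}, maxConcurrent)

		var wg sync.WaitGroup

		for start := 0; start < len(items); {
			end := start + batchSize
			if end > len(items) {
				end = len(items)
			}

			batch := items[start:end]
			start = end

			sem <- struct{}{} // blocks while maxConcurrent batches are in flight

			wg.Add(1)

			go func(b []int) {
				defer wg.Done()
				defer func() { <-sem }()

				fmt.Printf("processing batch of %d\n", len(b))
			}(batch)
		}

		wg.Wait()
	}

	func main() {
		processInBatches(make([]int, 10), 3, 2) // runs 4 batches: 3+3+3+1
	}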