Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions pkg/backend/dist_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,20 @@ type DistMemory struct {

// latency histograms for core ops (Phase 1)
latency *distLatencyCollector

// rebalancing (Phase 3)
rebalanceInterval time.Duration
rebalanceBatchSize int
rebalanceMaxConcurrent int
rebalanceStopCh chan struct{}
lastRebalanceVersion atomic.Uint64
}

// Fallback rebalancing limits. startRebalancerIfEnabled applies them when the
// corresponding DistMemory field is still <= 0 at startup.
const (
// defaultRebalanceBatchSize is the fallback for rebalanceBatchSize (max keys per transfer batch).
defaultRebalanceBatchSize = 128
// defaultRebalanceMaxConcurrent is the fallback for rebalanceMaxConcurrent (concurrent batch transfers).
defaultRebalanceMaxConcurrent = 2
)

// errUnexpectedBackendType is a package-level sentinel error with a fixed message,
// so callers can compare against it directly.
var errUnexpectedBackendType = errors.New("backend: unexpected backend type") // stable error (no dynamic wrapping needed)

// hintedEntry represents a deferred replica write.
Expand Down Expand Up @@ -220,6 +232,31 @@ func WithDistListKeysCap(n int) DistMemoryOption {
}
}

// WithDistRebalanceInterval sets the period between ownership rebalancing checks.
// The value is stored as given; an interval <= 0 leaves the rebalancer disabled.
func WithDistRebalanceInterval(d time.Duration) DistMemoryOption {
	return func(target *DistMemory) { target.rebalanceInterval = d }
}

// WithDistRebalanceBatchSize sets the maximum number of keys per transfer batch.
// Non-positive values are ignored and leave the current setting untouched.
func WithDistRebalanceBatchSize(n int) DistMemoryOption {
	return func(target *DistMemory) {
		if n <= 0 {
			return
		}

		target.rebalanceBatchSize = n
	}
}

// WithDistRebalanceMaxConcurrent caps how many batch transfers may run at once.
// Non-positive values are ignored: the returned option then changes nothing.
func WithDistRebalanceMaxConcurrent(n int) DistMemoryOption {
	if n <= 0 {
		// Invalid limit: hand back an option that leaves the setting as is.
		return func(*DistMemory) {}
	}

	return func(target *DistMemory) { target.rebalanceMaxConcurrent = n }
}

// --- Merkle tree anti-entropy structures ---

// MerkleTree represents a binary hash tree over key/version pairs.
Expand Down Expand Up @@ -507,6 +544,7 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist
dm.startGossipIfEnabled()
dm.startAutoSyncIfEnabled(ctx)
dm.startTombstoneSweeper()
dm.startRebalancerIfEnabled(ctx)

return dm, nil
}
Expand Down Expand Up @@ -860,6 +898,10 @@ type distMetrics struct {
writeQuorumFailures int64 // number of write operations that failed quorum
writeAcks int64 // cumulative replica write acks (includes primary)
writeAttempts int64 // total write operations attempted (Set)
rebalancedKeys int64 // keys migrated during rebalancing
rebalanceBatches int64 // number of batches processed
rebalanceThrottle int64 // times rebalance was throttled due to concurrency limits
rebalanceLastNanos int64 // duration of last full rebalance scan (ns)
}

// DistMetrics snapshot.
Expand Down Expand Up @@ -898,6 +940,14 @@ type DistMetrics struct {
WriteQuorumFailures int64
WriteAcks int64
WriteAttempts int64
RebalancedKeys int64
RebalanceBatches int64
RebalanceThrottle int64
RebalanceLastNanos int64
MembershipVersion uint64 // current membership version (incremented on changes)
MembersAlive int64 // current alive members
MembersSuspect int64 // current suspect members
MembersDead int64 // current dead members
}

// Metrics returns a snapshot of distributed metrics.
Expand All @@ -909,6 +959,25 @@ func (dm *DistMemory) Metrics() DistMetrics {
}
}

var mv uint64

var alive, suspect, dead int64

if dm.membership != nil {
mv = dm.membership.Version()
for _, n := range dm.membership.List() {
switch n.State.String() {
case "alive":
alive++
case "suspect":
suspect++
case "dead":
dead++
default: // ignore future states
}
}
}

return DistMetrics{
ForwardGet: atomic.LoadInt64(&dm.metrics.forwardGet),
ForwardSet: atomic.LoadInt64(&dm.metrics.forwardSet),
Expand Down Expand Up @@ -944,6 +1013,14 @@ func (dm *DistMemory) Metrics() DistMetrics {
WriteQuorumFailures: atomic.LoadInt64(&dm.metrics.writeQuorumFailures),
WriteAcks: atomic.LoadInt64(&dm.metrics.writeAcks),
WriteAttempts: atomic.LoadInt64(&dm.metrics.writeAttempts),
RebalancedKeys: atomic.LoadInt64(&dm.metrics.rebalancedKeys),
RebalanceBatches: atomic.LoadInt64(&dm.metrics.rebalanceBatches),
RebalanceThrottle: atomic.LoadInt64(&dm.metrics.rebalanceThrottle),
RebalanceLastNanos: atomic.LoadInt64(&dm.metrics.rebalanceLastNanos),
MembershipVersion: mv,
MembersAlive: alive,
MembersSuspect: suspect,
MembersDead: dead,
}
}

Expand Down Expand Up @@ -1244,6 +1321,154 @@ func (dm *DistMemory) countTombstones() int64 { //nolint:ireturn
return total
}

// Rebalancing (Phase 3 initial implementation).

// startRebalancerIfEnabled launches the background rebalance loop.
//
// It is a no-op unless a positive rebalance interval is configured and both the
// membership and the ring are set. Batch size and concurrency limits that are
// still non-positive are replaced by their package defaults before the loop
// goroutine starts.
func (dm *DistMemory) startRebalancerIfEnabled(ctx context.Context) { //nolint:ireturn
	enabled := dm.rebalanceInterval > 0 && dm.membership != nil && dm.ring != nil
	if !enabled {
		return
	}

	if dm.rebalanceBatchSize < 1 {
		dm.rebalanceBatchSize = defaultRebalanceBatchSize
	}

	if dm.rebalanceMaxConcurrent < 1 {
		dm.rebalanceMaxConcurrent = defaultRebalanceMaxConcurrent
	}

	// Stop channel is created before the goroutine so the loop can select on it.
	dm.rebalanceStopCh = make(chan struct{})

	go dm.rebalanceLoop(ctx)
}

// rebalanceLoop runs one rebalance tick every rebalanceInterval until either the
// context is done or rebalanceStopCh yields (a value or a close).
func (dm *DistMemory) rebalanceLoop(ctx context.Context) { //nolint:ireturn
	tick := time.NewTicker(dm.rebalanceInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-dm.rebalanceStopCh:
			return
		case <-tick.C:
			dm.runRebalanceTick(ctx)
		}
	}
}

// runRebalanceTick performs one best-effort rebalance pass.
//
// The pass is skipped when the membership version equals the version recorded
// by the previous pass (a nil membership counts as version 0). Otherwise it
// collects locally stored items this node no longer owns, hands them to
// migrateItems, records the elapsed time in the rebalanceLastNanos metric and
// remembers the version that was observed at the start of the pass.
func (dm *DistMemory) runRebalanceTick(ctx context.Context) { //nolint:ireturn
	var version uint64
	if dm.membership != nil {
		version = dm.membership.Version()
	}

	if dm.lastRebalanceVersion.Load() == version {
		return // membership unchanged since the last pass
	}

	began := time.Now()

	if moving := dm.collectRebalanceCandidates(); len(moving) > 0 {
		dm.migrateItems(ctx, moving)
	}

	elapsed := time.Since(began)
	atomic.StoreInt64(&dm.metrics.rebalanceLastNanos, elapsed.Nanoseconds())

	// Store the version read before the scan, so a change that happened during
	// the scan triggers another pass on the next tick.
	dm.lastRebalanceVersion.Store(version)
}

// collectRebalanceCandidates walks every non-nil shard and gathers the items for
// which isOwner reports that this node is no longer an owner.
//
// Items are appended by value (not by pointer), so the result does not alias the
// iteration variable. Returns nil when there are no shards; otherwise a non-nil,
// possibly empty slice.
func (dm *DistMemory) collectRebalanceCandidates() []cache.Item { //nolint:ireturn
	if len(dm.shards) == 0 {
		return nil
	}

	const initialCandidateCap = 1024 // heuristic; amortizes growth

	moved := make([]cache.Item, 0, initialCandidateCap)

	for _, sh := range dm.shards {
		if sh == nil {
			continue
		}

		for entry := range sh.items.IterBuffered() {
			// Keep only items that are not owned locally any more.
			if item := entry.Val; !dm.isOwner(item.Key) {
				moved = append(moved, item)
			}
		}
	}

	return moved
}

// migrateItems migrates items in batches of at most rebalanceBatchSize keys,
// running at most rebalanceMaxConcurrent batches at the same time. It returns
// only after every dispatched batch has finished.
//
// Behavior notes:
//   - Non-positive batch size or concurrency limits are clamped to 1, so a
//     DistMemory whose limits were never initialized cannot spin forever
//     (zero-length batches) or block forever (zero-capacity semaphore).
//   - Each time a batch has to wait for a free concurrency slot, the
//     rebalanceThrottle metric is incremented.
//   - Once ctx is done, no further batches are dispatched and running batches
//     stop before their next item; migration is best-effort.
func (dm *DistMemory) migrateItems(ctx context.Context, items []cache.Item) { //nolint:ireturn
	if len(items) == 0 {
		return
	}

	batchSize := dm.rebalanceBatchSize
	if batchSize <= 0 {
		batchSize = 1
	}

	maxConcurrent := dm.rebalanceMaxConcurrent
	if maxConcurrent <= 0 {
		maxConcurrent = 1
	}

	sem := make(chan struct{}, maxConcurrent)

	var wg sync.WaitGroup

	// Always wait for in-flight batches, including on the early-exit paths below.
	defer wg.Wait()

	for start := 0; start < len(items); {
		if ctx.Err() != nil {
			return
		}

		// Compare against the remaining length instead of computing
		// start+batchSize first, which could overflow for huge batch sizes.
		end := len(items)
		if batchSize < end-start {
			end = start + batchSize
		}

		batch := items[start:end]
		start = end

		select {
		case sem <- struct{}{}:
			// A slot was free; no throttling happened.
		default:
			// All slots busy: record the throttle event, then wait for a slot
			// or for cancellation, whichever comes first.
			atomic.AddInt64(&dm.metrics.rebalanceThrottle, 1)

			select {
			case sem <- struct{}{}:
			case <-ctx.Done():
				return
			}
		}

		wg.Add(1)

		go func(batchItems []cache.Item) {
			defer wg.Done()
			defer func() { <-sem }()

			atomic.AddInt64(&dm.metrics.rebalanceBatches, 1)

			for i := range batchItems {
				if ctx.Err() != nil {
					return
				}

				itm := batchItems[i] // value copy
				dm.migrateIfNeeded(ctx, &itm)
			}
		}(batch)
	}
}

// migrateIfNeeded forwards item to new primary if this node no longer owns it.
func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { //nolint:ireturn
owners := dm.lookupOwners(item.Key)
if len(owners) == 0 || owners[0] == dm.localNode.ID {
return
}

if dm.transport == nil {
return
}

if dm.isOwner(item.Key) { // double-check (race window)
return
}

err := dm.transport.ForwardSet(ctx, string(owners[0]), item, true) // replica=true best-effort
if err == nil {
atomic.AddInt64(&dm.metrics.rebalancedKeys, 1)
Comment on lines +1467 to +1468
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent failure handling for migration errors could mask important issues. Consider logging migration failures or adding error metrics to help with debugging rebalancing problems.

Suggested change
if err == nil {
atomic.AddInt64(&dm.metrics.rebalancedKeys, 1)
atomic.AddInt64(&dm.metrics.rebalancedKeys, 1)
} else {
log.Printf("failed to migrate key %q to node %q: %v", item.Key, owners[0], err)

Copilot uses AI. Check for mistakes.
}
}

func encodeUint64BigEndian(buf []byte, v uint64) {
for i := merkleVersionBytes - 1; i >= 0; i-- { // big endian for deterministic hashing
buf[i] = byte(v)
Expand Down
Loading