From 54f6df65c3f08b9b55bae2a153bc015e0590f338 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Sun, 3 May 2026 13:05:46 -0500 Subject: [PATCH 1/2] cxo/skyobject: fix Cache.cleanDown early-returning on first wanted/filling item The filter loop's "skip wanted" / "skip filling" comments said skip, but the code returned, aborting the entire ranking on the first non-evictable entry encountered during map iteration. On a busy visor there is virtually always something filling or being awaited, so the cache never evicted anything: putItem invoked cleanDown once the budget was exceeded, cleanDown built a len(c.is)-sized rank slice and immediately discarded it, and putItem then inserted the new item on top of an already-over-budget cache. The discarded slice is what the alloc profile pinned at 95% of total bytes allocated under cleanDown, and the un-evicted backlog is what pinned the live heap under putItem. continue, not return. While here, switch the rank slice from []*rankItem to []rankItem so we don't allocate a tiny heap object per cache entry. --- pkg/cxo/skyobject/cache.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/cxo/skyobject/cache.go b/pkg/cxo/skyobject/cache.go index 33002eca2b..c8f4437283 100644 --- a/pkg/cxo/skyobject/cache.go +++ b/pkg/cxo/skyobject/cache.go @@ -333,19 +333,22 @@ func (c *Cache) cleanDown(vol int) (err error) { it *item } - var rank = make([]*rankItem, 0, len(c.is)) // rank items + // Value slice (not []*rankItem) so we don't allocate a tiny heap + // object per cache entry; this turns one make+N small allocs into + // a single backing array per cleanDown call. + var rank = make([]rankItem, 0, len(c.is)) for key, it := range c.is { if it.isWanted() == true { //nolint:staticcheck - return err // skip wanted + continue // skip wanted } if it.isFilling() == true { //nolint:staticcheck - return err // skip filling (where val is nil) + continue // skip filling (where val is nil) } - rank = append(rank, &rankItem{key, it}) + rank = append(rank, rankItem{key, it}) } @@ -361,8 +364,8 @@ func (c *Cache) cleanDown(vol int) (err error) { if c.amount+1 > c.c.conf.CacheMaxAmount { var ( - i int // to reduce the rank slice - ri *rankItem // for the range (we need the i) + i int // to reduce the rank slice + ri rankItem // for the range (we need the i) ) for i, ri = range rank { @@ -376,8 +379,6 @@ func (c *Cache) cleanDown(vol int) (err error) { return err // fail on first error } - rank[i].it = nil // GC - } rank = rank[i:] // shift @@ -390,7 +391,7 @@ func (c *Cache) cleanDown(vol int) (err error) { return err // enough } - for i, ri := range rank { + for _, ri := range rank { if c.volume+vol <= c.volumec { break @@ -400,8 +401,6 @@ func (c *Cache) cleanDown(vol int) (err error) { return err // fail on first error } - rank[i].it = nil // GC - } return err From 6755582bec9f3d7292459640e32750a5c2deafa7 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Sun, 3 May 2026 13:06:00 -0500 Subject: [PATCH 2/2] cxo/treestore: cache encoded sub-node hashes across publishes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Publisher.publishRoot re-walked the entire in-memory tree on every dirty batch and called encoder.Serialize + entry.Sub.SetValue at every level, even when only a single leaf had changed. With the visor stats publisher running at 1-min cadence this dominated CPU (via GC pressure from the Serialize churn) and live heap (the encoded TreeNode bytes piling up in Cache.putItem). Add a per-memNode encode-cache (pubHash + cached) that is populated after each successful Save and invalidated by putAt / deleteAt / pruneAt along the touched path. encodeNode now places the cached hash directly into the parent's TreeEntry for any unchanged sub-node and skips the recursive walk plus the encoder.Serialize / Cache.Set chain entirely. Container.Save's reference walk increments the rc on the cached hash without descending into it (skyobject/unpack.go:156-167), so the underlying CXO objects stay alive across publishes as long as each successive Root references the same hash — exactly the content-addressed behavior the package doc already promised. Cache promotion is deferred until after Save returns nil so an aborted publish doesn't leave the cache pointing at hashes that up.Close has rolled back. Adds TestEncodeCacheInvalidatesOnlyAffectedPath to pin the sibling-preservation contract. --- pkg/cxo/treestore/publisher.go | 101 +++++++++++++++++++++++++--- pkg/cxo/treestore/publisher_test.go | 88 ++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 8 deletions(-) diff --git a/pkg/cxo/treestore/publisher.go b/pkg/cxo/treestore/publisher.go index 3feebe367a..6e6bc7d02e 100644 --- a/pkg/cxo/treestore/publisher.go +++ b/pkg/cxo/treestore/publisher.go @@ -65,6 +65,22 @@ type Publisher struct { type memNode struct { leaves map[string][]byte // name → leaf value bytes (nil = not a leaf) subs map[string]*memNode // name → child node (nil = not a sub-tree) + + // Per-publish encoding cache. pubHash is the hash this sub-node + // resolved to in its last successful publish. cached is true when + // the in-memory state has not changed since that publish, so the + // next publish can place pubHash directly into the parent's + // TreeEntry and skip both the recursive encodeNode walk and the + // encoder.Serialize/Cache.Set chain entirely. Container.Save's + // reference walk increments the rc on the cached hash without + // recursing into it (lines 156-167 of skyobject/unpack.go), so the + // underlying CXO objects stay alive across publishes as long as + // each successive Root references the same hash. + // + // Any mutation that touches this node OR any descendant clears + // cached on every node along the path from the root. + pubHash skycipher.SHA256 + cached bool } func newMemNode() *memNode { @@ -516,7 +532,12 @@ func (p *Publisher) publishRoot(root *memNode) error { // Build the root TreeNode bottom-up. encodeNode writes leaf // objects and TreeEntries via up.Add, returning an unsaved // TreeNode value that we then wrap in a Dynamic for Root.Refs. - rootNode, err := encodeNode(up, root) + // freshSubs collects every sub-node we just freshly encoded so we + // can promote them into the per-memNode cache after Save succeeds + // — never before, otherwise an aborted publish would leave the + // cache pointing at hashes that up.Close has rolled back. + var freshSubs []freshSub + rootNode, err := encodeNode(up, root, &freshSubs) if err != nil { return err } @@ -548,13 +569,39 @@ func (p *Publisher) publishRoot(root *memNode) error { return err } p.cxoNode.Publish(r) + for _, fs := range freshSubs { + fs.n.pubHash = fs.hash + fs.n.cached = true + } return nil } +// freshSub records a sub-node whose TreeNode was freshly serialized +// during the current publishRoot call. After Save succeeds the +// publisher promotes these into the per-memNode encode-cache so the +// next publish can skip both the recursive walk and the +// encoder.Serialize / Cache.Set chain for any subtree that has not +// been mutated since. +type freshSub struct { + n *memNode + hash skycipher.SHA256 +} + // encodeNode walks a memNode and produces a TreeNode whose Children // Refs contains TreeEntries for every leaf and sub-node, sorted by // name for deterministic encoding. -func encodeNode(up registry.Pack, n *memNode) (TreeNode, error) { +// +// For each sub-node it consults memNode.cached: if the sub-node has +// not been mutated since its last successful publish, encodeNode reuses +// the cached pubHash directly in the parent's TreeEntry without +// recursing or re-serializing. Container.Save's reference walk then +// increments the rc on the cached hash without descending into it +// (skyobject/unpack.go:156-167), so the underlying CXO objects stay +// alive across publishes as long as each successive Root references +// the same hash. Mutated sub-trees are re-encoded via the slow path +// and recorded in freshSubs so the caller can promote them after +// Save returns nil. +func encodeNode(up registry.Pack, n *memNode, freshSubs *[]freshSub) (TreeNode, error) { var node TreeNode names := sortedNames(n) @@ -568,12 +615,18 @@ func encodeNode(up registry.Pack, n *memNode) (TreeNode, error) { if leaf, ok := n.leaves[name]; ok { entry.Leaf = append([]byte(nil), leaf...) } else if sub, ok := n.subs[name]; ok { - subNode, err := encodeNode(up, sub) - if err != nil { - return TreeNode{}, err - } - if err := entry.Sub.SetValue(up, &subNode); err != nil { - return TreeNode{}, err + if sub.cached { + // Fast path: subtree unchanged since last publish. + entry.Sub = registry.Ref{Hash: sub.pubHash} + } else { + subNode, err := encodeNode(up, sub, freshSubs) + if err != nil { + return TreeNode{}, err + } + if err := entry.Sub.SetValue(up, &subNode); err != nil { + return TreeNode{}, err + } + *freshSubs = append(*freshSubs, freshSub{n: sub, hash: entry.Sub.Hash}) } } else { // Should not happen — memNode invariants keep names in @@ -607,8 +660,20 @@ func sortedNames(n *memNode) []string { // installs the leaf value at the final segment. Returns // ErrPathConflict if the final segment is held by a sub-tree, or if // any intermediate segment is held by a leaf. +// +// On a successful put, the encode-cache (memNode.cached) is cleared on +// every node along the touched path so the next publish re-encodes +// only the changed branch. func putAt(root *memNode, segs []string, value []byte) error { cur := root + chain := make([]*memNode, 0, len(segs)) + chain = append(chain, cur) + mutated := false + defer func() { + if mutated { + invalidatePath(chain) + } + }() for i, seg := range segs[:len(segs)-1] { if _, isLeaf := cur.leaves[seg]; isLeaf { return &PathConflictError{Path: joinSegs(segs[:i+1]), Existing: "leaf"} @@ -617,14 +682,19 @@ func putAt(root *memNode, segs []string, value []byte) error { if !ok { next = newMemNode() cur.subs[seg] = next + // Creating a child structurally mutates the chain even if + // a deeper conflict aborts the put — invalidate. + mutated = true } cur = next + chain = append(chain, cur) } last := segs[len(segs)-1] if _, isSub := cur.subs[last]; isSub { return &PathConflictError{Path: joinSegs(segs), Existing: "sub-tree"} } cur.leaves[last] = value + mutated = true return nil } @@ -674,19 +744,23 @@ func deleteAt(root *memNode, segs []string) bool { parent := chain[i-1] delete(parent.subs, segs[i-1]) } + invalidatePath(chain) return true } // pruneAt removes the entire sub-tree (and any leaf at the same // position) addressed by segs. Returns true if anything was removed. func pruneAt(root *memNode, segs []string) bool { + chain := make([]*memNode, 0, len(segs)) parent := root + chain = append(chain, parent) for _, seg := range segs[:len(segs)-1] { next, ok := parent.subs[seg] if !ok { return false } parent = next + chain = append(chain, parent) } last := segs[len(segs)-1] _, hadLeaf := parent.leaves[last] @@ -696,9 +770,20 @@ func pruneAt(root *memNode, segs []string) bool { } delete(parent.leaves, last) delete(parent.subs, last) + invalidatePath(chain) return true } +// invalidatePath clears the encode-cache flag on every node in chain. +// Called by the mutators after any successful structural change so the +// next publish re-encodes the affected nodes (and re-uses cached +// hashes on every untouched sibling subtree). +func invalidatePath(chain []*memNode) { + for _, n := range chain { + n.cached = false + } +} + // walkLeaves visits every leaf at-or-under the given node, prefixing // reported paths with the supplied path string. func walkLeaves(n *memNode, path string, fn func(string, []byte) bool) bool { diff --git a/pkg/cxo/treestore/publisher_test.go b/pkg/cxo/treestore/publisher_test.go index 148a77c837..e679e96844 100644 --- a/pkg/cxo/treestore/publisher_test.go +++ b/pkg/cxo/treestore/publisher_test.go @@ -268,6 +268,94 @@ func TestPublisherPathConflictReporting(t *testing.T) { } } +// TestEncodeCacheInvalidatesOnlyAffectedPath pins the per-memNode +// encode-cache contract: after a publish, every sub-node along an +// untouched branch must remain `cached==true` so the next publish +// can short-circuit the recursive encodeNode walk and skip the +// encoder.Serialize / Cache.Set chain for that branch. A Put under +// one branch must clear cached on its ancestors only, leaving sibling +// branches reusable. +func TestEncodeCacheInvalidatesOnlyAffectedPath(t *testing.T) { + p, _ := newTestPublisher(t) + + for _, path := range []string{ + "a/x/1", + "a/y/1", + "b/x/1", + "b/y/1", + } { + if err := p.Put(path, []byte("v")); err != nil { + t.Fatalf("Put %s: %v", path, err) + } + } + if err := p.Flush(); err != nil { + t.Fatalf("Flush: %v", err) + } + + p.mu.Lock() + root := p.root + a := root.subs["a"] + b := root.subs["b"] + ax := a.subs["x"] + ay := a.subs["y"] + bx := b.subs["x"] + by := b.subs["y"] + for name, n := range map[string]*memNode{"a": a, "b": b, "a/x": ax, "a/y": ay, "b/x": bx, "b/y": by} { + if !n.cached { + t.Fatalf("after first Flush, %s.cached = false; want true", name) + } + } + p.mu.Unlock() + + // Mutate only under a/x. Expect cached to clear on the path + // root → a → a/x but stay set on every sibling. + if err := p.Put("a/x/2", []byte("w")); err != nil { + t.Fatalf("Put a/x/2: %v", err) + } + + p.mu.Lock() + if a.cached { + t.Fatalf("a.cached = true after mutation under a/x; want false") + } + if ax.cached { + t.Fatalf("a/x.cached = true after mutation under a/x; want false") + } + if !ay.cached { + t.Fatalf("a/y.cached = false after unrelated mutation; want true (sibling)") + } + if !b.cached { + t.Fatalf("b.cached = false after unrelated mutation; want true (sibling subtree)") + } + if !bx.cached || !by.cached { + t.Fatalf("b/x.cached=%v b/y.cached=%v; want both true (unrelated subtree)", bx.cached, by.cached) + } + bxHashBefore := bx.pubHash + byHashBefore := by.pubHash + p.mu.Unlock() + + if err := p.Flush(); err != nil { + t.Fatalf("second Flush: %v", err) + } + + // After the second publish, sibling subtree hashes must be + // preserved bit-for-bit — the publisher reused them via the cache + // path rather than re-encoding. + p.mu.Lock() + defer p.mu.Unlock() + if bx.pubHash != bxHashBefore { + t.Fatalf("b/x.pubHash changed across an unrelated publish: %x → %x", bxHashBefore, bx.pubHash) + } + if by.pubHash != byHashBefore { + t.Fatalf("b/y.pubHash changed across an unrelated publish: %x → %x", byHashBefore, by.pubHash) + } + // And every sub-node should be cached again post-publish. + for name, n := range map[string]*memNode{"a": a, "b": b, "a/x": ax, "a/y": ay, "b/x": bx, "b/y": by} { + if !n.cached { + t.Fatalf("after second Flush, %s.cached = false; want true", name) + } + } +} + func TestPublisherConcurrentPutsAreSafe(t *testing.T) { p, _ := newTestPublisher(t) var wg sync.WaitGroup