diff --git a/pkg/cxo/skyobject/cache.go b/pkg/cxo/skyobject/cache.go index 33002eca2b..c8f4437283 100644 --- a/pkg/cxo/skyobject/cache.go +++ b/pkg/cxo/skyobject/cache.go @@ -333,19 +333,22 @@ func (c *Cache) cleanDown(vol int) (err error) { it *item } - var rank = make([]*rankItem, 0, len(c.is)) // rank items + // Value slice (not []*rankItem) so we don't allocate a tiny heap + // object per cache entry; this turns one make+N small allocs into + // a single backing array per cleanDown call. + var rank = make([]rankItem, 0, len(c.is)) for key, it := range c.is { if it.isWanted() == true { //nolint:staticcheck - return err // skip wanted + continue // skip wanted } if it.isFilling() == true { //nolint:staticcheck - return err // skip filling (where val is nil) + continue // skip filling (where val is nil) } - rank = append(rank, &rankItem{key, it}) + rank = append(rank, rankItem{key, it}) } @@ -361,8 +364,8 @@ func (c *Cache) cleanDown(vol int) (err error) { if c.amount+1 > c.c.conf.CacheMaxAmount { var ( - i int // to reduce the rank slice - ri *rankItem // for the range (we need the i) + i int // to reduce the rank slice + ri rankItem // for the range (we need the i) ) for i, ri = range rank { @@ -376,8 +379,6 @@ func (c *Cache) cleanDown(vol int) (err error) { return err // fail on first error } - rank[i].it = nil // GC - } rank = rank[i:] // shift @@ -390,7 +391,7 @@ func (c *Cache) cleanDown(vol int) (err error) { return err // enough } - for i, ri := range rank { + for _, ri := range rank { if c.volume+vol <= c.volumec { break @@ -400,8 +401,6 @@ func (c *Cache) cleanDown(vol int) (err error) { return err // fail on first error } - rank[i].it = nil // GC - } return err diff --git a/pkg/cxo/treestore/publisher.go b/pkg/cxo/treestore/publisher.go index 3feebe367a..6e6bc7d02e 100644 --- a/pkg/cxo/treestore/publisher.go +++ b/pkg/cxo/treestore/publisher.go @@ -65,6 +65,22 @@ type Publisher struct { type memNode struct { leaves map[string][]byte // name → leaf value bytes (nil = not a leaf) subs map[string]*memNode // name → child node (nil = not a sub-tree) + + // Per-publish encoding cache. pubHash is the hash this sub-node + // resolved to in its last successful publish. cached is true when + // the in-memory state has not changed since that publish, so the + // next publish can place pubHash directly into the parent's + // TreeEntry and skip both the recursive encodeNode walk and the + // encoder.Serialize/Cache.Set chain entirely. Container.Save's + // reference walk increments the rc on the cached hash without + // recursing into it (lines 156-167 of skyobject/unpack.go), so the + // underlying CXO objects stay alive across publishes as long as + // each successive Root references the same hash. + // + // Any mutation that touches this node OR any descendant clears + // cached on every node along the path from the root. + pubHash skycipher.SHA256 + cached bool } func newMemNode() *memNode { @@ -516,7 +532,12 @@ func (p *Publisher) publishRoot(root *memNode) error { // Build the root TreeNode bottom-up. encodeNode writes leaf // objects and TreeEntries via up.Add, returning an unsaved // TreeNode value that we then wrap in a Dynamic for Root.Refs. - rootNode, err := encodeNode(up, root) + // freshSubs collects every sub-node we just freshly encoded so we + // can promote them into the per-memNode cache after Save succeeds + // — never before, otherwise an aborted publish would leave the + // cache pointing at hashes that up.Close has rolled back. + var freshSubs []freshSub + rootNode, err := encodeNode(up, root, &freshSubs) if err != nil { return err } @@ -548,13 +569,39 @@ func (p *Publisher) publishRoot(root *memNode) error { return err } p.cxoNode.Publish(r) + for _, fs := range freshSubs { + fs.n.pubHash = fs.hash + fs.n.cached = true + } return nil } +// freshSub records a sub-node whose TreeNode was freshly serialized +// during the current publishRoot call. After Save succeeds the +// publisher promotes these into the per-memNode encode-cache so the +// next publish can skip both the recursive walk and the +// encoder.Serialize / Cache.Set chain for any subtree that has not +// been mutated since. +type freshSub struct { + n *memNode + hash skycipher.SHA256 +} + // encodeNode walks a memNode and produces a TreeNode whose Children // Refs contains TreeEntries for every leaf and sub-node, sorted by // name for deterministic encoding. -func encodeNode(up registry.Pack, n *memNode) (TreeNode, error) { +// +// For each sub-node it consults memNode.cached: if the sub-node has +// not been mutated since its last successful publish, encodeNode reuses +// the cached pubHash directly in the parent's TreeEntry without +// recursing or re-serializing. Container.Save's reference walk then +// increments the rc on the cached hash without descending into it +// (skyobject/unpack.go:156-167), so the underlying CXO objects stay +// alive across publishes as long as each successive Root references +// the same hash. Mutated sub-trees are re-encoded via the slow path +// and recorded in freshSubs so the caller can promote them after +// Save returns nil. +func encodeNode(up registry.Pack, n *memNode, freshSubs *[]freshSub) (TreeNode, error) { var node TreeNode names := sortedNames(n) @@ -568,12 +615,18 @@ func encodeNode(up registry.Pack, n *memNode) (TreeNode, error) { if leaf, ok := n.leaves[name]; ok { entry.Leaf = append([]byte(nil), leaf...) } else if sub, ok := n.subs[name]; ok { - subNode, err := encodeNode(up, sub) - if err != nil { - return TreeNode{}, err - } - if err := entry.Sub.SetValue(up, &subNode); err != nil { - return TreeNode{}, err + if sub.cached { + // Fast path: subtree unchanged since last publish. + entry.Sub = registry.Ref{Hash: sub.pubHash} + } else { + subNode, err := encodeNode(up, sub, freshSubs) + if err != nil { + return TreeNode{}, err + } + if err := entry.Sub.SetValue(up, &subNode); err != nil { + return TreeNode{}, err + } + *freshSubs = append(*freshSubs, freshSub{n: sub, hash: entry.Sub.Hash}) } } else { // Should not happen — memNode invariants keep names in @@ -607,8 +660,20 @@ func sortedNames(n *memNode) []string { // installs the leaf value at the final segment. Returns // ErrPathConflict if the final segment is held by a sub-tree, or if // any intermediate segment is held by a leaf. +// +// On a successful put, the encode-cache (memNode.cached) is cleared on +// every node along the touched path so the next publish re-encodes +// only the changed branch. func putAt(root *memNode, segs []string, value []byte) error { cur := root + chain := make([]*memNode, 0, len(segs)) + chain = append(chain, cur) + mutated := false + defer func() { + if mutated { + invalidatePath(chain) + } + }() for i, seg := range segs[:len(segs)-1] { if _, isLeaf := cur.leaves[seg]; isLeaf { return &PathConflictError{Path: joinSegs(segs[:i+1]), Existing: "leaf"} @@ -617,14 +682,19 @@ func putAt(root *memNode, segs []string, value []byte) error { if !ok { next = newMemNode() cur.subs[seg] = next + // Creating a child structurally mutates the chain even if + // a deeper conflict aborts the put — invalidate. + mutated = true } cur = next + chain = append(chain, cur) } last := segs[len(segs)-1] if _, isSub := cur.subs[last]; isSub { return &PathConflictError{Path: joinSegs(segs), Existing: "sub-tree"} } cur.leaves[last] = value + mutated = true return nil } @@ -674,19 +744,23 @@ func deleteAt(root *memNode, segs []string) bool { parent := chain[i-1] delete(parent.subs, segs[i-1]) } + invalidatePath(chain) return true } // pruneAt removes the entire sub-tree (and any leaf at the same // position) addressed by segs. Returns true if anything was removed. func pruneAt(root *memNode, segs []string) bool { + chain := make([]*memNode, 0, len(segs)) parent := root + chain = append(chain, parent) for _, seg := range segs[:len(segs)-1] { next, ok := parent.subs[seg] if !ok { return false } parent = next + chain = append(chain, parent) } last := segs[len(segs)-1] _, hadLeaf := parent.leaves[last] @@ -696,9 +770,20 @@ func pruneAt(root *memNode, segs []string) bool { } delete(parent.leaves, last) delete(parent.subs, last) + invalidatePath(chain) return true } +// invalidatePath clears the encode-cache flag on every node in chain. +// Called by the mutators after any successful structural change so the +// next publish re-encodes the affected nodes (and re-uses cached +// hashes on every untouched sibling subtree). +func invalidatePath(chain []*memNode) { + for _, n := range chain { + n.cached = false + } +} + // walkLeaves visits every leaf at-or-under the given node, prefixing // reported paths with the supplied path string. func walkLeaves(n *memNode, path string, fn func(string, []byte) bool) bool { diff --git a/pkg/cxo/treestore/publisher_test.go b/pkg/cxo/treestore/publisher_test.go index 148a77c837..e679e96844 100644 --- a/pkg/cxo/treestore/publisher_test.go +++ b/pkg/cxo/treestore/publisher_test.go @@ -268,6 +268,94 @@ func TestPublisherPathConflictReporting(t *testing.T) { } } +// TestEncodeCacheInvalidatesOnlyAffectedPath pins the per-memNode +// encode-cache contract: after a publish, every sub-node along an +// untouched branch must remain `cached==true` so the next publish +// can short-circuit the recursive encodeNode walk and skip the +// encoder.Serialize / Cache.Set chain for that branch. A Put under +// one branch must clear cached on its ancestors only, leaving sibling +// branches reusable. +func TestEncodeCacheInvalidatesOnlyAffectedPath(t *testing.T) { + p, _ := newTestPublisher(t) + + for _, path := range []string{ + "a/x/1", + "a/y/1", + "b/x/1", + "b/y/1", + } { + if err := p.Put(path, []byte("v")); err != nil { + t.Fatalf("Put %s: %v", path, err) + } + } + if err := p.Flush(); err != nil { + t.Fatalf("Flush: %v", err) + } + + p.mu.Lock() + root := p.root + a := root.subs["a"] + b := root.subs["b"] + ax := a.subs["x"] + ay := a.subs["y"] + bx := b.subs["x"] + by := b.subs["y"] + for name, n := range map[string]*memNode{"a": a, "b": b, "a/x": ax, "a/y": ay, "b/x": bx, "b/y": by} { + if !n.cached { + t.Fatalf("after first Flush, %s.cached = false; want true", name) + } + } + p.mu.Unlock() + + // Mutate only under a/x. Expect cached to clear on the path + // root → a → a/x but stay set on every sibling. + if err := p.Put("a/x/2", []byte("w")); err != nil { + t.Fatalf("Put a/x/2: %v", err) + } + + p.mu.Lock() + if a.cached { + t.Fatalf("a.cached = true after mutation under a/x; want false") + } + if ax.cached { + t.Fatalf("a/x.cached = true after mutation under a/x; want false") + } + if !ay.cached { + t.Fatalf("a/y.cached = false after unrelated mutation; want true (sibling)") + } + if !b.cached { + t.Fatalf("b.cached = false after unrelated mutation; want true (sibling subtree)") + } + if !bx.cached || !by.cached { + t.Fatalf("b/x.cached=%v b/y.cached=%v; want both true (unrelated subtree)", bx.cached, by.cached) + } + bxHashBefore := bx.pubHash + byHashBefore := by.pubHash + p.mu.Unlock() + + if err := p.Flush(); err != nil { + t.Fatalf("second Flush: %v", err) + } + + // After the second publish, sibling subtree hashes must be + // preserved bit-for-bit — the publisher reused them via the cache + // path rather than re-encoding. + p.mu.Lock() + defer p.mu.Unlock() + if bx.pubHash != bxHashBefore { + t.Fatalf("b/x.pubHash changed across an unrelated publish: %x → %x", bxHashBefore, bx.pubHash) + } + if by.pubHash != byHashBefore { + t.Fatalf("b/y.pubHash changed across an unrelated publish: %x → %x", byHashBefore, by.pubHash) + } + // And every sub-node should be cached again post-publish. + for name, n := range map[string]*memNode{"a": a, "b": b, "a/x": ax, "a/y": ay, "b/x": bx, "b/y": by} { + if !n.cached { + t.Fatalf("after second Flush, %s.cached = false; want true", name) + } + } +} + func TestPublisherConcurrentPutsAreSafe(t *testing.T) { p, _ := newTestPublisher(t) var wg sync.WaitGroup