21 changes: 10 additions & 11 deletions pkg/cxo/skyobject/cache.go
@@ -333,19 +333,22 @@ func (c *Cache) cleanDown(vol int) (err error) {
it *item
}

var rank = make([]*rankItem, 0, len(c.is)) // rank items
// Value slice (not []*rankItem) so we don't allocate a tiny heap
// object per cache entry; this turns one make+N small allocs into
// a single backing array per cleanDown call.
var rank = make([]rankItem, 0, len(c.is))

for key, it := range c.is {

if it.isWanted() == true { //nolint:staticcheck
return err // skip wanted
continue // skip wanted
}

if it.isFilling() == true { //nolint:staticcheck
return err // skip filling (where val is nil)
continue // skip filling (where val is nil)
}

rank = append(rank, &rankItem{key, it})
rank = append(rank, rankItem{key, it})

}
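// Illustrative sketch, not part of this change: the allocation
// difference the comment above describes. With a pointer slice every
// append heap-allocates one small struct; with a value slice all
// entries live in the single backing array created by make. Names
// below are hypothetical placeholders, not identifiers from cache.go.
type entry struct {
	key string
	hot int
}

func collectByValue(src map[string]int) []entry {
	out := make([]entry, 0, len(src)) // one allocation for the whole slice
	for k, v := range src {
		out = append(out, entry{k, v}) // copied into the backing array
	}
	return out
}

func collectByPointer(src map[string]int) []*entry {
	out := make([]*entry, 0, len(src))
	for k, v := range src {
		out = append(out, &entry{k, v}) // each &entry{} escapes to the heap
	}
	return out
}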

@@ -361,8 +364,8 @@ func (c *Cache) cleanDown(vol int) (err error) {
if c.amount+1 > c.c.conf.CacheMaxAmount {

var (
i int // to reduce the rank slice
ri *rankItem // for the range (we need the i)
i int // to reduce the rank slice
ri rankItem // for the range (we need the i)
)

for i, ri = range rank {
@@ -376,8 +379,6 @@ func (c *Cache) cleanDown(vol int) (err error) {
return err // fail on first error
}

rank[i].it = nil // GC

}

rank = rank[i:] // shift
@@ -390,7 +391,7 @@ func (c *Cache) cleanDown(vol int) (err error) {
return err // enough
}

for i, ri := range rank {
for _, ri := range rank {

if c.volume+vol <= c.volumec {
break
@@ -400,8 +401,6 @@ func (c *Cache) cleanDown(vol int) (err error) {
return err // fail on first error
}

rank[i].it = nil // GC

}

return err
101 changes: 93 additions & 8 deletions pkg/cxo/treestore/publisher.go
@@ -65,6 +65,22 @@ type Publisher struct {
type memNode struct {
leaves map[string][]byte // name → leaf value bytes (nil = not a leaf)
subs map[string]*memNode // name → child node (nil = not a sub-tree)

// Per-publish encoding cache. pubHash is the hash this sub-node
// resolved to in its last successful publish. cached is true when
// the in-memory state has not changed since that publish, so the
// next publish can place pubHash directly into the parent's
// TreeEntry and skip both the recursive encodeNode walk and the
// encoder.Serialize/Cache.Set chain entirely. Container.Save's
// reference walk increments the rc on the cached hash without
// recursing into it (lines 156-167 of skyobject/unpack.go), so the
// underlying CXO objects stay alive across publishes as long as
// each successive Root references the same hash.
//
// Any mutation that touches this node OR any descendant clears
// cached on every node along the path from the root.
pubHash skycipher.SHA256
cached bool
}
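// Illustrative sketch, hypothetical helper not defined elsewhere in
// this package: the contract the cached flag has to satisfy. If any
// descendant was mutated since the last publish (cached == false),
// every ancestor on the path to the root must be un-cached too;
// otherwise a publish could reuse a stale pubHash above a changed
// subtree.
func checkCacheInvariant(n *memNode) bool {
	for _, sub := range n.subs {
		if !checkCacheInvariant(sub) {
			return false
		}
		if n.cached && !sub.cached {
			return false // ancestor claims "unchanged" above a dirty child
		}
	}
	return true
}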

func newMemNode() *memNode {
@@ -516,7 +532,12 @@ func (p *Publisher) publishRoot(root *memNode) error {
// Build the root TreeNode bottom-up. encodeNode writes leaf
// objects and TreeEntries via up.Add, returning an unsaved
// TreeNode value that we then wrap in a Dynamic for Root.Refs.
rootNode, err := encodeNode(up, root)
// freshSubs collects every sub-node we just freshly encoded so we
// can promote them into the per-memNode cache after Save succeeds
// — never before, otherwise an aborted publish would leave the
// cache pointing at hashes that up.Close has rolled back.
var freshSubs []freshSub
rootNode, err := encodeNode(up, root, &freshSubs)
if err != nil {
return err
}
Expand Down Expand Up @@ -548,13 +569,39 @@ func (p *Publisher) publishRoot(root *memNode) error {
return err
}
p.cxoNode.Publish(r)
for _, fs := range freshSubs {
fs.n.pubHash = fs.hash
fs.n.cached = true
}
return nil
}

// freshSub records a sub-node whose TreeNode was freshly serialized
// during the current publishRoot call. After Save succeeds the
// publisher promotes these into the per-memNode encode-cache so the
// next publish can skip both the recursive walk and the
// encoder.Serialize / Cache.Set chain for any subtree that has not
// been mutated since.
type freshSub struct {
n *memNode
hash skycipher.SHA256
}

// encodeNode walks a memNode and produces a TreeNode whose Children
// Refs contains TreeEntries for every leaf and sub-node, sorted by
// name for deterministic encoding.
func encodeNode(up registry.Pack, n *memNode) (TreeNode, error) {
//
// For each sub-node it consults memNode.cached: if the sub-node has
// not been mutated since its last successful publish, encodeNode reuses
// the cached pubHash directly in the parent's TreeEntry without
// recursing or re-serializing. Container.Save's reference walk then
// increments the rc on the cached hash without descending into it
// (skyobject/unpack.go:156-167), so the underlying CXO objects stay
// alive across publishes as long as each successive Root references
// the same hash. Mutated sub-trees are re-encoded via the slow path
// and recorded in freshSubs so the caller can promote them after
// Save returns nil.
func encodeNode(up registry.Pack, n *memNode, freshSubs *[]freshSub) (TreeNode, error) {
var node TreeNode

names := sortedNames(n)
@@ -568,12 +615,18 @@ func encodeNode(up registry.Pack, n *memNode) (TreeNode, error) {
if leaf, ok := n.leaves[name]; ok {
entry.Leaf = append([]byte(nil), leaf...)
} else if sub, ok := n.subs[name]; ok {
subNode, err := encodeNode(up, sub)
if err != nil {
return TreeNode{}, err
}
if err := entry.Sub.SetValue(up, &subNode); err != nil {
return TreeNode{}, err
if sub.cached {
// Fast path: subtree unchanged since last publish.
entry.Sub = registry.Ref{Hash: sub.pubHash}
} else {
subNode, err := encodeNode(up, sub, freshSubs)
if err != nil {
return TreeNode{}, err
}
if err := entry.Sub.SetValue(up, &subNode); err != nil {
return TreeNode{}, err
}
*freshSubs = append(*freshSubs, freshSub{n: sub, hash: entry.Sub.Hash})
}
} else {
// Should not happen — memNode invariants keep names in
Expand Down Expand Up @@ -607,8 +660,20 @@ func sortedNames(n *memNode) []string {
// installs the leaf value at the final segment. Returns
// ErrPathConflict if the final segment is held by a sub-tree, or if
// any intermediate segment is held by a leaf.
//
// On a successful put, the encode-cache (memNode.cached) is cleared on
// every node along the touched path so the next publish re-encodes
// only the changed branch.
func putAt(root *memNode, segs []string, value []byte) error {
cur := root
chain := make([]*memNode, 0, len(segs))
chain = append(chain, cur)
mutated := false
defer func() {
if mutated {
invalidatePath(chain)
}
}()
for i, seg := range segs[:len(segs)-1] {
if _, isLeaf := cur.leaves[seg]; isLeaf {
return &PathConflictError{Path: joinSegs(segs[:i+1]), Existing: "leaf"}
@@ -617,14 +682,19 @@ func putAt(root *memNode, segs []string, value []byte) error {
if !ok {
next = newMemNode()
cur.subs[seg] = next
// Creating a child structurally mutates the chain even if
// a deeper conflict aborts the put — invalidate.
mutated = true
}
cur = next
chain = append(chain, cur)
}
last := segs[len(segs)-1]
if _, isSub := cur.subs[last]; isSub {
return &PathConflictError{Path: joinSegs(segs), Existing: "sub-tree"}
}
cur.leaves[last] = value
mutated = true
return nil
}

Expand Down Expand Up @@ -674,19 +744,23 @@ func deleteAt(root *memNode, segs []string) bool {
parent := chain[i-1]
delete(parent.subs, segs[i-1])
}
invalidatePath(chain)
return true
}

// pruneAt removes the entire sub-tree (and any leaf at the same
// position) addressed by segs. Returns true if anything was removed.
func pruneAt(root *memNode, segs []string) bool {
chain := make([]*memNode, 0, len(segs))
parent := root
chain = append(chain, parent)
for _, seg := range segs[:len(segs)-1] {
next, ok := parent.subs[seg]
if !ok {
return false
}
parent = next
chain = append(chain, parent)
}
last := segs[len(segs)-1]
_, hadLeaf := parent.leaves[last]
@@ -696,9 +770,20 @@ func pruneAt(root *memNode, segs []string) bool {
}
delete(parent.leaves, last)
delete(parent.subs, last)
invalidatePath(chain)
return true
}

// invalidatePath clears the encode-cache flag on every node in chain.
// Called by the mutators after any successful structural change so the
// next publish re-encodes the affected nodes (and re-uses cached
// hashes on every untouched sibling subtree).
func invalidatePath(chain []*memNode) {
for _, n := range chain {
n.cached = false
}
}

// walkLeaves visits every leaf at-or-under the given node, prefixing
// reported paths with the supplied path string.
func walkLeaves(n *memNode, path string, fn func(string, []byte) bool) bool {
88 changes: 88 additions & 0 deletions pkg/cxo/treestore/publisher_test.go
@@ -268,6 +268,94 @@ func TestPublisherPathConflictReporting(t *testing.T) {
}
}

// TestEncodeCacheInvalidatesOnlyAffectedPath pins the per-memNode
// encode-cache contract: after a publish, every sub-node along an
// untouched branch must remain `cached==true` so the next publish
// can short-circuit the recursive encodeNode walk and skip the
// encoder.Serialize / Cache.Set chain for that branch. A Put under
// one branch must clear cached on its ancestors only, leaving sibling
// branches reusable.
func TestEncodeCacheInvalidatesOnlyAffectedPath(t *testing.T) {
p, _ := newTestPublisher(t)

for _, path := range []string{
"a/x/1",
"a/y/1",
"b/x/1",
"b/y/1",
} {
if err := p.Put(path, []byte("v")); err != nil {
t.Fatalf("Put %s: %v", path, err)
}
}
if err := p.Flush(); err != nil {
t.Fatalf("Flush: %v", err)
}

p.mu.Lock()
root := p.root
a := root.subs["a"]
b := root.subs["b"]
ax := a.subs["x"]
ay := a.subs["y"]
bx := b.subs["x"]
by := b.subs["y"]
for name, n := range map[string]*memNode{"a": a, "b": b, "a/x": ax, "a/y": ay, "b/x": bx, "b/y": by} {
if !n.cached {
t.Fatalf("after first Flush, %s.cached = false; want true", name)
}
}
p.mu.Unlock()

// Mutate only under a/x. Expect cached to clear on the path
// root → a → a/x but stay set on every sibling.
if err := p.Put("a/x/2", []byte("w")); err != nil {
t.Fatalf("Put a/x/2: %v", err)
}

p.mu.Lock()
if a.cached {
t.Fatalf("a.cached = true after mutation under a/x; want false")
}
if ax.cached {
t.Fatalf("a/x.cached = true after mutation under a/x; want false")
}
if !ay.cached {
t.Fatalf("a/y.cached = false after unrelated mutation; want true (sibling)")
}
if !b.cached {
t.Fatalf("b.cached = false after unrelated mutation; want true (sibling subtree)")
}
if !bx.cached || !by.cached {
t.Fatalf("b/x.cached=%v b/y.cached=%v; want both true (unrelated subtree)", bx.cached, by.cached)
}
bxHashBefore := bx.pubHash
byHashBefore := by.pubHash
p.mu.Unlock()

if err := p.Flush(); err != nil {
t.Fatalf("second Flush: %v", err)
}

// After the second publish, sibling subtree hashes must be
// preserved bit-for-bit — the publisher reused them via the cache
// path rather than re-encoding.
p.mu.Lock()
defer p.mu.Unlock()
if bx.pubHash != bxHashBefore {
t.Fatalf("b/x.pubHash changed across an unrelated publish: %x → %x", bxHashBefore, bx.pubHash)
}
if by.pubHash != byHashBefore {
t.Fatalf("b/y.pubHash changed across an unrelated publish: %x → %x", byHashBefore, by.pubHash)
}
// And every sub-node should be cached again post-publish.
for name, n := range map[string]*memNode{"a": a, "b": b, "a/x": ax, "a/y": ay, "b/x": bx, "b/y": by} {
if !n.cached {
t.Fatalf("after second Flush, %s.cached = false; want true", name)
}
}
}

func TestPublisherConcurrentPutsAreSafe(t *testing.T) {
p, _ := newTestPublisher(t)
var wg sync.WaitGroup