diff --git a/CHANGELOG.md b/CHANGELOG.md index aae74ac26..30d95e58a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,13 @@ The following emojis are used to highlight certain changes: ### Added +- `dag/walker`: new package for memory-efficient DAG traversal with deduplication. `VisitedTracker` interface with `BloomTracker` (scalable bloom filter chain, ~4 bytes/CID vs ~75 bytes for a map) and `MapTracker` (exact, for tests). `WalkDAG` provides iterative DFS traversal with integrated dedup, supporting dag-pb, dag-cbor, raw, and other registered codecs. ~2x faster than the legacy go-ipld-prime selector-based traversal. `WalkEntityRoots` emits only entity roots (files, directories, HAMT shards) instead of every block, skipping internal file chunks. [#1124](https://github.com/ipfs/boxo/pull/1124) - `routing/http/client`: `WithProviderInfoFunc` option resolves provider addresses at provide-time instead of client construction time. This only impacts legacy HTTP-only custom routing setups that depend on [IPIP-526](https://github.com/ipfs/specs/pull/526) and were sending unresolved `0.0.0.0` addresses in provider records instead of actual interface addresses. [#1115](https://github.com/ipfs/boxo/pull/1115) - `chunker`: added `Register` function to allow custom chunkers to be registered for use with `FromString`. ### Changed +- `provider`: `NewPrioritizedProvider` now continues to the next stream when one fails instead of stopping all streams. `NewConcatProvider` added for pre-deduplicated streams. [#1124](https://github.com/ipfs/boxo/pull/1124) - `chunker`: `FromString` now rejects malformed `size-` strings with extra parameters (e.g. `size-123-extra` was previously silently accepted). 
- upgrade to `go-libp2p` [v0.48.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.48.0) @@ -32,6 +34,7 @@ The following emojis are used to highlight certain changes: ### Fixed +- `pinner`: `NewUniquePinnedProvider` and `NewPinnedEntityRootsProvider` now log and skip corrupted pin entries instead of aborting the entire provide cycle, allowing remaining pins to still be provided. [#1124](https://github.com/ipfs/boxo/pull/1124) - `bitswap/server`: incoming identity CIDs in wantlist messages are now silently ignored instead of killing the connection to the remote peer. Some IPFS implementations naively send identity CIDs, and disconnecting them for it caused unnecessary churn. [#1117](https://github.com/ipfs/boxo/pull/1117) - `bitswap/network`: `ExtractHTTPAddress` now infers default ports for portless HTTP multiaddrs (e.g. `/dns/host/https` without `/tcp/443`). [#1123](https://github.com/ipfs/boxo/pull/1123) diff --git a/dag/walker/doc.go b/dag/walker/doc.go new file mode 100644 index 000000000..975092eaf --- /dev/null +++ b/dag/walker/doc.go @@ -0,0 +1,21 @@ +// Package walker provides memory-efficient DAG traversal with +// deduplication. Optimized for the IPFS provide system, but useful +// anywhere repeated DAG walks need to skip already-visited subtrees. +// +// The primary entry point is [WalkDAG], which walks a DAG rooted at a +// given CID, emitting each visited CID to a callback. When combined +// with a [VisitedTracker] (e.g. [BloomTracker]), entire subtrees +// already seen are skipped in O(1). +// +// For entity-aware traversal that only emits file/directory/HAMT roots +// instead of every block, see [WalkEntityRoots]. +// +// Blocks are decoded using the codecs registered in the process via +// the global multicodec registry. In a standard kubo build this +// includes dag-pb, dag-cbor, dag-json, cbor, json, and raw. +// +// Use [LinksFetcherFromBlockstore] to create a fetcher backed by a +// local blockstore. For custom link extraction (e.g. 
a different codec +// registry or non-blockstore storage), pass your own [LinksFetcher] +// function directly to [WalkDAG]. +package walker diff --git a/dag/walker/entity.go b/dag/walker/entity.go new file mode 100644 index 000000000..94b446b57 --- /dev/null +++ b/dag/walker/entity.go @@ -0,0 +1,192 @@ +package walker + +import ( + "context" + "slices" + + blockstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/ipld/unixfs" + cid "github.com/ipfs/go-cid" + ipld "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + mh "github.com/multiformats/go-multihash" +) + +// EntityType represents the semantic type of a DAG entity. +type EntityType int + +const ( + EntityUnknown EntityType = iota + EntityFile // UnixFS file root (not its chunks) + EntityDirectory // UnixFS flat directory + EntityHAMTShard // UnixFS HAMT sharded directory bucket + EntitySymlink // UnixFS symbolic link +) + +// NodeFetcher returns child link CIDs and entity type for a given CID. +// Used by [WalkEntityRoots] which needs UnixFS type detection to decide +// whether to descend into children (directories, HAMT shards) or stop +// (files, symlinks). +type NodeFetcher func(ctx context.Context, c cid.Cid) (linkCIDs []cid.Cid, entityType EntityType, err error) + +// NodeFetcherFromBlockstore creates a [NodeFetcher] backed by a local +// blockstore. Like [LinksFetcherFromBlockstore], it decodes blocks via +// ipld-prime's global multicodec registry (dag-pb, dag-cbor, raw, etc.) +// and handles identity CIDs transparently via [blockstore.NewIdStore]. 
+// +// Entity type detection: +// - dag-pb with valid UnixFS Data: file, directory, HAMT shard, or symlink +// - dag-pb without valid UnixFS Data: EntityUnknown +// - raw codec: EntityFile (small file stored as a single raw block) +// - all other codecs (dag-cbor, dag-json, etc.): EntityUnknown +func NodeFetcherFromBlockstore(bs blockstore.Blockstore) NodeFetcher { + ls := linkSystemForBlockstore(bs) + + return func(ctx context.Context, c cid.Cid) ([]cid.Cid, EntityType, error) { + lnk := cidlink.Link{Cid: c} + nd, err := ls.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any) + if err != nil { + return nil, EntityUnknown, err + } + + links := collectLinks(c, nd) + entityType := detectEntityType(c, nd) + return links, entityType, nil + } +} + +// detectEntityType infers the UnixFS entity type from an ipld-prime +// decoded node. For dag-pb nodes, it reads the "Data" field and parses +// it as UnixFS protobuf. For raw codec nodes, it returns EntityFile. +// For everything else, it returns EntityUnknown. 
+func detectEntityType(c cid.Cid, nd ipld.Node) EntityType { + codec := c.Prefix().Codec + + // raw codec: small file stored as a single block + if codec == cid.Raw { + return EntityFile + } + + // only dag-pb has UnixFS semantics; other codecs are unknown + if codec != cid.DagProtobuf { + return EntityUnknown + } + + // dag-pb: try to read the "Data" field for UnixFS type + dataField, err := nd.LookupByString("Data") + if err != nil || dataField.IsAbsent() || dataField.IsNull() { + return EntityUnknown + } + + dataBytes, err := dataField.AsBytes() + if err != nil { + return EntityUnknown + } + + fsn, err := unixfs.FSNodeFromBytes(dataBytes) + if err != nil { + return EntityUnknown + } + + switch fsn.Type() { + case unixfs.TFile, unixfs.TRaw: + return EntityFile + case unixfs.TDirectory: + return EntityDirectory + case unixfs.THAMTShard: + return EntityHAMTShard + case unixfs.TSymlink: + return EntitySymlink + default: + return EntityUnknown + } +} + +// WalkEntityRoots traverses a DAG calling emit for each entity root. +// +// Entity roots are semantic boundaries in the DAG: +// - File/symlink roots: emitted, children (chunks) NOT traversed +// - Directory roots: emitted, children recursed +// - HAMT shard nodes: emitted (needed for directory enumeration), +// children recursed +// - Non-UnixFS nodes (dag-cbor, dag-json, etc.): emitted AND children +// recursed to discover further content. The +entities optimization +// (skip chunks) only applies to UnixFS files; for all other codecs, +// every reachable CID is emitted. +// - Raw leaf nodes: emitted (no children to recurse) +// +// Same traversal order as [WalkDAG]: pre-order DFS with left-to-right +// sibling visiting. Uses the same option types: [WithVisitedTracker] +// for bloom/map dedup across walks, [WithLocality] for MFS locality +// checks. 
+func WalkEntityRoots( + ctx context.Context, + root cid.Cid, + fetch NodeFetcher, + emit func(cid.Cid) bool, + opts ...Option, +) error { + cfg := &walkConfig{} + for _, o := range opts { + o(cfg) + } + + stack := []cid.Cid{root} + + for len(stack) > 0 { + if ctx.Err() != nil { + return ctx.Err() + } + + // pop + c := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + // dedup via tracker + if cfg.tracker != nil && !cfg.tracker.Visit(c) { + continue + } + + // locality check + if cfg.locality != nil { + local, err := cfg.locality(ctx, c) + if err != nil { + log.Errorf("entity walk: locality check %s: %s", c, err) + continue + } + if !local { + continue + } + } + + // fetch block and detect entity type + children, entityType, err := fetch(ctx, c) + if err != nil { + log.Errorf("entity walk: fetch %s: %s", c, err) + continue + } + + // decide whether to descend into children + descend := entityType != EntityFile && entityType != EntitySymlink + if descend { + // reverse so first link is popped next (left-to-right + // sibling order, matching WalkDAG and legacy BlockAll) + slices.Reverse(children) + stack = append(stack, children...) + } + + // skip identity CIDs: content is inline, no need to provide. + // we still descend (above) so an inlined dag-pb directory's + // normal children get provided. 
+ if c.Prefix().MhType == mh.IDENTITY { + continue + } + + if !emit(c) { + return nil + } + } + + return nil +} diff --git a/dag/walker/entity_test.go b/dag/walker/entity_test.go new file mode 100644 index 000000000..058a6b77b --- /dev/null +++ b/dag/walker/entity_test.go @@ -0,0 +1,519 @@ +package walker_test + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "github.com/ipfs/boxo/blockservice" + blockstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/dag/walker" + "github.com/ipfs/boxo/exchange/offline" + "github.com/ipfs/boxo/ipld/merkledag" + ft "github.com/ipfs/boxo/ipld/unixfs" + "github.com/ipfs/boxo/ipld/unixfs/hamt" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + ipld "github.com/ipld/go-ipld-prime" + _ "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/fluent/qp" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestDAGService(bs blockstore.Blockstore) format.DAGService { + bserv := blockservice.New(bs, offline.Exchange(bs)) + return merkledag.NewDAGService(bserv) +} + +func fileNodeWithData(t *testing.T, data []byte) *merkledag.ProtoNode { + t.Helper() + fsn := ft.NewFSNode(ft.TFile) + fsn.SetData(data) + nodeData, err := fsn.GetBytes() + require.NoError(t, err) + return merkledag.NodeWithData(nodeData) +} + +// putDagCBOR builds a dag-cbor block {name: string, links: [CID...]} +// and stores it in the blockstore. Returns its CID. 
+func putDagCBOR(t *testing.T, bs blockstore.Blockstore, name string, linkCIDs ...cid.Cid) cid.Cid { + t.Helper() + ls := cidlink.DefaultLinkSystem() + ls.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.Buffer{} + return &buf, func(lnk ipld.Link) error { + cl := lnk.(cidlink.Link) + blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid) + if err != nil { + return err + } + return bs.Put(context.Background(), blk) + }, nil + } + lp := cidlink.LinkPrototype{Prefix: cid.Prefix{ + Version: 1, Codec: cid.DagCBOR, MhType: 0x12, MhLength: 32, + }} + nd, err := qp.BuildMap(basicnode.Prototype.Any, -1, func(ma ipld.MapAssembler) { + qp.MapEntry(ma, "name", qp.String(name)) + qp.MapEntry(ma, "links", qp.List(-1, func(la ipld.ListAssembler) { + for _, c := range linkCIDs { + qp.ListEntry(la, qp.Link(cidlink.Link{Cid: c})) + } + })) + }) + require.NoError(t, err) + lnk, err := ls.Store(ipld.LinkContext{}, lp, nd) + require.NoError(t, err) + return lnk.(cidlink.Link).Cid +} + +func collectEntityWalk(t *testing.T, bs blockstore.Blockstore, root cid.Cid, opts ...walker.Option) []cid.Cid { + t.Helper() + var visited []cid.Cid + fetch := walker.NodeFetcherFromBlockstore(bs) + err := walker.WalkEntityRoots(t.Context(), root, fetch, func(c cid.Cid) bool { + visited = append(visited, c) + return true + }, opts...) + require.NoError(t, err) + return visited +} + +// --- UnixFS entity type detection --- +// +// These tests verify that WalkEntityRoots correctly identifies UnixFS +// entity types and treats them appropriately: files stop traversal +// (chunks not descended), directories and HAMT shards recurse. + +func TestEntityWalk_UnixFSFile(t *testing.T) { + // A single UnixFS file is an entity root. WalkEntityRoots should + // emit it and stop -- no children to recurse into. 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + file := fileNodeWithData(t, []byte("content")) + require.NoError(t, dserv.Add(t.Context(), file)) + + visited := collectEntityWalk(t, bs, file.Cid()) + assert.Len(t, visited, 1) + assert.Equal(t, file.Cid(), visited[0]) +} + +func TestEntityWalk_ChunkedFileDoesNotDescend(t *testing.T) { + // A chunked file has child links (chunks). This is the core + // +entities optimization: the file root is emitted but its chunks + // are NOT traversed. Without this, every chunk CID would be + // provided to the DHT, which is wasteful. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + chunk1 := merkledag.NewRawNode([]byte("chunk1")) + chunk2 := merkledag.NewRawNode([]byte("chunk2")) + require.NoError(t, dserv.Add(t.Context(), chunk1)) + require.NoError(t, dserv.Add(t.Context(), chunk2)) + + fsn := ft.NewFSNode(ft.TFile) + fsn.AddBlockSize(6) + fsn.AddBlockSize(6) + fileData, err := fsn.GetBytes() + require.NoError(t, err) + fileNode := merkledag.NodeWithData(fileData) + fileNode.AddNodeLink("", chunk1) + fileNode.AddNodeLink("", chunk2) + require.NoError(t, dserv.Add(t.Context(), fileNode)) + + visited := collectEntityWalk(t, bs, fileNode.Cid()) + assert.Len(t, visited, 1, "only file root, NOT chunks") + assert.Equal(t, fileNode.Cid(), visited[0]) + assert.NotContains(t, visited, chunk1.Cid()) + assert.NotContains(t, visited, chunk2.Cid()) +} + +func TestEntityWalk_RawNodeIsFile(t *testing.T) { + // Raw codec blocks (CIDv1 small files) are treated as files. + // They have no children and should be emitted as a single entity. + bs := newTestBlockstore() + raw := merkledag.NewRawNode([]byte("small file")) + require.NoError(t, bs.Put(t.Context(), raw)) + + visited := collectEntityWalk(t, bs, raw.Cid()) + assert.Len(t, visited, 1) + assert.Equal(t, raw.Cid(), visited[0]) +} + +func TestEntityWalk_Symlink(t *testing.T) { + // A UnixFS symlink is a leaf entity, like a file. 
It is emitted + // but its children (none, by definition) are not descended. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + fsn := ft.NewFSNode(ft.TSymlink) + fsn.SetData([]byte("/some/target/path")) + symData, err := fsn.GetBytes() + require.NoError(t, err) + symNode := merkledag.NodeWithData(symData) + require.NoError(t, dserv.Add(t.Context(), symNode)) + + visited := collectEntityWalk(t, bs, symNode.Cid()) + assert.Len(t, visited, 1) + assert.Equal(t, symNode.Cid(), visited[0]) +} + +func TestEntityWalk_DirectoryWithSymlink(t *testing.T) { + // A directory containing a symlink. Both the directory and the + // symlink should be emitted (symlink is a leaf entity). + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + fsn := ft.NewFSNode(ft.TSymlink) + fsn.SetData([]byte("../other")) + symData, err := fsn.GetBytes() + require.NoError(t, err) + symNode := merkledag.NodeWithData(symData) + require.NoError(t, dserv.Add(t.Context(), symNode)) + + file := fileNodeWithData(t, []byte("real-file")) + require.NoError(t, dserv.Add(t.Context(), file)) + + dir := ft.EmptyDirNode() + dir.AddNodeLink("link.txt", symNode) + dir.AddNodeLink("real.txt", file) + require.NoError(t, dserv.Add(t.Context(), dir)) + + visited := collectEntityWalk(t, bs, dir.Cid()) + assert.Len(t, visited, 3, "dir + symlink + file") + assert.Equal(t, dir.Cid(), visited[0], "directory emitted first") + assert.Contains(t, visited, symNode.Cid()) + assert.Contains(t, visited, file.Cid()) +} + +func TestEntityWalk_Directory(t *testing.T) { + // A UnixFS directory is a container entity. It is emitted and its + // children (files) are recursed into. Each file is also emitted. 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + file1 := fileNodeWithData(t, []byte("file1")) + file2 := fileNodeWithData(t, []byte("file2")) + require.NoError(t, dserv.Add(t.Context(), file1)) + require.NoError(t, dserv.Add(t.Context(), file2)) + + dir := ft.EmptyDirNode() + dir.AddNodeLink("file1", file1) + dir.AddNodeLink("file2", file2) + require.NoError(t, dserv.Add(t.Context(), dir)) + + visited := collectEntityWalk(t, bs, dir.Cid()) + assert.Len(t, visited, 3, "dir + 2 files") + assert.Equal(t, dir.Cid(), visited[0], "directory emitted first") + assert.Contains(t, visited, file1.Cid()) + assert.Contains(t, visited, file2.Cid()) +} + +func TestEntityWalk_SiblingOrder(t *testing.T) { + // Siblings must be visited in left-to-right link order, matching + // the legacy BlockAll traversal and WalkDAG's order. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + fileA := fileNodeWithData(t, []byte("aaa")) + fileB := fileNodeWithData(t, []byte("bbb")) + fileC := fileNodeWithData(t, []byte("ccc")) + require.NoError(t, dserv.Add(t.Context(), fileA)) + require.NoError(t, dserv.Add(t.Context(), fileB)) + require.NoError(t, dserv.Add(t.Context(), fileC)) + + dir := ft.EmptyDirNode() + dir.AddNodeLink("a.txt", fileA) + dir.AddNodeLink("b.txt", fileB) + dir.AddNodeLink("c.txt", fileC) + require.NoError(t, dserv.Add(t.Context(), dir)) + + visited := collectEntityWalk(t, bs, dir.Cid()) + require.Len(t, visited, 4) + assert.Equal(t, dir.Cid(), visited[0], "dir first (pre-order)") + assert.Equal(t, fileA.Cid(), visited[1], "first link visited first") + assert.Equal(t, fileB.Cid(), visited[2], "second link visited second") + assert.Equal(t, fileC.Cid(), visited[3], "third link visited third") +} + +func TestEntityWalk_DirectoryWithRawFiles(t *testing.T) { + // Directory containing raw codec files. Both the directory and the + // raw files should be emitted (raw = file entity). 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + raw1 := merkledag.NewRawNode([]byte("raw 1")) + raw2 := merkledag.NewRawNode([]byte("raw 2")) + require.NoError(t, dserv.Add(t.Context(), raw1)) + require.NoError(t, dserv.Add(t.Context(), raw2)) + + dir := ft.EmptyDirNode() + dir.AddNodeLink("r1.bin", raw1) + dir.AddNodeLink("r2.bin", raw2) + require.NoError(t, dserv.Add(t.Context(), dir)) + + visited := collectEntityWalk(t, bs, dir.Cid()) + assert.Len(t, visited, 3, "dir + 2 raw files") + assert.Contains(t, visited, dir.Cid()) + assert.Contains(t, visited, raw1.Cid()) + assert.Contains(t, visited, raw2.Cid()) +} + +// --- HAMT sharded directories --- + +func TestEntityWalk_HAMTDirectory(t *testing.T) { + // HAMT sharded directories store entries across multiple internal + // shard nodes. WalkEntityRoots must emit ALL shard nodes (needed + // for peers to enumerate the directory) AND all file entries, but + // must NOT descend into file chunks. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + shard, err := hamt.NewShard(dserv, 256) + require.NoError(t, err) + + numFiles := 100 + fileCIDs := make(map[cid.Cid]struct{}) + for i := range numFiles { + file := fileNodeWithData(t, []byte(fmt.Sprintf("hamt-file-%d", i))) + require.NoError(t, dserv.Add(t.Context(), file)) + require.NoError(t, shard.Set(t.Context(), fmt.Sprintf("file%d.txt", i), file)) + fileCIDs[file.Cid()] = struct{}{} + } + rootNd, err := shard.Node() + require.NoError(t, err) + + tracker := walker.NewMapTracker() + visited := collectEntityWalk(t, bs, rootNd.Cid(), walker.WithVisitedTracker(tracker)) + + // all files emitted + for fc := range fileCIDs { + assert.Contains(t, visited, fc) + } + // root shard emitted + assert.Contains(t, visited, rootNd.Cid()) + // internal shard nodes exist beyond just files + root + shardCount := len(visited) - numFiles + assert.Greater(t, shardCount, 0, + "HAMT with %d entries must have internal shard nodes", numFiles) + t.Logf("HAMT: %d 
visited (%d files + %d shard nodes)", len(visited), numFiles, shardCount) +} + +// --- non-UnixFS codecs (dag-cbor) --- +// +// The +entities chunk-skip optimization only applies to UnixFS files. +// For all other codecs, every reachable CID is emitted AND its children +// are followed. This ensures that dag-cbor metadata wrapping UnixFS +// content is fully discoverable. + +func TestEntityWalk_DagCBORStandalone(t *testing.T) { + // A single dag-cbor block with no links. Should be emitted as an + // opaque entity root (non-UnixFS). + bs := newTestBlockstore() + c := putDagCBOR(t, bs, "standalone") + visited := collectEntityWalk(t, bs, c) + assert.Len(t, visited, 1) + assert.Equal(t, c, visited[0]) +} + +func TestEntityWalk_DagCBORChain(t *testing.T) { + // dag-cbor A -> B -> C. All three are non-UnixFS, so all must be + // emitted. The walk follows links in non-UnixFS nodes to discover + // further content. + bs := newTestBlockstore() + cC := putDagCBOR(t, bs, "C") + cB := putDagCBOR(t, bs, "B", cC) + cA := putDagCBOR(t, bs, "A", cB) + + visited := collectEntityWalk(t, bs, cA) + assert.Len(t, visited, 3, "all dag-cbor nodes emitted") + assert.Contains(t, visited, cA) + assert.Contains(t, visited, cB) + assert.Contains(t, visited, cC) +} + +func TestEntityWalk_DagCBORLinkingToUnixFS(t *testing.T) { + // dag-cbor root linking to a chunked UnixFS file. The dag-cbor + // root and the file root are emitted, but the file's chunks are + // NOT (entities optimization applies to the UnixFS file). 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + chunk := merkledag.NewRawNode([]byte("chunk")) + require.NoError(t, dserv.Add(t.Context(), chunk)) + fsn := ft.NewFSNode(ft.TFile) + fsn.AddBlockSize(5) + fileData, err := fsn.GetBytes() + require.NoError(t, err) + fileNode := merkledag.NodeWithData(fileData) + fileNode.AddNodeLink("", chunk) + require.NoError(t, dserv.Add(t.Context(), fileNode)) + + cborRoot := putDagCBOR(t, bs, "metadata", fileNode.Cid()) + + visited := collectEntityWalk(t, bs, cborRoot) + assert.Contains(t, visited, cborRoot, "dag-cbor root emitted") + assert.Contains(t, visited, fileNode.Cid(), "UnixFS file root emitted") + assert.NotContains(t, visited, chunk.Cid(), + "file chunks NOT emitted (entities optimization)") + assert.Len(t, visited, 2) +} + +// --- mixed codec DAG --- + +func TestEntityWalk_MixedCodecs(t *testing.T) { + // dag-cbor root -> UnixFS directory -> {file, raw leaf} + // All entity roots emitted, file chunks skipped. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + raw := merkledag.NewRawNode([]byte("raw leaf")) + require.NoError(t, dserv.Add(t.Context(), raw)) + + file := fileNodeWithData(t, []byte("file content")) + require.NoError(t, dserv.Add(t.Context(), file)) + + dir := ft.EmptyDirNode() + dir.AddNodeLink("raw.bin", raw) + dir.AddNodeLink("file.txt", file) + require.NoError(t, dserv.Add(t.Context(), dir)) + + cborRoot := putDagCBOR(t, bs, "wrapper", dir.Cid()) + + visited := collectEntityWalk(t, bs, cborRoot) + assert.Len(t, visited, 4, "cbor root + dir + file + raw") + assert.Contains(t, visited, cborRoot) + assert.Contains(t, visited, dir.Cid()) + assert.Contains(t, visited, file.Cid()) + assert.Contains(t, visited, raw.Cid()) +} + +// --- dedup across walks --- + +func TestEntityWalk_SharedTrackerDedup(t *testing.T) { + // Two directories sharing a file. With a shared VisitedTracker, + // the shared file is emitted only once across both walks. 
This is + // the cross-pin dedup mechanism for the reprovide cycle. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + shared := fileNodeWithData(t, []byte("shared")) + unique1 := fileNodeWithData(t, []byte("unique1")) + unique2 := fileNodeWithData(t, []byte("unique2")) + require.NoError(t, dserv.Add(t.Context(), shared)) + require.NoError(t, dserv.Add(t.Context(), unique1)) + require.NoError(t, dserv.Add(t.Context(), unique2)) + + dir1 := ft.EmptyDirNode() + dir1.AddNodeLink("shared", shared) + dir1.AddNodeLink("unique", unique1) + require.NoError(t, dserv.Add(t.Context(), dir1)) + + dir2 := ft.EmptyDirNode() + dir2.AddNodeLink("shared", shared) + dir2.AddNodeLink("unique", unique2) + require.NoError(t, dserv.Add(t.Context(), dir2)) + + tracker := walker.NewMapTracker() + fetch := walker.NodeFetcherFromBlockstore(bs) + var all []cid.Cid + + walker.WalkEntityRoots(t.Context(), dir1.Cid(), fetch, func(c cid.Cid) bool { + all = append(all, c) + return true + }, walker.WithVisitedTracker(tracker)) + + walker.WalkEntityRoots(t.Context(), dir2.Cid(), fetch, func(c cid.Cid) bool { + all = append(all, c) + return true + }, walker.WithVisitedTracker(tracker)) + + // dir1 + shared + unique1 + dir2 + unique2 = 5 + assert.Len(t, all, 5) + sharedCount := 0 + for _, c := range all { + if c == shared.Cid() { + sharedCount++ + } + } + assert.Equal(t, 1, sharedCount, "shared file emitted only once") +} + +// --- stop conditions --- + +func TestEntityWalk_EmitFalseStops(t *testing.T) { + // Returning false from emit must stop the walk immediately. + // Important for callers that want to limit results. 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + dir := ft.EmptyDirNode() + for i := range 5 { + f := fileNodeWithData(t, []byte(fmt.Sprintf("f%d", i))) + require.NoError(t, dserv.Add(t.Context(), f)) + dir.AddNodeLink(fmt.Sprintf("f%d", i), f) + } + require.NoError(t, dserv.Add(t.Context(), dir)) + + count := 0 + fetch := walker.NodeFetcherFromBlockstore(bs) + walker.WalkEntityRoots(t.Context(), dir.Cid(), fetch, func(c cid.Cid) bool { + count++ + return count < 3 + }) + assert.Equal(t, 3, count) +} + +func TestEntityWalk_ContextCancellation(t *testing.T) { + // Walk must respect context cancellation and return the error. + bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + file := fileNodeWithData(t, []byte("file")) + require.NoError(t, dserv.Add(t.Context(), file)) + dir := ft.EmptyDirNode() + dir.AddNodeLink("file", file) + require.NoError(t, dserv.Add(t.Context(), dir)) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() // cancel before walk starts + + fetch := walker.NodeFetcherFromBlockstore(bs) + err := walker.WalkEntityRoots(ctx, dir.Cid(), fetch, func(c cid.Cid) bool { + return true + }) + assert.ErrorIs(t, err, context.Canceled) +} + +// --- error handling --- + +func TestEntityWalk_FetchErrorSkips(t *testing.T) { + // Missing child blocks are skipped gracefully (best-effort). + // The walk continues with other branches. This prevents a single + // corrupt block from breaking the entire provide cycle. 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + exists := fileNodeWithData(t, []byte("exists")) + require.NoError(t, dserv.Add(t.Context(), exists)) + + missing := fileNodeWithData(t, []byte("missing")) + // intentionally NOT added to blockstore + + dir := ft.EmptyDirNode() + dir.AddNodeLink("exists", exists) + dir.AddNodeLink("missing", missing) + require.NoError(t, dserv.Add(t.Context(), dir)) + + visited := collectEntityWalk(t, bs, dir.Cid()) + assert.Contains(t, visited, dir.Cid()) + assert.Contains(t, visited, exists.Cid()) + assert.NotContains(t, visited, missing.Cid(), + "missing block should be skipped, not crash") +} diff --git a/dag/walker/identity_test.go b/dag/walker/identity_test.go new file mode 100644 index 000000000..7924d152d --- /dev/null +++ b/dag/walker/identity_test.go @@ -0,0 +1,156 @@ +package walker_test + +// Tests verifying that identity CIDs (multihash 0x00) are handled +// correctly across all walker and provider paths. Identity CIDs embed +// data inline, so providing them to the DHT is wasteful. The walker +// traverses through them (following links) but never emits them. +// +// This covers: +// - WalkDAG: identity root, identity child, identity dag-pb dir +// - WalkEntityRoots: identity file, identity dir, mixed normal+identity +// - IsIdentityCID: predicate correctness + +import ( + "testing" + + "github.com/ipfs/boxo/ipld/merkledag" + mdtest "github.com/ipfs/boxo/ipld/merkledag/test" + ft "github.com/ipfs/boxo/ipld/unixfs" + cid "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// makeIdentityCID creates a CIDv1 with an identity multihash (data +// inline in the CID). codec determines the CID codec prefix. 
+func makeIdentityCID(t *testing.T, data []byte, codec uint64) cid.Cid { + t.Helper() + hash, err := mh.Encode(data, mh.IDENTITY) + require.NoError(t, err) + return cid.NewCidV1(codec, hash) +} + +// --- WalkDAG identity tests --- + +func TestWalkDAG_IdentityCID(t *testing.T) { + t.Run("identity raw CID as root is not emitted", func(t *testing.T) { + bs := newTestBlockstore() + idCid := makeIdentityCID(t, []byte("inline"), cid.Raw) + + visited := collectWalk(t, bs, idCid) + assert.Empty(t, visited, + "identity CID root must not be emitted") + }) + + t.Run("dag-pb linking to identity child skips identity", func(t *testing.T) { + bs := newTestBlockstore() + dserv := merkledag.NewDAGService(mdtest.Bserv()) + + idChild := makeIdentityCID(t, []byte("inline-child"), cid.Raw) + + root := merkledag.NodeWithData([]byte("root")) + require.NoError(t, root.AddRawLink("inline", &format.Link{Cid: idChild})) + require.NoError(t, dserv.Add(t.Context(), root)) + require.NoError(t, bs.Put(t.Context(), root)) + + visited := collectWalk(t, bs, root.Cid()) + assert.Len(t, visited, 1, "only the non-identity root") + assert.Equal(t, root.Cid(), visited[0]) + assert.NotContains(t, visited, idChild, + "identity child must not be emitted") + }) + + t.Run("identity dag-pb directory with normal raw child", func(t *testing.T) { + // simulates `ipfs add --inline` producing a small dag-pb + // directory with identity multihash, linking to a normal + // raw block. The identity directory must not be emitted, + // but its normal child must be. 
+ bs := newTestBlockstore() + + normalChild := putRawBlock(t, bs, []byte("normal-data")) + + // build a dag-pb directory node + dir := ft.EmptyDirNode() + require.NoError(t, dir.AddRawLink("child.bin", &format.Link{Cid: normalChild})) + + // re-encode the directory with an identity multihash to + // simulate what ipfs add --inline produces for small dirs + dirData := dir.RawData() + idHash, err := mh.Encode(dirData, mh.IDENTITY) + require.NoError(t, err) + idDirCid := cid.NewCidV1(cid.DagProtobuf, idHash) + // NOT stored in blockstore -- NewIdStore decodes from CID + + visited := collectWalk(t, bs, idDirCid) + assert.Len(t, visited, 1, "only the normal raw child") + assert.Equal(t, normalChild, visited[0], + "normal child reachable through identity dir must be emitted") + assert.NotContains(t, visited, idDirCid, + "identity directory must not be emitted") + }) +} + +// --- WalkEntityRoots identity tests --- + +func TestEntityWalk_IdentityFileNotEmitted(t *testing.T) { + bs := newTestBlockstore() + idFile := makeIdentityCID(t, []byte("tiny"), cid.Raw) + + visited := collectEntityWalk(t, bs, idFile) + assert.Empty(t, visited, "identity file must not be emitted") +} + +func TestEntityWalk_IdentityDirWithNormalChildren(t *testing.T) { + // An identity dag-pb directory (like `ipfs add --inline` produces + // for small dirs) linking to normal files. The identity directory + // must not be emitted, but its normal children must be. 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + file1 := fileNodeWithData(t, []byte("file1")) + file2 := fileNodeWithData(t, []byte("file2")) + require.NoError(t, dserv.Add(t.Context(), file1)) + require.NoError(t, dserv.Add(t.Context(), file2)) + + dir := ft.EmptyDirNode() + dir.AddNodeLink("f1.txt", file1) + dir.AddNodeLink("f2.txt", file2) + + // re-encode with identity multihash + dirData := dir.RawData() + idHash, err := mh.Encode(dirData, mh.IDENTITY) + require.NoError(t, err) + idDirCid := cid.NewCidV1(cid.DagProtobuf, idHash) + + visited := collectEntityWalk(t, bs, idDirCid) + assert.Len(t, visited, 2, "both normal files emitted") + assert.Contains(t, visited, file1.Cid()) + assert.Contains(t, visited, file2.Cid()) + assert.NotContains(t, visited, idDirCid, + "identity directory must not be emitted") +} + +func TestEntityWalk_NormalDirWithIdentityChild(t *testing.T) { + // A normal directory containing an identity CID child. The + // directory is emitted, the identity child is not. 
+ bs := newTestBlockstore() + dserv := newTestDAGService(bs) + + idChild := makeIdentityCID(t, []byte("inline-file"), cid.Raw) + + normalFile := fileNodeWithData(t, []byte("normal")) + require.NoError(t, dserv.Add(t.Context(), normalFile)) + + dir := ft.EmptyDirNode() + require.NoError(t, dir.AddRawLink("inline.bin", &format.Link{Cid: idChild})) + dir.AddNodeLink("normal.txt", normalFile) + require.NoError(t, dserv.Add(t.Context(), dir)) + + visited := collectEntityWalk(t, bs, dir.Cid()) + assert.Contains(t, visited, dir.Cid(), "normal directory emitted") + assert.Contains(t, visited, normalFile.Cid(), "normal file emitted") + assert.NotContains(t, visited, idChild, + "identity child must not be emitted") +} diff --git a/dag/walker/visited.go b/dag/walker/visited.go new file mode 100644 index 000000000..5922b918e --- /dev/null +++ b/dag/walker/visited.go @@ -0,0 +1,353 @@ +package walker + +import ( + "crypto/rand" + "encoding/binary" + "errors" + "fmt" + "math" + + bbloom "github.com/ipfs/bbloom" + cid "github.com/ipfs/go-cid" +) + +// Default bloom filter parameters. +// +// See [NewBloomTracker] for creating a tracker with a specific FP rate. +const ( + // DefaultBloomFPRate is the target false positive rate, expressed as + // 1/N (one false positive per N lookups). At the default value of + // ~1 in 4.75 million (~0.00002%), each CID costs ~4 bytes (32 bits) + // before ipfs/bbloom's power-of-two rounding. + // + // This is low enough for most IPFS deployments. IPFS content + // typically has multiple providers, so a single node's false + // positive has no impact on content availability. Any CID skipped + // by a false positive is caught in the next reprovide cycle (22h). + // + // Actual memory depends on how [BloomTracker] chains blooms; see + // the scaling table in its documentation. As a rough guide, a + // single bloom sized for N items uses N*32 bits rounded up to the + // next power of two (e.g. 2M CIDs -> ~8 MB, 10M CIDs -> ~64 MB). 
+ // + // Lowering this value (e.g. 1_000_000) uses less memory per CID + // but skips more CIDs. Raising it (e.g. 10_000_000) uses more + // memory but skips fewer. + DefaultBloomFPRate = 4_750_000 + + // DefaultBloomInitialCapacity is the number of expected items for + // the first bloom filter when no persisted count from a previous + // cycle exists. 2M items produces an ~8 MB bloom at the default FP + // rate, covering repos up to ~2M CIDs without chain growth. + DefaultBloomInitialCapacity = 2_000_000 + + // BloomGrowthMargin is multiplied with the persisted CID count from + // the previous reprovide cycle to size the initial bloom. The 1.5x + // margin provides headroom for repo growth between cycles so that + // a stable repo does not trigger chain growth on every cycle. + BloomGrowthMargin = 1.5 + + // BloomGrowthFactor determines how much larger each new bloom in the + // chain is compared to the previous one. 4x keeps the chain short + // (fewer blooms = less Has() overhead) while converging quickly to + // the actual repo size. + BloomGrowthFactor = 4 + + // MinBloomCapacity is the smallest expectedItems value accepted by + // [NewBloomTracker]. ipfs/bbloom derives k probe positions from a + // single SipHash via double hashing (h + i*l mod size). For small + // bitsets the stride patterns overlap, pushing the actual FP rate + // far above the designed target. Empirically, at capacity=1000 + // (32K-bit bitset) with k=22 the FP rate is ~50x worse than + // theory; at 10000 (512K bits) it matches. 10000 uses ~64 KB of + // memory while ensuring the actual FP rate matches the design. + MinBloomCapacity = 10_000 +) + +// BloomParams derives ipfs/bbloom parameters (bits per element, hash +// location count) from a target false positive rate expressed as 1/N. +// +// The number of hash functions k is derived as round(log2(N)), and +// bits per element as k / ln(2). 
Because k is rounded to an integer,
+// not every FP rate is exactly achievable -- the realized rate can be
+// slightly above or below the 1/N target (rounding k down overshoots).
+// In practice ipfs/bbloom also rounds the bitset up to the next power
+// of two, which pulls the actual rate back toward or below the target.
+func BloomParams(fpRate uint) (bitsPerElem uint, hashLocs uint) {
+	k := math.Round(math.Log2(float64(fpRate)))
+	if k < 1 {
+		k = 1
+	}
+	bpe := k / math.Ln2
+	return uint(math.Ceil(bpe)), uint(k)
+}
+
+// VisitedTracker tracks which CIDs have been seen during DAG traversal.
+// Implementations use c.Hash() (multihash bytes) as the key, so CIDv0
+// and CIDv1 of the same content are treated as the same entry.
+//
+// Implementations may be exact ([MapTracker]) or probabilistic
+// ([BloomTracker]). Probabilistic implementations must keep the false
+// positive rate negligible for the expected dataset size, or allow
+// callers to adjust it (see [NewBloomTracker]).
+//
+// NOT safe for concurrent use. The provide pipeline runs on a single
+// goroutine per reprovide cycle. Adding parallelism requires switching
+// to thread-safe variants (bbloom AddTS/HasTS) or external
+// synchronization.
+type VisitedTracker interface {
+	// Visit marks a CID as visited. Returns true if it was NOT
+	// previously visited (first visit).
+	Visit(c cid.Cid) bool
+	// Has returns true if the CID was previously visited.
+	Has(c cid.Cid) bool
+}
+
+var (
+	_ VisitedTracker = (*cid.Set)(nil)
+	_ VisitedTracker = (*BloomTracker)(nil)
+	_ VisitedTracker = (*MapTracker)(nil)
+)
+
+// MapTracker tracks visited CIDs using an in-memory map. Zero false
+// positives. Useful for tests and small datasets.
+//
+// NOT safe for concurrent use.
+type MapTracker struct {
+	set          map[string]struct{}
+	deduplicated uint64
+}
+
+// NewMapTracker creates a new map-based visited tracker.
+func NewMapTracker() *MapTracker { + return &MapTracker{set: make(map[string]struct{})} +} + +func (m *MapTracker) Visit(c cid.Cid) bool { + key := string(c.Hash()) + if _, ok := m.set[key]; ok { + m.deduplicated++ + return false + } + m.set[key] = struct{}{} + return true +} + +func (m *MapTracker) Has(c cid.Cid) bool { + _, ok := m.set[string(c.Hash())] + return ok +} + +// Deduplicated returns the number of Visit calls that returned false +// (CID already seen). Useful for logging how much dedup occurred. +func (m *MapTracker) Deduplicated() uint64 { return m.deduplicated } + +// BloomTracker tracks visited CIDs using a chain of bloom filters that +// grows automatically when the current filter becomes saturated. +// +// # Why it exists +// +// When the reprovide system walks pinned DAGs, many pins share the same +// sub-DAGs (e.g. append-only datasets where each version differs by a +// small delta). Without deduplication, the walker re-traverses every +// shared subtree for each pin -- O(pins * total_blocks) I/O. The bloom +// filter lets the walker skip already-visited subtrees in O(1), +// reducing work to O(unique_blocks). +// +// A single fixed-size bloom filter requires knowing the number of CIDs +// upfront. On the very first cycle (or after significant repo growth) +// this count is unknown. BloomTracker solves this by starting with a +// small bloom and automatically appending larger ones when the insert +// count reaches the current filter's designed capacity. +// +// # How it works +// +// BloomTracker maintains an ordered chain of bloom filters [b0, b1, ...]. +// Each filter's parameters (bits per element, hash count) are derived +// from the target false positive rate via [BloomParams]. +// +// - Has(c) checks ALL filters in the chain. If any filter reports the +// CID as present, it returns true. 
False positives are independent +// across filters because each uses unique random SipHash keys via +// [bbloom.NewWithKeys] (generated from crypto/rand). This also means +// different processes in a cluster hit different false positives, so +// a CID skipped by one node is still provided by others. +// - Visit(c) checks all filters first (like Has). If the CID is not +// found, it adds it to the latest filter and increments the insert +// counter. When inserts exceed the current filter's capacity, a new +// filter at BloomGrowthFactor times the capacity is appended. +// Saturation is detected via a simple integer comparison on every +// insert (O(1)). +// +// # Scaling behavior (at default FP rate) +// +// With DefaultBloomInitialCapacity = 2M and BloomGrowthFactor = 4x. +// Memory includes ipfs/bbloom's power-of-two rounding of each bitset. +// +// 2M CIDs: 1 bloom (~8 MB) +// 10M CIDs: 2 blooms (~42 MB) +// 40M CIDs: 3 blooms (~176 MB) +// 100M CIDs: 4 blooms (~713 MB) +// +// On subsequent cycles, the persisted count from the previous cycle +// sizes the initial bloom correctly (with BloomGrowthMargin headroom), +// so the chain typically stays at 1 bloom. +// +// # Concurrency +// +// NOT safe for concurrent use. See [VisitedTracker] for the +// single-goroutine invariant. +type BloomTracker struct { + chain []*bbloom.Bloom // oldest to newest + lastCap uint64 // designed capacity of the latest bloom + curInserts uint64 // inserts into current (latest) bloom + totalInserts uint64 // inserts across all blooms in chain + deduplicated uint64 // Visit calls that returned false + bitsPerElem uint // bits per element (derived from FP rate) + hashLocs uint // hash function count (derived from FP rate) +} + +// NewBloomTracker creates a bloom filter tracker sized for expectedItems +// at the given false positive rate (expressed as 1/N via fpRate). +// +// The bloom parameters (bits per element, hash count) are derived from +// fpRate via [BloomParams]. 
Because the hash count must be a positive +// integer, the actual FP rate may be slightly better than the target. +// ipfs/bbloom also rounds the bitset to the next power of two, further +// improving the actual rate. +// +// Returns an error if expectedItems is below [MinBloomCapacity], or +// fpRate is zero. +// +// When inserts exceed the current filter's capacity, a new filter at +// BloomGrowthFactor times the capacity is appended automatically. +// +// NOT safe for concurrent use. See [VisitedTracker] for the +// single-goroutine invariant. +func NewBloomTracker(expectedItems uint, fpRate uint) (*BloomTracker, error) { + if expectedItems < MinBloomCapacity { + return nil, fmt.Errorf("bloom tracker: expectedItems must be >= %d (got %d); "+ + "small blooms cause FP rates far above the design target "+ + "because ipfs/bbloom's double-hashing needs a large bitset", + MinBloomCapacity, expectedItems) + } + if fpRate == 0 { + return nil, errors.New("bloom tracker: fpRate must be > 0") + } + bpe, hlocs := BloomParams(fpRate) + b, err := newBloom(uint64(expectedItems), bpe, hlocs) + if err != nil { + return nil, fmt.Errorf("bloom tracker: %w", err) + } + log.Infow("bloom tracker created", + "capacity", expectedItems, + "fpRate", fmt.Sprintf("1 in %d (~%.6f%%)", fpRate, 100.0/float64(fpRate)), + "bitsPerElem", bpe, + "hashFunctions", hlocs) + return &BloomTracker{ + chain: []*bbloom.Bloom{b}, + lastCap: uint64(expectedItems), + bitsPerElem: bpe, + hashLocs: hlocs, + }, nil +} + +func (bt *BloomTracker) Has(c cid.Cid) bool { + key := []byte(c.Hash()) + for _, b := range bt.chain { + if b.Has(key) { + return true + } + } + return false +} + +func (bt *BloomTracker) Visit(c cid.Cid) bool { + key := []byte(c.Hash()) + + // Check earlier blooms for the CID. If any reports it as + // present (true positive from a prior growth epoch, or rare + // cross-bloom false positive), skip it. 
+	earlier := bt.chain[:len(bt.chain)-1]
+	for _, b := range earlier {
+		if b.Has(key) {
+			bt.deduplicated++
+			return false
+		}
+	}
+
+	// Use AddIfNotHas on the current bloom: it hashes the key once
+	// and checks-and-sets the bits in a single pass, halving the
+	// hashing cost of a separate Has() followed by Add(). Note the
+	// false-positive rate is unchanged either way: a genuinely new
+	// CID whose bits happen to already be set is still skipped, and
+	// counted in deduplicated.
+	cur := bt.chain[len(bt.chain)-1]
+	if !cur.AddIfNotHas(key) {
+		bt.deduplicated++
+		return false
+	}
+	bt.curInserts++
+	bt.totalInserts++
+	if bt.curInserts > bt.lastCap {
+		bt.grow()
+	}
+	return true
+}
+
+// Count returns the total number of unique CIDs added across all blooms.
+// Used to persist the cycle count for sizing the next cycle's bloom.
+func (bt *BloomTracker) Count() uint64 { return bt.totalInserts }
+
+// Deduplicated returns the number of Visit calls that returned false
+// (CID already seen or bloom false positive). Useful for logging how
+// much dedup occurred in a reprovide cycle.
+func (bt *BloomTracker) Deduplicated() uint64 { return bt.deduplicated }
+
+// grow appends a new bloom filter to the chain at BloomGrowthFactor
+// times the previous capacity with fresh random SipHash keys.
+//
+// The grown capacity is always >= BloomGrowthFactor * MinBloomCapacity
+// because NewBloomTracker enforces expectedItems >= MinBloomCapacity,
+// so the double-hashing FP rate issue with small bitsets cannot occur.
+func (bt *BloomTracker) grow() {
+	newCap := bt.lastCap * BloomGrowthFactor
+	b, err := newBloom(newCap, bt.bitsPerElem, bt.hashLocs)
+	if err != nil {
+		// bitsPerElem and hashLocs are validated at construction time,
+		// so this is unreachable unless something is deeply wrong.
+ panic(fmt.Sprintf("bloom grow: %v", err)) + } + log.Infow("bloom tracker autoscaled", + "prevCapacity", bt.lastCap, + "newCapacity", newCap, + "totalInserts", bt.totalInserts, + "chainLength", len(bt.chain)+1) + bt.chain = append(bt.chain, b) + bt.lastCap = newCap + bt.curInserts = 0 +} + +// newBloom creates a single bbloom filter with random SipHash keys. +func newBloom(capacity uint64, bitsPerElem, hashLocs uint) (*bbloom.Bloom, error) { + k0, k1 := randomSipHashKeys() + return bbloom.NewWithKeys(k0, k1, + float64(capacity*uint64(bitsPerElem)), + float64(hashLocs)) +} + +// randomSipHashKeys generates fresh random SipHash keys via crypto/rand. +// Every bloom instance (across chain growth AND across process restarts) +// gets unique keys. This ensures: +// - false positives are uncorrelated across blooms in the chain +// - in a cluster running multiple kubo instances, each node's bloom +// hits different false positives, so a CID skipped by one node is +// still provided by others +func randomSipHashKeys() (uint64, uint64) { + var buf [16]byte + if _, err := rand.Read(buf[:]); err != nil { + panic(fmt.Sprintf("bloom: crypto/rand failed: %v", err)) + } + return binary.LittleEndian.Uint64(buf[:8]), + binary.LittleEndian.Uint64(buf[8:]) +} diff --git a/dag/walker/visited_test.go b/dag/walker/visited_test.go new file mode 100644 index 000000000..f93c88ab6 --- /dev/null +++ b/dag/walker/visited_test.go @@ -0,0 +1,285 @@ +package walker + +import ( + "crypto/sha256" + "encoding/binary" + "testing" + + cid "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// makeCID creates a deterministic CIDv1 raw from an integer seed. 
+func makeCID(i int) cid.Cid { + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], uint32(i)) + h := sha256.Sum256(buf[:]) + hash, _ := mh.Encode(h[:], mh.SHA2_256) + return cid.NewCidV1(cid.Raw, hash) +} + +func makeCIDs(n int) []cid.Cid { + cids := make([]cid.Cid, n) + for i := range n { + cids[i] = makeCID(i) + } + return cids +} + +func TestBloomParams(t *testing.T) { + t.Run("default FP rate", func(t *testing.T) { + bpe, k := BloomParams(DefaultBloomFPRate) + // 1 in 4.75M -> k=round(log2(4750000))=22, bpe=ceil(22/ln2)=32 + assert.Equal(t, uint(32), bpe) + assert.Equal(t, uint(22), k) + }) + + t.Run("lower FP rate uses less memory", func(t *testing.T) { + bpeLow, _ := BloomParams(1_000_000) // 1 in 1M + bpeHigh, _ := BloomParams(DefaultBloomFPRate) + assert.Less(t, bpeLow, bpeHigh) + }) + + t.Run("higher FP rate uses more memory", func(t *testing.T) { + bpeDefault, _ := BloomParams(DefaultBloomFPRate) + bpeHigher, _ := BloomParams(10_000_000) + assert.Greater(t, bpeHigher, bpeDefault) + }) + + t.Run("fpRate=1 gives minimum params", func(t *testing.T) { + bpe, k := BloomParams(1) + assert.Equal(t, uint(1), k) + assert.GreaterOrEqual(t, bpe, uint(1)) + }) +} + +func TestMapTracker(t *testing.T) { + t.Run("visit and has", func(t *testing.T) { + m := NewMapTracker() + c := makeCID(0) + + assert.False(t, m.Has(c)) + assert.True(t, m.Visit(c)) // first visit + assert.True(t, m.Has(c)) + assert.False(t, m.Visit(c)) // second visit + }) + + t.Run("distinct CIDs are independent", func(t *testing.T) { + m := NewMapTracker() + c1 := makeCID(0) + c2 := makeCID(1) + + m.Visit(c1) + assert.True(t, m.Has(c1)) + assert.False(t, m.Has(c2)) + }) +} + +func TestBloomTracker(t *testing.T) { + t.Run("visit and has", func(t *testing.T) { + bt, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + + c := makeCID(0) + + assert.False(t, bt.Has(c)) + assert.True(t, bt.Visit(c)) // first visit + assert.True(t, bt.Has(c)) + assert.False(t, 
bt.Visit(c)) // second visit + }) + + t.Run("count tracks unique inserts", func(t *testing.T) { + bt, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + + cids := makeCIDs(100) + for _, c := range cids { + bt.Visit(c) + } + assert.Equal(t, uint64(100), bt.Count()) + + // revisiting doesn't increase count + for _, c := range cids { + bt.Visit(c) + } + assert.Equal(t, uint64(100), bt.Count()) + }) + + t.Run("chain growth on saturation", func(t *testing.T) { + bt, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + assert.Len(t, bt.chain, 1) + + cids := makeCIDs(5 * MinBloomCapacity) + for _, c := range cids { + bt.Visit(c) + } + assert.Greater(t, len(bt.chain), 1, "chain should grow") + + // all inserted CIDs should still be found across the chain + for _, c := range cids { + assert.True(t, bt.Has(c), "CID should be found after chain growth") + } + }) + + t.Run("count survives chain growth", func(t *testing.T) { + // Insert just past capacity to trigger exactly one grow(). + // Minimal items in bloom2 keeps FP exposure negligible. 
+ const total = MinBloomCapacity + 2 + bt, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + + cids := makeCIDs(total) + for _, c := range cids { + bt.Visit(c) + } + assert.Greater(t, len(bt.chain), 1, "chain should have grown") + assert.Equal(t, uint64(total), bt.Count()) + }) + + t.Run("below MinBloomCapacity returns error", func(t *testing.T) { + _, err := NewBloomTracker(MinBloomCapacity-1, DefaultBloomFPRate) + require.Error(t, err) + assert.Contains(t, err.Error(), "expectedItems must be") + }) + + t.Run("zero fpRate returns error", func(t *testing.T) { + _, err := NewBloomTracker(MinBloomCapacity, 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "fpRate must be > 0") + }) + + t.Run("FP regression at measurable rate", func(t *testing.T) { + // Use fpRate=1000 (1 in 1000) so we get enough FPs in 100K + // probes to be statistically measurable. + // + // Catches regressions in BloomParams derivation, bbloom + // behavior, or parameter coupling bugs. We allow 5x tolerance + // because bbloom's power-of-two rounding makes the actual rate + // better than target. + const ( + fpTarget = 1000 + n = 50_000 + probes = 100_000 + ) + bt, err := NewBloomTracker(uint(n), fpTarget) + require.NoError(t, err) + + for _, c := range makeCIDs(n) { + bt.Visit(c) + } + + fpCount := 0 + for i := n; i < n+probes; i++ { + if bt.Has(makeCID(i)) { + fpCount++ + } + } + observedRate := float64(fpCount) / float64(probes) + expectedRate := 1.0 / float64(fpTarget) + + t.Logf("FP regression: %d / %d = %.4f%% (target: %.4f%%, 1 in %d)", + fpCount, probes, observedRate*100, expectedRate*100, fpTarget) + + assert.Less(t, observedRate, expectedRate*5, + "FP rate %.4f%% is >5x worse than target %.4f%%", observedRate*100, expectedRate*100) + }) + + t.Run("FP regression at default rate", func(t *testing.T) { + // DefaultBloomFPRate is ~1 in 4.75M. With 100K probes the + // expected FP count is 100K/4.75M = ~0.02, so we should see + // exactly 0. 
Any non-zero result indicates a regression. + const n = 100_000 + bt, err := NewBloomTracker(uint(n), DefaultBloomFPRate) + require.NoError(t, err) + + for _, c := range makeCIDs(n) { + bt.Visit(c) + } + + fpCount := 0 + for i := n; i < 2*n; i++ { + if bt.Has(makeCID(i)) { + fpCount++ + } + } + t.Logf("default rate FPs: %d / %d (expected: 0 at ~1 in %d)", + fpCount, n, DefaultBloomFPRate) + assert.Equal(t, 0, fpCount, + "at ~1 in 4.75M FP rate, 100K probes should produce 0 FPs") + }) +} + +func TestBloomAndMapEquivalence(t *testing.T) { + bt, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + mt := NewMapTracker() + + cids := makeCIDs(500) + + for _, c := range cids { + bv := bt.Visit(c) + mv := mt.Visit(c) + assert.Equal(t, mv, bv, "Visit mismatch for %s", c) + } + + for _, c := range cids { + bh := bt.Has(c) + mh := mt.Has(c) + assert.Equal(t, mh, bh, "Has mismatch for %s", c) + } +} + +func TestCidSetSatisfiesInterface(t *testing.T) { + var tracker VisitedTracker = cid.NewSet() + + c := makeCID(42) + assert.False(t, tracker.Has(c)) + assert.True(t, tracker.Visit(c)) + assert.True(t, tracker.Has(c)) + assert.False(t, tracker.Visit(c)) +} + +func TestBloomTrackerUniqueKeys(t *testing.T) { + bt1, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + bt2, err := NewBloomTracker(MinBloomCapacity, DefaultBloomFPRate) + require.NoError(t, err) + + cids := makeCIDs(500) + for _, c := range cids { + bt1.Visit(c) + bt2.Visit(c) + } + + for _, c := range cids { + assert.True(t, bt1.Has(c)) + assert.True(t, bt2.Has(c)) + } + + // Check 10K non-inserted CIDs: false positives should differ + // between trackers since each uses independent random SipHash keys. + // With shared keys, fpBoth == fp1 == fp2 (correlated). + // With independent keys, P(both FP on same CID) is negligible. 
+ var fp1, fp2, fpBoth int + for i := 500; i < 10500; i++ { + c := makeCID(i) + h1 := bt1.Has(c) + h2 := bt2.Has(c) + if h1 { + fp1++ + } + if h2 { + fp2++ + } + if h1 && h2 { + fpBoth++ + } + } + t.Logf("independent key FPs: bt1=%d, bt2=%d, both=%d (out of 10000 probes)", fp1, fp2, fpBoth) + if fp1 > 0 && fp2 > 0 { + assert.Less(t, fpBoth, fp1, "FPs should be uncorrelated between instances") + } +} diff --git a/dag/walker/walker.go b/dag/walker/walker.go new file mode 100644 index 000000000..c0821d34c --- /dev/null +++ b/dag/walker/walker.go @@ -0,0 +1,233 @@ +package walker + +import ( + "bytes" + "context" + "fmt" + "io" + "slices" + + blockstore "github.com/ipfs/boxo/blockstore" + cid "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + ipld "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + mh "github.com/multiformats/go-multihash" +) + +var log = logging.Logger("dagwalker") + +// LinksFetcher returns child link CIDs for a given CID. +// Used by [WalkDAG] which doesn't need entity type information. +type LinksFetcher func(ctx context.Context, c cid.Cid) ([]cid.Cid, error) + +// Option configures [WalkDAG] and [WalkEntityRoots]. +type Option func(*walkConfig) + +type walkConfig struct { + tracker VisitedTracker + locality func(context.Context, cid.Cid) (bool, error) +} + +// WithVisitedTracker sets the tracker used for cross-walk +// deduplication. When set, CIDs already visited (by this walk or a +// previous walk sharing the same tracker) are skipped along with +// their entire subtree. +func WithVisitedTracker(t VisitedTracker) Option { + return func(c *walkConfig) { c.tracker = t } +} + +// WithLocality sets a check function that determines whether a CID is +// locally available. When set, the walker only emits and descends into +// CIDs for which check returns true. 
Used by MFS providers to skip
+// blocks not in the local blockstore (pass blockstore.Has directly).
+//
+// The locality check runs after the [VisitedTracker] check (which is
+// a cheap in-memory operation), so already-visited CIDs never pay the
+// locality I/O cost.
+func WithLocality(check func(context.Context, cid.Cid) (bool, error)) Option {
+	return func(c *walkConfig) { c.locality = check }
+}
+
+// WalkDAG performs an iterative depth-first walk of the DAG rooted at
+// root, calling emit for each visited non-identity CID. Returns when
+// the DAG is fully walked, emit returns false, or ctx is cancelled.
+//
+// The walk uses an explicit stack (not recursion) to avoid stack
+// overflow on deep DAGs. For each CID:
+//
+// 1. [VisitedTracker].Visit -- if already visited, skip entire subtree.
+//    The CID is marked visited immediately (before fetch). If fetch
+//    later fails, the CID stays in the tracker and won't be retried
+//    this cycle -- caught in the next cycle (22h). This avoids a
+//    double bloom scan per CID.
+// 2. If [WithLocality] is set, check locality -- if not local, skip.
+// 3. Fetch block via fetch -- on error, log and skip.
+// 4. Push child link CIDs to stack (deduped when popped at step 1).
+// 5. Call emit(c), skipping identity CIDs (data is inline in the CID,
+//    but their children pushed at step 4 are still walked) -- emit
+//    returning false stops the walk.
+//
+// # Traversal order
+//
+// Pre-order DFS with left-to-right sibling visiting: the root CID is
+// always emitted first, and children are visited in the order they
+// appear in the block's link list. This matches the legacy
+// fetcherhelpers.BlockAll selector traversal and the conventional DFS
+// order described in IPIP-0412.
+func WalkDAG( + ctx context.Context, + root cid.Cid, + fetch LinksFetcher, + emit func(cid.Cid) bool, + opts ...Option, +) error { + cfg := &walkConfig{} + for _, o := range opts { + o(cfg) + } + + stack := []cid.Cid{root} + + for len(stack) > 0 { + if ctx.Err() != nil { + return ctx.Err() + } + + // pop + c := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + // step 1: visit (mark + dedup in one call). If the CID was + // already visited (by this walk or a prior walk sharing the + // tracker), skip it and its entire subtree. + if cfg.tracker != nil && !cfg.tracker.Visit(c) { + continue + } + + // step 2: locality check + if cfg.locality != nil { + local, err := cfg.locality(ctx, c) + if err != nil { + log.Errorf("walk: locality check %s: %s", c, err) + continue + } + if !local { + continue + } + } + + // step 3: fetch + children, err := fetch(ctx, c) + if err != nil { + log.Errorf("walk: fetch %s: %s", c, err) + continue + } + + // step 4: push children in reverse order so the first link + // is on top of the stack and gets popped next. This gives + // left-to-right sibling visit order, matching the legacy + // BlockAll selector traversal and the conventional DFS order + // from IPIP-0412 (depth-first, pre-order, left-to-right). + slices.Reverse(children) + stack = append(stack, children...) + + // skip identity CIDs: content is inline, no need to provide + if c.Prefix().MhType == mh.IDENTITY { + continue + } + if !emit(c) { + return nil + } + } + + return nil +} + +// linkSystemForBlockstore creates an ipld.LinkSystem backed by a +// blockstore, used by both [LinksFetcherFromBlockstore] and +// [NodeFetcherFromBlockstore]. The blockstore is wrapped with +// [blockstore.NewIdStore] so identity CIDs (multihash code 0x00, +// data inline in the CID) are decoded transparently without +// requiring a datastore lookup. 
+func linkSystemForBlockstore(bs blockstore.Blockstore) ipld.LinkSystem { + idBS := blockstore.NewIdStore(bs) + ls := cidlink.DefaultLinkSystem() + ls.TrustedStorage = true + ls.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + cl, ok := lnk.(cidlink.Link) + if !ok { + return nil, fmt.Errorf("unsupported link type: %T", lnk) + } + blk, err := idBS.Get(lctx.Ctx, cl.Cid) + if err != nil { + return nil, err + } + return bytes.NewReader(blk.RawData()), nil + } + return ls +} + +// LinksFetcherFromBlockstore creates a [LinksFetcher] backed by a +// local blockstore. Blocks are decoded using the codecs registered in +// the global multicodec registry (via ipld-prime's +// [cidlink.DefaultLinkSystem]). Identity CIDs are handled +// transparently via [blockstore.NewIdStore]. +// +// For custom link extraction, pass your own [LinksFetcher] to +// [WalkDAG] directly. +func LinksFetcherFromBlockstore(bs blockstore.Blockstore) LinksFetcher { + ls := linkSystemForBlockstore(bs) + + return func(ctx context.Context, c cid.Cid) ([]cid.Cid, error) { + lnk := cidlink.Link{Cid: c} + nd, err := ls.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any) + if err != nil { + return nil, err + } + return collectLinks(c, nd), nil + } +} + +// collectLinks extracts all link CIDs from an ipld-prime node by +// recursively traversing maps, lists, and scalar link values. Only map +// values are inspected, not keys (no known IPLD codec uses link-typed +// map keys). parent is used for debug logging only. 
+func collectLinks(parent cid.Cid, nd ipld.Node) []cid.Cid { + var links []cid.Cid + collectLinksRecursive(parent, nd, &links) + return links +} + +func collectLinksRecursive(parent cid.Cid, nd ipld.Node, out *[]cid.Cid) { + switch nd.Kind() { + case ipld.Kind_Link: + lnk, err := nd.AsLink() + if err != nil { + log.Debugw("walk: link extraction failed", "cid", parent, "error", err) + return + } + if cl, ok := lnk.(cidlink.Link); ok { + *out = append(*out, cl.Cid) + } + case ipld.Kind_Map: + itr := nd.MapIterator() + for !itr.Done() { + _, v, err := itr.Next() + if err != nil { + log.Debugw("walk: map iteration failed", "cid", parent, "error", err) + break + } + collectLinksRecursive(parent, v, out) + } + case ipld.Kind_List: + itr := nd.ListIterator() + for !itr.Done() { + _, v, err := itr.Next() + if err != nil { + log.Debugw("walk: list iteration failed", "cid", parent, "error", err) + break + } + collectLinksRecursive(parent, v, out) + } + } +} diff --git a/dag/walker/walker_bench_test.go b/dag/walker/walker_bench_test.go new file mode 100644 index 000000000..2e689b835 --- /dev/null +++ b/dag/walker/walker_bench_test.go @@ -0,0 +1,381 @@ +package walker_test + +// Benchmarks comparing boxo/dag/walker against the legacy ipld-prime +// selector-based traversal (fetcherhelpers.BlockAll). +// +// Legacy path (kubo Provide.Strategy=pinned today): +// +// bsfetcher with dagpb.AddSupportToChooser -> BlockAll selector traversal +// (uses OfflineIPLDFetcher -- NO unixfsnode.Reify, that's MFS-only) +// +// New path: +// +// WalkDAG + LinksFetcherFromBlockstore -> iterative DFS with VisitedTracker +// (uses cidlink.DefaultLinkSystem from the blockstore) +// +// # Why the new walker is faster (even without dedup) +// +// The WalkerNoTracker variant has no bloom/map overhead yet is still +// ~2x faster than BlockAll. 
The speedup comes from architectural +// differences, not deduplication: +// +// - No selector overhead: BlockAll constructs and interprets an +// ipld-prime selector (ExploreRecursive + ExploreAll) for every +// traversal. This involves building selector nodes, matching them +// against the data model at each step, and maintaining selector +// state across recursion levels. WalkDAG uses a plain stack-based +// DFS with zero selector machinery. +// - Simpler node decoding: BlockAll goes through bsfetcher which +// wraps blockservice, creates per-session fetchers, and resolves +// nodes via the full ipld-prime linking/loading pipeline with +// prototype choosers. WalkDAG uses cidlink.DefaultLinkSystem +// directly against the blockstore, skipping the blockservice and +// session layers entirely. +// - Fewer allocations: the selector path allocates FetchResult +// structs, selector state, and intermediate node wrappers per +// visited node. WalkDAG allocates only a CID slice per node +// (the child links) and reuses the stack. This shows in the +// benchmarks as ~30-40% fewer allocs/op. +// +// Dedup (bloom/map) adds a small overhead (~8-15%) on single walks +// but pays off across multiple walks of overlapping DAGs, which is +// the primary use case in reprovide cycles. 
+// +// # DAG types benchmarked +// +// - dag-pb: standard UnixFS DAGs (most common pinned content) +// - dag-cbor: IPLD-native DAGs (supported by pinned strategy, not UnixFS) +// - mixed: dag-cbor root linking to dag-pb subtrees (realistic for +// applications that wrap UnixFS content in dag-cbor metadata) +// +// # Variants per DAG type +// +// - BlockAll: legacy ipld-prime selector traversal (baseline) +// - WalkerNoTracker: new walker without dedup (pure walk cost) +// - WalkerMapTracker: new walker with exact map-based dedup +// - WalkerBloomTracker: new walker with bloom filter dedup +// - BloomSecondWalk: bloom Has() skip speed when all CIDs already +// in bloom (simulates cross-pin shared subtree skipping) + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "github.com/ipfs/boxo/blockservice" + blockstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/dag/walker" + "github.com/ipfs/boxo/exchange/offline" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" + "github.com/ipfs/boxo/ipld/merkledag" + mdtest "github.com/ipfs/boxo/ipld/merkledag/test" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + dagpb "github.com/ipld/go-codec-dagpb" + ipld "github.com/ipld/go-ipld-prime" + _ "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/fluent/qp" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" +) + +type benchDAG struct { + bs blockstore.Blockstore + bserv blockservice.BlockService + root cid.Cid + numNodes int + fetchFac fetcher.Factory // matches kubo's OfflineIPLDFetcher +} + +// makeBenchDAG_DagPB creates a pure dag-pb tree. 
+func makeBenchDAG_DagPB(b *testing.B, fanout, depth uint) *benchDAG { + b.Helper() + store := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bserv := blockservice.New(store, offline.Exchange(store)) + dserv := merkledag.NewDAGService(bserv) + gen := mdtest.NewDAGGenerator() + root, allCids, err := gen.MakeDagNode(dserv.Add, fanout, depth) + if err != nil { + b.Fatal(err) + } + return &benchDAG{ + bs: store, + bserv: bserv, + root: root, + numNodes: len(allCids), + fetchFac: makeIPLDFetcher(bserv), + } +} + +// makeBenchDAG_DagCBOR creates a pure dag-cbor tree where each node +// is a map with "data" (bytes) and "links" (list of CID links). +func makeBenchDAG_DagCBOR(b *testing.B, fanout, depth int) *benchDAG { + b.Helper() + store := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bserv := blockservice.New(store, offline.Exchange(store)) + + ls := cidlink.DefaultLinkSystem() + ls.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.Buffer{} + return &buf, func(lnk ipld.Link) error { + cl := lnk.(cidlink.Link) + blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid) + if err != nil { + return err + } + return store.Put(context.Background(), blk) + }, nil + } + + lp := cidlink.LinkPrototype{Prefix: cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: 0x12, // sha2-256 + MhLength: 32, + }} + + count := 0 + var build func(d int) ipld.Link + build = func(d int) ipld.Link { + var childLinks []ipld.Link + if d < depth { + for range fanout { + childLinks = append(childLinks, build(d+1)) + } + } + nd, err := qp.BuildMap(basicnode.Prototype.Any, -1, func(ma ipld.MapAssembler) { + qp.MapEntry(ma, "data", qp.Bytes(fmt.Appendf(nil, "node-%d", count))) + qp.MapEntry(ma, "links", qp.List(-1, func(la ipld.ListAssembler) { + for _, cl := range childLinks { + qp.ListEntry(la, qp.Link(cl)) + } + })) + }) + if err != nil { + b.Fatal(err) + } + lnk, err := 
ls.Store(ipld.LinkContext{}, lp, nd) + if err != nil { + b.Fatal(err) + } + count++ + return lnk + } + + rootLink := build(0) + rootCid := rootLink.(cidlink.Link).Cid + + return &benchDAG{ + bs: store, + bserv: bserv, + root: rootCid, + numNodes: count, + fetchFac: makeIPLDFetcher(bserv), + } +} + +// makeBenchDAG_Mixed creates a dag-cbor root that links to multiple +// dag-pb subtrees (realistic: dag-cbor metadata wrapping UnixFS content). +func makeBenchDAG_Mixed(b *testing.B, dagPBFanout, dagPBDepth uint, numPBSubtrees int) *benchDAG { + b.Helper() + store := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bserv := blockservice.New(store, offline.Exchange(store)) + dserv := merkledag.NewDAGService(bserv) + + // build dag-pb subtrees + gen := mdtest.NewDAGGenerator() + var subtreeRoots []cid.Cid + totalNodes := 0 + for range numPBSubtrees { + root, allCids, err := gen.MakeDagNode(dserv.Add, dagPBFanout, dagPBDepth) + if err != nil { + b.Fatal(err) + } + subtreeRoots = append(subtreeRoots, root) + totalNodes += len(allCids) + } + + // build dag-cbor root linking to all dag-pb subtrees + ls := cidlink.DefaultLinkSystem() + ls.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.Buffer{} + return &buf, func(lnk ipld.Link) error { + cl := lnk.(cidlink.Link) + blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid) + if err != nil { + return err + } + return store.Put(context.Background(), blk) + }, nil + } + lp := cidlink.LinkPrototype{Prefix: cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: 0x12, + MhLength: 32, + }} + + nd, err := qp.BuildMap(basicnode.Prototype.Any, -1, func(ma ipld.MapAssembler) { + qp.MapEntry(ma, "type", qp.String("metadata")) + qp.MapEntry(ma, "subtrees", qp.List(-1, func(la ipld.ListAssembler) { + for _, r := range subtreeRoots { + qp.ListEntry(la, qp.Link(cidlink.Link{Cid: r})) + } + })) + }) + if err != nil { + b.Fatal(err) + } + rootLink, err := 
ls.Store(ipld.LinkContext{}, lp, nd) + if err != nil { + b.Fatal(err) + } + totalNodes++ // the dag-cbor root itself + + return &benchDAG{ + bs: store, + bserv: bserv, + root: rootLink.(cidlink.Link).Cid, + numNodes: totalNodes, + fetchFac: makeIPLDFetcher(bserv), + } +} + +// makeIPLDFetcher matches kubo's OfflineIPLDFetcher setup: +// dagpb prototype chooser, SkipNotFound, NO unixfsnode.Reify. +func makeIPLDFetcher(bserv blockservice.BlockService) fetcher.Factory { + f := bsfetcher.NewFetcherConfig(bserv) + f.SkipNotFound = true + f.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser) + return f +} + +// --- walk functions --- + +func benchBlockAll(b *testing.B, dag *benchDAG) { + b.Helper() + ctx := context.Background() + b.ResetTimer() + for range b.N { + session := dag.fetchFac.NewSession(ctx) + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: dag.root}, func(res fetcher.FetchResult) error { + return nil + }) + if err != nil { + b.Fatal(err) + } + } +} + +func benchWalkerNoTracker(b *testing.B, dag *benchDAG) { + b.Helper() + ctx := context.Background() + fetch := walker.LinksFetcherFromBlockstore(dag.bs) + b.ResetTimer() + for range b.N { + walker.WalkDAG(ctx, dag.root, fetch, func(c cid.Cid) bool { return true }) + } +} + +func benchWalkerMapTracker(b *testing.B, dag *benchDAG) { + b.Helper() + ctx := context.Background() + fetch := walker.LinksFetcherFromBlockstore(dag.bs) + b.ResetTimer() + for range b.N { + t := walker.NewMapTracker() + walker.WalkDAG(ctx, dag.root, fetch, func(c cid.Cid) bool { return true }, walker.WithVisitedTracker(t)) + } +} + +func benchWalkerBloomTracker(b *testing.B, dag *benchDAG) { + b.Helper() + ctx := context.Background() + fetch := walker.LinksFetcherFromBlockstore(dag.bs) + b.ResetTimer() + for range b.N { + t, _ := walker.NewBloomTracker(max(uint(dag.numNodes), walker.MinBloomCapacity), walker.DefaultBloomFPRate) + walker.WalkDAG(ctx, dag.root, fetch, func(c cid.Cid) bool { return 
true }, walker.WithVisitedTracker(t)) + } +} + +func benchWalkerBloomSecondWalk(b *testing.B, dag *benchDAG) { + b.Helper() + ctx := context.Background() + fetch := walker.LinksFetcherFromBlockstore(dag.bs) + b.ResetTimer() + for range b.N { + t, _ := walker.NewBloomTracker(max(uint(dag.numNodes), walker.MinBloomCapacity), walker.DefaultBloomFPRate) + walker.WalkDAG(ctx, dag.root, fetch, func(c cid.Cid) bool { return true }, walker.WithVisitedTracker(t)) + walker.WalkDAG(ctx, dag.root, fetch, func(c cid.Cid) bool { + b.Fatal("unexpected CID in second walk") + return false + }, walker.WithVisitedTracker(t)) + } +} + +// --- dag-pb benchmarks (fanout=10, depth=3 -> ~1111 nodes) --- + +func BenchmarkDagPB_BlockAll(b *testing.B) { benchBlockAll(b, makeBenchDAG_DagPB(b, 10, 3)) } + +func BenchmarkDagPB_WalkerNoTracker(b *testing.B) { + benchWalkerNoTracker(b, makeBenchDAG_DagPB(b, 10, 3)) +} + +func BenchmarkDagPB_WalkerMapTracker(b *testing.B) { + benchWalkerMapTracker(b, makeBenchDAG_DagPB(b, 10, 3)) +} + +func BenchmarkDagPB_WalkerBloomTracker(b *testing.B) { + benchWalkerBloomTracker(b, makeBenchDAG_DagPB(b, 10, 3)) +} + +func BenchmarkDagPB_BloomSecondWalk(b *testing.B) { + benchWalkerBloomSecondWalk(b, makeBenchDAG_DagPB(b, 10, 3)) +} + +// --- dag-cbor benchmarks (fanout=10, depth=3 -> 1111 nodes) --- + +func BenchmarkDagCBOR_BlockAll(b *testing.B) { benchBlockAll(b, makeBenchDAG_DagCBOR(b, 10, 3)) } + +func BenchmarkDagCBOR_WalkerNoTracker(b *testing.B) { + benchWalkerNoTracker(b, makeBenchDAG_DagCBOR(b, 10, 3)) +} + +func BenchmarkDagCBOR_WalkerMapTracker(b *testing.B) { + benchWalkerMapTracker(b, makeBenchDAG_DagCBOR(b, 10, 3)) +} + +func BenchmarkDagCBOR_WalkerBloomTracker(b *testing.B) { + benchWalkerBloomTracker(b, makeBenchDAG_DagCBOR(b, 10, 3)) +} + +func BenchmarkDagCBOR_BloomSecondWalk(b *testing.B) { + benchWalkerBloomSecondWalk(b, makeBenchDAG_DagCBOR(b, 10, 3)) +} + +// --- mixed benchmarks (dag-cbor root -> 5 dag-pb subtrees, each fanout=5 depth=2 
-> ~156 nodes) --- + +func BenchmarkMixed_BlockAll(b *testing.B) { benchBlockAll(b, makeBenchDAG_Mixed(b, 5, 2, 5)) } + +func BenchmarkMixed_WalkerNoTracker(b *testing.B) { + benchWalkerNoTracker(b, makeBenchDAG_Mixed(b, 5, 2, 5)) +} + +func BenchmarkMixed_WalkerMapTracker(b *testing.B) { + benchWalkerMapTracker(b, makeBenchDAG_Mixed(b, 5, 2, 5)) +} + +func BenchmarkMixed_WalkerBloomTracker(b *testing.B) { + benchWalkerBloomTracker(b, makeBenchDAG_Mixed(b, 5, 2, 5)) +} + +func BenchmarkMixed_BloomSecondWalk(b *testing.B) { + benchWalkerBloomSecondWalk(b, makeBenchDAG_Mixed(b, 5, 2, 5)) +} diff --git a/dag/walker/walker_test.go b/dag/walker/walker_test.go new file mode 100644 index 000000000..67f6ba996 --- /dev/null +++ b/dag/walker/walker_test.go @@ -0,0 +1,490 @@ +package walker_test + +import ( + "context" + "fmt" + "testing" + + blockstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/dag/walker" + "github.com/ipfs/boxo/ipld/merkledag" + mdtest "github.com/ipfs/boxo/ipld/merkledag/test" + ft "github.com/ipfs/boxo/ipld/unixfs" + "github.com/ipfs/boxo/ipld/unixfs/hamt" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestBlockstore() blockstore.Blockstore { + return blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) +} + +func buildDAG(t *testing.T, bs blockstore.Blockstore, fanout, depth uint) (cid.Cid, []cid.Cid) { + t.Helper() + dserv := merkledag.NewDAGService(mdtest.Bserv()) + gen := mdtest.NewDAGGenerator() + root, allCids, err := gen.MakeDagNode(dserv.Add, fanout, depth) + require.NoError(t, err) + for _, c := range allCids { + nd, err := dserv.Get(t.Context(), c) + require.NoError(t, err) + require.NoError(t, bs.Put(t.Context(), nd)) + } + return root, allCids +} + +func putRawBlock(t 
*testing.T, bs blockstore.Blockstore, data []byte) cid.Cid { + t.Helper() + hash, _ := mh.Sum(data, mh.SHA2_256, -1) + c := cid.NewCidV1(cid.Raw, hash) + blk, err := blocks.NewBlockWithCid(data, c) + require.NoError(t, err) + require.NoError(t, bs.Put(t.Context(), blk)) + return c +} + +func collectWalk(t *testing.T, bs blockstore.Blockstore, root cid.Cid, opts ...walker.Option) []cid.Cid { + t.Helper() + var visited []cid.Cid + fetch := walker.LinksFetcherFromBlockstore(bs) + err := walker.WalkDAG(t.Context(), root, fetch, func(c cid.Cid) bool { + visited = append(visited, c) + return true + }, opts...) + require.NoError(t, err) + return visited +} + +func TestWalkDAG_Traversal(t *testing.T) { + // Verify the walker visits every node in a DAG exactly once + // and in DFS pre-order (root first). + t.Run("visits all nodes in a multi-level DAG", func(t *testing.T) { + bs := newTestBlockstore() + root, allCids := buildDAG(t, bs, 3, 2) + + visited := collectWalk(t, bs, root) + assert.Len(t, visited, len(allCids), + "should visit every node in the DAG") + + visitedSet := make(map[cid.Cid]struct{}) + for _, c := range visited { + visitedSet[c] = struct{}{} + } + for _, c := range allCids { + assert.Contains(t, visitedSet, c, + "every CID in the DAG should be visited") + } + }) + + // The root CID must always be the first emitted CID, which is + // critical for ExecuteFastProvideRoot (root must be announced + // before any other block). + t.Run("root is always the first CID emitted", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 3, 2) + + visited := collectWalk(t, bs, root) + require.NotEmpty(t, visited) + assert.Equal(t, root, visited[0]) + }) + + // Siblings must be visited in left-to-right link order, matching + // the legacy fetcherhelpers.BlockAll selector traversal and the + // conventional DFS order from IPIP-0412. 
+ t.Run("siblings visited in left-to-right link order", func(t *testing.T) { + bs := newTestBlockstore() + dserv := merkledag.NewDAGService(mdtest.Bserv()) + + // three distinct leaves with deterministic link order + leafA := merkledag.NodeWithData([]byte("leaf-A")) + leafB := merkledag.NodeWithData([]byte("leaf-B")) + leafC := merkledag.NodeWithData([]byte("leaf-C")) + require.NoError(t, dserv.Add(t.Context(), leafA)) + require.NoError(t, dserv.Add(t.Context(), leafB)) + require.NoError(t, dserv.Add(t.Context(), leafC)) + + root := merkledag.NodeWithData([]byte("root")) + root.AddNodeLink("a", leafA) + root.AddNodeLink("b", leafB) + root.AddNodeLink("c", leafC) + require.NoError(t, dserv.Add(t.Context(), root)) + + for _, nd := range []merkledag.ProtoNode{*root, *leafA, *leafB, *leafC} { + require.NoError(t, bs.Put(t.Context(), &nd)) + } + + visited := collectWalk(t, bs, root.Cid()) + require.Len(t, visited, 4) + assert.Equal(t, root.Cid(), visited[0], "root first (pre-order)") + assert.Equal(t, leafA.Cid(), visited[1], "first link visited first") + assert.Equal(t, leafB.Cid(), visited[2], "second link visited second") + assert.Equal(t, leafC.Cid(), visited[3], "third link visited third") + }) + + // A single raw block with no links should be walked as a + // single-node DAG (common case: small files stored as raw leaves). + t.Run("single leaf node with no children", func(t *testing.T) { + bs := newTestBlockstore() + leaf := putRawBlock(t, bs, []byte("leaf data")) + + visited := collectWalk(t, bs, leaf) + assert.Len(t, visited, 1) + assert.Equal(t, leaf, visited[0]) + }) + + // When no VisitedTracker is provided, the walker should still + // visit every node (no dedup, no crash). 
+ t.Run("works without any tracker", func(t *testing.T) { + bs := newTestBlockstore() + root, allCids := buildDAG(t, bs, 2, 2) + + visited := collectWalk(t, bs, root) // no WithVisitedTracker + assert.Len(t, visited, len(allCids)) + }) + + // DAG diamond: root -> {A, B}, A -> {C}, B -> {C}. + // With a tracker, C must be visited exactly once even though two + // paths lead to it. Without a tracker, C would be visited twice. + t.Run("DAG diamond: shared child visited once with tracker", func(t *testing.T) { + bs := newTestBlockstore() + dserv := merkledag.NewDAGService(mdtest.Bserv()) + + leafC := merkledag.NodeWithData([]byte("shared-leaf-C")) + require.NoError(t, dserv.Add(t.Context(), leafC)) + + nodeA := merkledag.NodeWithData([]byte("node-A")) + nodeA.AddNodeLink("c", leafC) + require.NoError(t, dserv.Add(t.Context(), nodeA)) + + nodeB := merkledag.NodeWithData([]byte("node-B")) + nodeB.AddNodeLink("c", leafC) + require.NoError(t, dserv.Add(t.Context(), nodeB)) + + root := merkledag.NodeWithData([]byte("root")) + root.AddNodeLink("a", nodeA) + root.AddNodeLink("b", nodeB) + require.NoError(t, dserv.Add(t.Context(), root)) + + for _, nd := range []merkledag.ProtoNode{*root, *nodeA, *nodeB, *leafC} { + require.NoError(t, bs.Put(t.Context(), &nd)) + } + + tracker := walker.NewMapTracker() + visited := collectWalk(t, bs, root.Cid(), walker.WithVisitedTracker(tracker)) + + cCount := 0 + for _, v := range visited { + if v == leafC.Cid() { + cCount++ + } + } + assert.Equal(t, 1, cCount, + "shared child C must be visited exactly once") + assert.Len(t, visited, 4, // root, A, B, C + "diamond DAG has 4 unique nodes") + }) + + // HAMT sharded directories are multi-level dag-pb structures where + // internal shard buckets are separate blocks. WalkDAG must visit + // every internal shard node and every leaf entry node. This is + // critical for provide: all HAMT layers must be announced so peers + // can enumerate the directory. 
+ t.Run("HAMT sharded directory: all internal shard nodes visited", func(t *testing.T) { + bs := newTestBlockstore() + dserv := merkledag.NewDAGService(mdtest.Bserv()) + + // build a HAMT with 500 entries to force multiple shard levels. + // half are empty dirs (all share the same CID -- tests dedup + // across repeated leaves), half are unique files (distinct CIDs). + const nEntries = 500 + shard, err := hamt.NewShard(dserv, 256) + require.NoError(t, err) + leafCids := make(map[cid.Cid]struct{}) + emptyDir := ft.EmptyDirNode() + require.NoError(t, dserv.Add(t.Context(), emptyDir)) + for i := range nEntries { + name := fmt.Sprintf("entry-%04d", i) + if i%2 == 0 { + // empty dir (shared CID across all even entries) + require.NoError(t, shard.Set(t.Context(), name, emptyDir)) + leafCids[emptyDir.Cid()] = struct{}{} + } else { + // unique file + leaf := merkledag.NodeWithData([]byte(fmt.Sprintf("file-%04d", i))) + require.NoError(t, dserv.Add(t.Context(), leaf)) + require.NoError(t, shard.Set(t.Context(), name, leaf)) + leafCids[leaf.Cid()] = struct{}{} + } + } + + // serialize the HAMT (writes all shard nodes to dserv) + rootNd, err := shard.Node() + require.NoError(t, err) + rootCid := rootNd.Cid() + + // collect all CIDs reachable from root via dserv (ground truth) + allCids := make(map[cid.Cid]struct{}) + var enumerate func(c cid.Cid) + enumerate = func(c cid.Cid) { + if _, ok := allCids[c]; ok { + return + } + allCids[c] = struct{}{} + nd, err := dserv.Get(t.Context(), c) + if err != nil { + return + } + for _, lnk := range nd.Links() { + enumerate(lnk.Cid) + } + } + enumerate(rootCid) + + // copy all blocks to test blockstore + for c := range allCids { + nd, err := dserv.Get(t.Context(), c) + require.NoError(t, err) + require.NoError(t, bs.Put(t.Context(), nd)) + } + + // walk with tracker to dedup (HAMT leaf nodes are unique but + // the walker without tracker would revisit them via each shard) + tracker := walker.NewMapTracker() + visited := collectWalk(t, bs, 
rootCid, walker.WithVisitedTracker(tracker)) + visitedSet := make(map[cid.Cid]struct{}) + for _, c := range visited { + visitedSet[c] = struct{}{} + } + + assert.Len(t, visitedSet, len(allCids), + "WalkDAG must visit every unique block in the HAMT (internal shards + leaf entries)") + for c := range allCids { + assert.Contains(t, visitedSet, c, + "CID %s reachable from HAMT root must be visited", c) + } + + // verify internal shard nodes exist (not just leaves) + internalCount := len(allCids) - len(leafCids) + assert.Greater(t, internalCount, 1, + "HAMT with 500 entries must have multiple internal shard nodes") + t.Logf("HAMT: %d total blocks (%d internal shards, %d leaf entries)", + len(allCids), internalCount, len(leafCids)) + }) +} + +func TestWalkDAG_Dedup(t *testing.T) { + // MapTracker across two walks: CIDs from the first walk are + // skipped in the second walk. This is the core mechanism for + // cross-pin dedup in the reprovide cycle. + t.Run("shared MapTracker skips already-visited subtrees", func(t *testing.T) { + bs := newTestBlockstore() + root1, cids1 := buildDAG(t, bs, 2, 2) + root2, _ := buildDAG(t, bs, 2, 2) + + tracker := walker.NewMapTracker() + + visited1 := collectWalk(t, bs, root1, walker.WithVisitedTracker(tracker)) + assert.Len(t, visited1, len(cids1)) + + // second walk: independent root, but if any CID overlapped + // it would be skipped + visited2 := collectWalk(t, bs, root2, walker.WithVisitedTracker(tracker)) + for _, c := range visited2 { + for _, c1 := range visited1 { + assert.NotEqual(t, c, c1, + "CID from first walk must not appear in second walk") + } + } + }) + + // BloomTracker: walk the same root twice. Second walk should + // produce zero CIDs because everything is already in the bloom. 
+ t.Run("shared BloomTracker dedup across walks of same root", func(t *testing.T) { + bs := newTestBlockstore() + root, allCids := buildDAG(t, bs, 3, 2) + + tracker, err := walker.NewBloomTracker(walker.MinBloomCapacity, walker.DefaultBloomFPRate) + require.NoError(t, err) + + visited1 := collectWalk(t, bs, root, walker.WithVisitedTracker(tracker)) + assert.Len(t, visited1, len(allCids)) + + visited2 := collectWalk(t, bs, root, walker.WithVisitedTracker(tracker)) + assert.Empty(t, visited2, + "second walk of same root must produce zero CIDs") + }) +} + +func TestWalkDAG_Locality(t *testing.T) { + // WithLocality filters CIDs that are not locally available. + // Used by MFS providers to skip blocks not in the local blockstore. + t.Run("only local CIDs are visited", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 2, 1) + + locality := func(_ context.Context, c cid.Cid) (bool, error) { + return c == root, nil // only root is "local" + } + + visited := collectWalk(t, bs, root, walker.WithLocality(locality)) + assert.Len(t, visited, 1, + "only the local root should be visited") + assert.Equal(t, root, visited[0]) + }) + + // Locality errors should skip the CID (best-effort), not crash + // the walk. + t.Run("locality error skips CID gracefully", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 2, 1) + + locality := func(_ context.Context, c cid.Cid) (bool, error) { + if c == root { + return true, nil + } + return false, assert.AnError + } + + visited := collectWalk(t, bs, root, walker.WithLocality(locality)) + assert.Len(t, visited, 1, + "children with locality errors should be skipped") + assert.Equal(t, root, visited[0]) + }) +} + +func TestWalkDAG_ErrorHandling(t *testing.T) { + // When the root itself is missing from the blockstore, the walk + // should return successfully with zero CIDs (best-effort: a + // corrupt block should not break the entire provide cycle). 
+ t.Run("missing root produces no CIDs", func(t *testing.T) { + bs := newTestBlockstore() + missing := putRawBlock(t, bs, []byte("will-be-deleted")) + require.NoError(t, bs.DeleteBlock(t.Context(), missing)) + + var visited []cid.Cid + fetch := walker.LinksFetcherFromBlockstore(bs) + err := walker.WalkDAG(t.Context(), missing, fetch, func(c cid.Cid) bool { + visited = append(visited, c) + return true + }) + require.NoError(t, err, + "walk should succeed even with missing root (best-effort)") + assert.Empty(t, visited) + }) + + // When children fail to fetch, they are skipped but the root is + // still emitted. This ensures a corrupt child block doesn't + // prevent the parent from being provided. + t.Run("fetch error on children skips them but emits root", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 2, 1) + + realFetch := walker.LinksFetcherFromBlockstore(bs) + failFetch := func(ctx context.Context, c cid.Cid) ([]cid.Cid, error) { + if c == root { + return realFetch(ctx, c) + } + return nil, assert.AnError + } + + var visited []cid.Cid + err := walker.WalkDAG(t.Context(), root, failFetch, func(c cid.Cid) bool { + visited = append(visited, c) + return true + }) + require.NoError(t, err) + assert.Len(t, visited, 1, + "only root should be emitted when children fail") + assert.Equal(t, root, visited[0]) + }) + + // CIDs are marked visited at pop time (before fetch). If fetch + // fails, the CID stays in the tracker and won't be retried this + // cycle. This avoids a double bloom scan per CID. The CID is + // caught in the next reprovide cycle (22h). 
+ t.Run("fetch error still marks CID as visited", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 2, 1) + + tracker := walker.NewMapTracker() + realFetch := walker.LinksFetcherFromBlockstore(bs) + failChildFetch := func(ctx context.Context, c cid.Cid) ([]cid.Cid, error) { + if c != root { + return nil, assert.AnError + } + return realFetch(ctx, c) + } + + walker.WalkDAG(t.Context(), root, failChildFetch, func(c cid.Cid) bool { + return true + }, walker.WithVisitedTracker(tracker)) + + children, _ := realFetch(t.Context(), root) + for _, child := range children { + assert.True(t, tracker.Has(child), + "CID %s must be marked visited even after fetch error", child) + } + }) +} + +func TestWalkDAG_StopConditions(t *testing.T) { + // emit returning false must stop the walk immediately. + t.Run("emit false stops walk after N CIDs", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 3, 3) + + count := 0 + fetch := walker.LinksFetcherFromBlockstore(bs) + err := walker.WalkDAG(t.Context(), root, fetch, func(c cid.Cid) bool { + count++ + return count < 5 + }) + require.NoError(t, err) + assert.Equal(t, 5, count, + "walk should stop after emit returns false") + }) + + // Context cancellation during the walk should stop it and return + // the context error. + t.Run("context cancellation stops walk mid-flight", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 3, 3) + + ctx, cancel := context.WithCancel(t.Context()) + count := 0 + fetch := walker.LinksFetcherFromBlockstore(bs) + err := walker.WalkDAG(ctx, root, fetch, func(c cid.Cid) bool { + count++ + if count >= 3 { + cancel() + } + return true + }) + assert.ErrorIs(t, err, context.Canceled) + }) + + // An already-cancelled context should return immediately without + // visiting any CIDs. 
+ t.Run("already-cancelled context returns immediately", func(t *testing.T) { + bs := newTestBlockstore() + root, _ := buildDAG(t, bs, 2, 1) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + var visited []cid.Cid + fetch := walker.LinksFetcherFromBlockstore(bs) + err := walker.WalkDAG(ctx, root, fetch, func(c cid.Cid) bool { + visited = append(visited, c) + return true + }) + assert.ErrorIs(t, err, context.Canceled) + assert.Empty(t, visited, + "no CIDs should be visited with cancelled context") + }) +} diff --git a/examples/go.mod b/examples/go.mod index bbbbbbaa7..c30c2da66 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -50,7 +50,7 @@ require ( github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huin/goupnp v1.3.0 // indirect - github.com/ipfs/bbloom v0.0.4 // indirect + github.com/ipfs/bbloom v0.0.5-0.20260313134946-e882fc33651e // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-cidutil v0.1.1 // indirect github.com/ipfs/go-dsqueue v0.2.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index bb0f2bd2d..c6ff32c59 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -239,8 +239,8 @@ github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFck github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= -github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= +github.com/ipfs/bbloom v0.0.5-0.20260313134946-e882fc33651e h1:mKp9/zRW6JAXYLUQWEPCuS6WLWi1YYimMo9UrUFBRG0= +github.com/ipfs/bbloom v0.0.5-0.20260313134946-e882fc33651e/go.mod 
h1:lDy3A3i6ndgEW2z1CaRFvDi5/ZTzgM1IxA/pkL7Wgts= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk= diff --git a/go.mod b/go.mod index adc82a12a..5b90f455d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/ipfs/bbloom v0.0.4 + github.com/ipfs/bbloom v0.0.5-0.20260313134946-e882fc33651e github.com/ipfs/go-bitfield v1.1.0 github.com/ipfs/go-block-format v0.2.3 github.com/ipfs/go-cid v0.6.0 diff --git a/go.sum b/go.sum index 4fd50e03e..1d04fd01a 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFck github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= -github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= +github.com/ipfs/bbloom v0.0.5-0.20260313134946-e882fc33651e h1:mKp9/zRW6JAXYLUQWEPCuS6WLWi1YYimMo9UrUFBRG0= +github.com/ipfs/bbloom v0.0.5-0.20260313134946-e882fc33651e/go.mod h1:lDy3A3i6ndgEW2z1CaRFvDi5/ZTzgM1IxA/pkL7Wgts= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk= diff --git a/pinning/pinner/dspinner/uniquepinprovider.go 
b/pinning/pinner/dspinner/uniquepinprovider.go new file mode 100644 index 000000000..be52d2922 --- /dev/null +++ b/pinning/pinner/dspinner/uniquepinprovider.go @@ -0,0 +1,155 @@ +package dspinner + +import ( + "context" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/dag/walker" + ipfspinner "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/provider" + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" +) + +// NewUniquePinnedProvider returns a [provider.KeyChanFunc] that emits +// all blocks reachable from pinned roots, with bloom filter cross-pin +// deduplication via the shared [walker.VisitedTracker]. +// +// Processing order: recursive pin DAGs first (via [walker.WalkDAG]), +// then direct pins. This order ensures that by the time direct pins +// are processed, all recursive DAGs have been walked and their CIDs +// are in the tracker. +// +// The existing [NewPinnedProvider] is unchanged. This function is used +// only when the +unique strategy modifier is active. +func NewUniquePinnedProvider( + pinning ipfspinner.Pinner, + bs blockstore.Blockstore, + tracker walker.VisitedTracker, +) provider.KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + outCh := make(chan cid.Cid) + + go func() { + defer close(outCh) + + emit := func(c cid.Cid) bool { + select { + case outCh <- c: + return true + case <-ctx.Done(): + return false + } + } + + fetch := walker.LinksFetcherFromBlockstore(bs) + + // 1. Walk recursive pin DAGs (bulk of dedup benefit). + // A corrupted pin entry is logged and skipped so it does + // not prevent remaining pins from being provided. + for sc := range pinning.RecursiveKeys(ctx, false) { + if sc.Err != nil { + log.Errorf("unique provide recursive pins: %s", sc.Err) + continue + } + if err := walker.WalkDAG(ctx, sc.Pin.Key, fetch, emit, walker.WithVisitedTracker(tracker)); err != nil { + return // context cancelled + } + } + + // 2. Direct pins (emit if not already visited). 
+ // Same best-effort: skip corrupted entries. + for sc := range pinning.DirectKeys(ctx, false) { + if sc.Err != nil { + log.Errorf("unique provide direct pins: %s", sc.Err) + continue + } + // skip identity CIDs: content is inline, no need to provide + if sc.Pin.Key.Prefix().MhType == mh.IDENTITY { + continue + } + // skip if already visited (by a recursive pin walk above) + if !tracker.Visit(sc.Pin.Key) { + continue + } + // emit returns false when context is cancelled + // (consumer stopped reading from the channel) + if !emit(sc.Pin.Key) { + return + } + } + }() + + return outCh, nil + } +} + +// NewPinnedEntityRootsProvider returns a [provider.KeyChanFunc] that +// emits entity roots (files, directories, HAMT shards) reachable from +// pinned roots, skipping internal file chunks. Uses +// [walker.WalkEntityRoots] with the shared [walker.VisitedTracker] +// for cross-pin deduplication. +// +// Same processing order as [NewUniquePinnedProvider]: recursive pins +// first, direct pins second. +func NewPinnedEntityRootsProvider( + pinning ipfspinner.Pinner, + bs blockstore.Blockstore, + tracker walker.VisitedTracker, +) provider.KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + outCh := make(chan cid.Cid) + + go func() { + defer close(outCh) + + emit := func(c cid.Cid) bool { + select { + case outCh <- c: + return true + case <-ctx.Done(): + return false + } + } + + fetch := walker.NodeFetcherFromBlockstore(bs) + + // 1. Walk recursive pin DAGs for entity roots. + // A corrupted pin entry is logged and skipped so it does + // not prevent remaining pins from being provided. + for sc := range pinning.RecursiveKeys(ctx, false) { + if sc.Err != nil { + log.Errorf("entity provide recursive pins: %s", sc.Err) + continue + } + if err := walker.WalkEntityRoots(ctx, sc.Pin.Key, fetch, emit, walker.WithVisitedTracker(tracker)); err != nil { + return // context cancelled + } + } + + // 2. Direct pins (always entity roots by definition). 
+ // Same best-effort: skip corrupted entries. + for sc := range pinning.DirectKeys(ctx, false) { + if sc.Err != nil { + log.Errorf("entity provide direct pins: %s", sc.Err) + continue + } + // skip identity CIDs: content is inline, no need to provide + if sc.Pin.Key.Prefix().MhType == mh.IDENTITY { + continue + } + // skip if already visited (by a recursive pin walk above) + if !tracker.Visit(sc.Pin.Key) { + continue + } + // emit returns false when context is cancelled + // (consumer stopped reading from the channel) + if !emit(sc.Pin.Key) { + return + } + } + }() + + return outCh, nil + } +} diff --git a/pinning/pinner/dspinner/uniquepinprovider_test.go b/pinning/pinner/dspinner/uniquepinprovider_test.go new file mode 100644 index 000000000..5e903285f --- /dev/null +++ b/pinning/pinner/dspinner/uniquepinprovider_test.go @@ -0,0 +1,333 @@ +package dspinner + +import ( + "testing" + + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/dag/walker" + "github.com/ipfs/boxo/exchange/offline" + "github.com/ipfs/boxo/ipld/merkledag" + mdutils "github.com/ipfs/boxo/ipld/merkledag/test" + ft "github.com/ipfs/boxo/ipld/unixfs" + ipfspinner "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/provider" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + format "github.com/ipfs/go-ipld-format" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupPinTest(t *testing.T) (blockstore.Blockstore, ipfspinner.Pinner, format.DAGService) { + t.Helper() + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + bs := blockstore.NewBlockstore(ds) + bserv := blockservice.New(bs, offline.Exchange(bs)) + dserv := merkledag.NewDAGService(bserv) + pinner, err := New(t.Context(), ds, dserv) + require.NoError(t, err) + return bs, pinner, dserv +} + +// TestUniquePinnedProvider_DedupAcrossPins verifies 
that blocks shared +// between two recursive pins are emitted only once. This is the core +// use case: append-only datasets where each pin is the previous one +// plus a small delta, sharing the majority of their DAGs. +func TestUniquePinnedProvider_DedupAcrossPins(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + // two DAGs that share a common subtree + shared := merkledag.NodeWithData([]byte("shared")) + require.NoError(t, dserv.Add(t.Context(), shared)) + + root1 := merkledag.NodeWithData([]byte("root1")) + root1.AddNodeLink("shared", shared) + require.NoError(t, dserv.Add(t.Context(), root1)) + + root2 := merkledag.NodeWithData([]byte("root2")) + root2.AddNodeLink("shared", shared) + require.NoError(t, dserv.Add(t.Context(), root2)) + + require.NoError(t, pinner.PinWithMode(t.Context(), root1.Cid(), ipfspinner.Recursive, "pin1")) + require.NoError(t, pinner.PinWithMode(t.Context(), root2.Cid(), ipfspinner.Recursive, "pin2")) + + tracker := walker.NewMapTracker() + keyChanF := NewUniquePinnedProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + visited := make(map[cid.Cid]int) + for c := range ch { + visited[c]++ + } + + assert.Equal(t, 1, visited[shared.Cid()], + "shared block emitted exactly once across both pins") + assert.Equal(t, 1, visited[root1.Cid()]) + assert.Equal(t, 1, visited[root2.Cid()]) + assert.Len(t, visited, 3, "root1 + root2 + shared") +} + +// TestUniquePinnedProvider_DirectPins verifies that direct pins are +// emitted and deduplicated against recursive pin walks. A CID that +// appears both as a direct pin and within a recursive pin DAG should +// be emitted only once. 
+func TestUniquePinnedProvider_DirectPins(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + leaf := merkledag.NodeWithData([]byte("leaf")) + require.NoError(t, dserv.Add(t.Context(), leaf)) + + root := merkledag.NodeWithData([]byte("root")) + root.AddNodeLink("leaf", leaf) + require.NoError(t, dserv.Add(t.Context(), root)) + + // pin root recursively (covers root + leaf) + require.NoError(t, pinner.PinWithMode(t.Context(), root.Cid(), ipfspinner.Recursive, "rec")) + // also direct-pin the leaf + require.NoError(t, pinner.PinWithMode(t.Context(), leaf.Cid(), ipfspinner.Direct, "dir")) + + tracker := walker.NewMapTracker() + keyChanF := NewUniquePinnedProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + visited := make(map[cid.Cid]int) + for c := range ch { + visited[c]++ + } + + assert.Equal(t, 1, visited[leaf.Cid()], + "leaf emitted once despite being both recursively and directly pinned") + assert.Equal(t, 1, visited[root.Cid()]) + assert.Len(t, visited, 2) +} + +// TestUniquePinnedProvider_BufferedProviderCompat verifies that +// NewUniquePinnedProvider works with NewBufferedProvider, matching +// the wrapping pattern used in kubo's createKeyProvider. +func TestUniquePinnedProvider_BufferedProviderCompat(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + daggen := mdutils.NewDAGGenerator() + root, allCids, err := daggen.MakeDagNode(dserv.Add, 3, 2) + require.NoError(t, err) + require.NoError(t, pinner.PinWithMode(t.Context(), root, ipfspinner.Recursive, "test")) + + tracker := walker.NewMapTracker() + keyChanF := provider.NewBufferedProvider( + NewUniquePinnedProvider(pinner, bs, tracker)) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + count := 0 + for range ch { + count++ + } + assert.Equal(t, len(allCids), count) +} + +// TestPinnedEntityRootsProvider_SkipsChunks verifies that the entity +// roots provider emits file roots but does not descend into chunks. 
+// This is the core optimization of the +entities strategy applied to +// pinned content. +func TestPinnedEntityRootsProvider_SkipsChunks(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + // chunked file: root -> chunk1, chunk2 + chunk1 := merkledag.NewRawNode([]byte("chunk1")) + chunk2 := merkledag.NewRawNode([]byte("chunk2")) + require.NoError(t, dserv.Add(t.Context(), chunk1)) + require.NoError(t, dserv.Add(t.Context(), chunk2)) + + fsn := ft.NewFSNode(ft.TFile) + fsn.AddBlockSize(6) + fsn.AddBlockSize(6) + fileData, err := fsn.GetBytes() + require.NoError(t, err) + fileNode := merkledag.NodeWithData(fileData) + fileNode.AddNodeLink("", chunk1) + fileNode.AddNodeLink("", chunk2) + require.NoError(t, dserv.Add(t.Context(), fileNode)) + + // directory containing the file + dir := ft.EmptyDirNode() + dir.AddNodeLink("big.bin", fileNode) + require.NoError(t, dserv.Add(t.Context(), dir)) + require.NoError(t, pinner.PinWithMode(t.Context(), dir.Cid(), ipfspinner.Recursive, "test")) + + tracker := walker.NewMapTracker() + keyChanF := NewPinnedEntityRootsProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + var visited []cid.Cid + for c := range ch { + visited = append(visited, c) + } + + assert.Contains(t, visited, dir.Cid(), "directory emitted") + assert.Contains(t, visited, fileNode.Cid(), "file root emitted") + assert.NotContains(t, visited, chunk1.Cid(), "chunk1 NOT emitted") + assert.NotContains(t, visited, chunk2.Cid(), "chunk2 NOT emitted") + assert.Len(t, visited, 2, "dir + file root only") +} + +// TestPinnedEntityRootsProvider_DedupAcrossPins verifies that entity +// roots shared between pins are emitted only once, same as +// NewUniquePinnedProvider but at the entity level. 
+func TestPinnedEntityRootsProvider_DedupAcrossPins(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + // shared file across two directories + sharedFile := merkledag.NodeWithData(func() []byte { + fsn := ft.NewFSNode(ft.TFile) + fsn.SetData([]byte("shared")) + b, _ := fsn.GetBytes() + return b + }()) + require.NoError(t, dserv.Add(t.Context(), sharedFile)) + + // directories must be distinct (different unique files) so they + // get different CIDs + unique1 := merkledag.NodeWithData(func() []byte { + fsn := ft.NewFSNode(ft.TFile) + fsn.SetData([]byte("unique1")) + b, _ := fsn.GetBytes() + return b + }()) + unique2 := merkledag.NodeWithData(func() []byte { + fsn := ft.NewFSNode(ft.TFile) + fsn.SetData([]byte("unique2")) + b, _ := fsn.GetBytes() + return b + }()) + require.NoError(t, dserv.Add(t.Context(), unique1)) + require.NoError(t, dserv.Add(t.Context(), unique2)) + + dir1 := ft.EmptyDirNode() + dir1.AddNodeLink("shared.txt", sharedFile) + dir1.AddNodeLink("unique.txt", unique1) + require.NoError(t, dserv.Add(t.Context(), dir1)) + + dir2 := ft.EmptyDirNode() + dir2.AddNodeLink("shared.txt", sharedFile) + dir2.AddNodeLink("unique.txt", unique2) + require.NoError(t, dserv.Add(t.Context(), dir2)) + + require.NoError(t, pinner.PinWithMode(t.Context(), dir1.Cid(), ipfspinner.Recursive, "pin1")) + require.NoError(t, pinner.PinWithMode(t.Context(), dir2.Cid(), ipfspinner.Recursive, "pin2")) + + tracker := walker.NewMapTracker() + keyChanF := NewPinnedEntityRootsProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + visited := make(map[cid.Cid]int) + for c := range ch { + visited[c]++ + } + + assert.Equal(t, 1, visited[sharedFile.Cid()], + "shared file emitted once across both pins") + assert.Len(t, visited, 5, "dir1 + dir2 + shared + unique1 + unique2") +} + +// --- identity CID filtering --- + +func makeIdentityCID(t *testing.T, data []byte) cid.Cid { + t.Helper() + hash, err := mh.Encode(data, mh.IDENTITY) + 
require.NoError(t, err) + return cid.NewCidV1(cid.Raw, hash) +} + +// TestUniquePinnedProvider_IdentityDirectPin verifies that a +// directly-pinned identity CID is not emitted. Identity CIDs embed +// data inline, so providing them to the DHT is wasteful. +func TestUniquePinnedProvider_IdentityDirectPin(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + normal := merkledag.NodeWithData([]byte("normal")) + require.NoError(t, dserv.Add(t.Context(), normal)) + + idCid := makeIdentityCID(t, []byte("inline")) + + require.NoError(t, pinner.PinWithMode(t.Context(), normal.Cid(), ipfspinner.Direct, "normal")) + require.NoError(t, pinner.PinWithMode(t.Context(), idCid, ipfspinner.Direct, "identity")) + + tracker := walker.NewMapTracker() + keyChanF := NewUniquePinnedProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + var visited []cid.Cid + for c := range ch { + visited = append(visited, c) + } + + assert.Contains(t, visited, normal.Cid(), "normal direct pin emitted") + assert.NotContains(t, visited, idCid, "identity direct pin must not be emitted") +} + +// TestUniquePinnedProvider_IdentityInRecursiveDAG verifies that +// identity CIDs within a recursive pin DAG are not emitted. The +// walker traverses through them but skips emission. 
+func TestUniquePinnedProvider_IdentityInRecursiveDAG(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + idChild := makeIdentityCID(t, []byte("inline-leaf")) + + root := merkledag.NodeWithData([]byte("root-with-id")) + require.NoError(t, root.AddRawLink("inline", &format.Link{Cid: idChild})) + require.NoError(t, dserv.Add(t.Context(), root)) + + require.NoError(t, pinner.PinWithMode(t.Context(), root.Cid(), ipfspinner.Recursive, "rec")) + + tracker := walker.NewMapTracker() + keyChanF := NewUniquePinnedProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + var visited []cid.Cid + for c := range ch { + visited = append(visited, c) + } + + assert.Contains(t, visited, root.Cid(), "non-identity root emitted") + assert.NotContains(t, visited, idChild, "identity child must not be emitted") +} + +// TestPinnedEntityRootsProvider_IdentityDirectPin verifies that the +// entity roots provider also filters identity CIDs from direct pins. +func TestPinnedEntityRootsProvider_IdentityDirectPin(t *testing.T) { + bs, pinner, dserv := setupPinTest(t) + + normal := merkledag.NodeWithData(func() []byte { + fsn := ft.NewFSNode(ft.TFile) + fsn.SetData([]byte("normal")) + b, _ := fsn.GetBytes() + return b + }()) + require.NoError(t, dserv.Add(t.Context(), normal)) + + idCid := makeIdentityCID(t, []byte("inline")) + + require.NoError(t, pinner.PinWithMode(t.Context(), normal.Cid(), ipfspinner.Direct, "normal")) + require.NoError(t, pinner.PinWithMode(t.Context(), idCid, ipfspinner.Direct, "identity")) + + tracker := walker.NewMapTracker() + keyChanF := NewPinnedEntityRootsProvider(pinner, bs, tracker) + ch, err := keyChanF(t.Context()) + require.NoError(t, err) + + var visited []cid.Cid + for c := range ch { + visited = append(visited, c) + } + + assert.Contains(t, visited, normal.Cid(), "normal direct pin emitted") + assert.NotContains(t, visited, idCid, "identity direct pin must not be emitted") +} diff --git a/provider/provider.go 
b/provider/provider.go index 1429bf17e..a52d77b26 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -111,8 +111,54 @@ func NewPrioritizedProvider(streams ...KeyChanFunc) KeyChanFunc { last := len(streams) - 1 for i, stream := range streams { if err := handleStream(stream, i < last); err != nil { - log.Warnf("error in prioritized strategy while handling CID stream %d: %w", i, err) - return + log.Errorf("error in prioritized strategy while handling CID stream %d: %s", i, err) + continue // best-effort: e.g. MFS flush error should not prevent pinned content from being provided + } + } + }() + + return outCh, nil + } +} + +// NewConcatProvider concatenates multiple KeyChanFunc streams into one, +// running them sequentially in order. All CIDs from each stream are +// forwarded to the output channel without deduplication. +// +// Use this when the input streams are already deduplicated externally +// (e.g. via a shared [walker.VisitedTracker]). For streams that may +// produce overlapping CIDs, use [NewPrioritizedProvider] instead, which +// maintains its own visited set. +// +// Like [NewPrioritizedProvider], a failure in one stream's KeyChanFunc +// is logged and skipped -- remaining streams still run. 
+func NewConcatProvider(streams ...KeyChanFunc) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + outCh := make(chan cid.Cid) + + go func() { + defer close(outCh) + for i, stream := range streams { + ch, err := stream(ctx) + if err != nil { + log.Errorf("error in concat strategy while handling CID stream %d: %s", i, err) + continue + } + drain: + for { + select { + case <-ctx.Done(): + return + case c, ok := <-ch: + if !ok { + break drain + } + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } } } }() diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go index e65c7ffcb..357af5747 100644 --- a/provider/reprovider_test.go +++ b/provider/reprovider_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "fmt" "runtime" "slices" "strconv" @@ -322,3 +323,220 @@ func TestNewPrioritizedProvider(t *testing.T) { }) } } + +// TestPrioritizedProvider_StreamErrorContinues verifies that a failure +// in one stream does not prevent subsequent streams from running. +// e.g. MFS flush failure should not block pinned content from being provided. +func TestPrioritizedProvider_StreamErrorContinues(t *testing.T) { + cids := makeCIDs(3) + + failingStream := func(_ context.Context) (<-chan cid.Cid, error) { + return nil, fmt.Errorf("stream init failed") + } + goodStream := newMockKeyChanFunc(cids) + + // failing stream first, good stream second + stream := NewPrioritizedProvider(failingStream, goodStream) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + // good stream should still produce its CIDs despite the first stream failing + require.Equal(t, cids, received) +} + +// TestPrioritizedProvider_ContextCancellation verifies that context +// cancellation stops the provider cleanly without hanging. 
+func TestPrioritizedProvider_ContextCancellation(t *testing.T) { + // slow stream that blocks until context is cancelled + slowStream := func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + defer close(ch) + <-ctx.Done() + }() + return ch, nil + } + + ctx, cancel := context.WithCancel(t.Context()) + stream := NewPrioritizedProvider(slowStream) + ch, err := stream(ctx) + require.NoError(t, err) + + cancel() + // channel should close promptly after cancellation + for range ch { + t.Fatal("should not receive CIDs after cancellation") + } +} + +// TestPrioritizedProvider_ThreeStreams verifies correct ordering and +// dedup across three streams (the common case: MFS + pinned + direct). +func TestPrioritizedProvider_ThreeStreams(t *testing.T) { + cids := makeCIDs(9) + s1 := newMockKeyChanFunc(cids[:3]) // highest priority + s2 := newMockKeyChanFunc(append(cids[1:4:4], cids[4:6]...)) // overlaps with s1 + s3 := newMockKeyChanFunc(cids[6:]) // lowest priority + + stream := NewPrioritizedProvider(s1, s2, s3) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + // s1: 0,1,2 (all new) + // s2: 1,2 skipped (seen in s1), 3,4,5 new + // s3: 6,7,8 (all new, last stream so no dedup tracking) + require.Equal(t, []cid.Cid{ + cids[0], cids[1], cids[2], // s1 + cids[3], cids[4], cids[5], // s2 (deduped 1,2) + cids[6], cids[7], cids[8], // s3 + }, received) +} + +// TestPrioritizedProvider_AllStreamsFail verifies that when every +// stream fails, the output channel closes cleanly with no CIDs. 
+func TestPrioritizedProvider_AllStreamsFail(t *testing.T) { + fail := func(_ context.Context) (<-chan cid.Cid, error) { + return nil, fmt.Errorf("fail") + } + stream := NewPrioritizedProvider(fail, fail, fail) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + require.Empty(t, received) +} + +// TestPrioritizedProvider_ErrorContinues verifies that a failing stream +// does not prevent subsequent streams from being processed. This is a +// regression test for a bug where the goroutine returned on error +// instead of continuing to the next stream. +func TestPrioritizedProvider_ErrorContinues(t *testing.T) { + cids := makeCIDs(3) + fail := func(_ context.Context) (<-chan cid.Cid, error) { + return nil, fmt.Errorf("stream error") + } + good := newMockKeyChanFunc(cids) + + stream := NewPrioritizedProvider(fail, good) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + require.Equal(t, cids, received, + "CIDs from the good stream must still be emitted after a prior stream fails") +} + +// TestNewConcatProvider verifies that ConcatProvider concatenates +// streams in order without deduplication. Unlike PrioritizedProvider, +// duplicate CIDs across streams are NOT filtered. 
+func TestNewConcatProvider(t *testing.T) { + cids := makeCIDs(6) + + t.Run("concatenates in order", func(t *testing.T) { + s1 := newMockKeyChanFunc(cids[:3]) + s2 := newMockKeyChanFunc(cids[3:]) + + stream := NewConcatProvider(s1, s2) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + require.Equal(t, cids, received) + }) + + t.Run("duplicates are NOT filtered", func(t *testing.T) { + // same CIDs in both streams -- ConcatProvider passes them all through + s1 := newMockKeyChanFunc(cids[:3]) + s2 := newMockKeyChanFunc(cids[:3]) + + stream := NewConcatProvider(s1, s2) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + expected := append(cids[:3:3], cids[:3]...) + require.Equal(t, expected, received) + }) + + t.Run("stream error skips to next", func(t *testing.T) { + failing := func(_ context.Context) (<-chan cid.Cid, error) { + return nil, fmt.Errorf("init failed") + } + good := newMockKeyChanFunc(cids[:3]) + + stream := NewConcatProvider(failing, good) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + require.Equal(t, cids[:3], received) + }) + + t.Run("single stream", func(t *testing.T) { + stream := NewConcatProvider(newMockKeyChanFunc(cids)) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + require.Equal(t, cids, received) + }) + + t.Run("empty streams", func(t *testing.T) { + empty := newMockKeyChanFunc(nil) + stream := NewConcatProvider(empty, newMockKeyChanFunc(cids[:2])) + ch, err := stream(t.Context()) + require.NoError(t, err) + + var received []cid.Cid + for c := range ch { + received = append(received, c) + } + require.Equal(t, cids[:2], received) + }) + + t.Run("context 
cancellation stops cleanly", func(t *testing.T) { + slowStream := func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + defer close(ch) + <-ctx.Done() + }() + return ch, nil + } + + ctx, cancel := context.WithCancel(t.Context()) + stream := NewConcatProvider(slowStream) + ch, err := stream(ctx) + require.NoError(t, err) + + cancel() + for range ch { + t.Fatal("should not receive CIDs after cancellation") + } + }) +}