-
Notifications
You must be signed in to change notification settings - Fork 153
feat(dag/walker): opt-in BloomTracker to avoid duplicated walks #1124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lidel
wants to merge
20
commits into
main
Choose a base branch
from
feat/provide-entity-roots-with-dedup
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
2b591c4
feat(dag/walker): add VisitedTracker with BloomTracker and MapTracker
lidel 224c2ae
feat(dag/walker): add WalkDAG with codec-agnostic link extraction
lidel 8c8f5f6
feat(dag/walker): add WalkEntityRoots for entity-aware traversal
lidel 99495ec
test(dag/walker): add BloomTracker FP rate regression tests
lidel 53d7674
fix(provider): stream error continues to next, add NewConcatProvider
lidel e38f023
feat(pinner): add NewUniquePinnedProvider and NewPinnedEntityRootsPro…
lidel 7b8f853
test: add PrioritizedProvider error-continue regression test
lidel 685c82e
refactor(provider): use labeled break in NewConcatProvider for consis…
lidel b75fa03
refactor(dag/walker): extract shared linkSystemForBlockstore helper
lidel 5147959
fix(dag/walker): skip emitting identity CIDs, add tests
lidel 609ff3d
test(dag/walker): add symlink entity detection tests
lidel 8cfa9a0
refactor: consolidate identity CID tests, filter direct pins
lidel 11cd29e
fix(dag/walker): visit siblings in left-to-right link order
lidel 44bb0a3
fix(pinner): continue on pin iteration error in unique providers
lidel 56a0a31
docs(dag/walker): document implicit behaviors
lidel dcfda13
Merge branch 'main' into feat/provide-entity-roots-with-dedup
lidel 14c5f91
Merge remote-tracking branch 'origin/main' into feat/provide-entity-r…
lidel 0fc1a0b
fix: address review feedback from gammazero
lidel 577fa3f
feat(walker): log bloom tracker creation and autoscaling
lidel c8920a2
feat(walker): add Deduplicated() to BloomTracker and MapTracker
lidel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| // Package walker provides memory-efficient DAG traversal with | ||
| // deduplication. Optimized for the IPFS provide system, but useful | ||
| // anywhere repeated DAG walks need to skip already-visited subtrees. | ||
| // | ||
| // The primary entry point is [WalkDAG], which walks a DAG rooted at a | ||
| // given CID, emitting each visited CID to a callback. When combined | ||
| // with a [VisitedTracker] (e.g. [BloomTracker]), entire subtrees | ||
| // already seen are skipped in O(1). | ||
| // | ||
| // For entity-aware traversal that only emits file/directory/HAMT roots | ||
| // instead of every block, see [WalkEntityRoots]. | ||
| // | ||
| // Blocks are decoded using the codecs registered in the process via | ||
| // the global multicodec registry. In a standard kubo build this | ||
| // includes dag-pb, dag-cbor, dag-json, cbor, json, and raw. | ||
| // | ||
| // Use [LinksFetcherFromBlockstore] to create a fetcher backed by a | ||
| // local blockstore. For custom link extraction (e.g. a different codec | ||
| // registry or non-blockstore storage), pass your own [LinksFetcher] | ||
| // function directly to [WalkDAG]. | ||
| package walker |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,192 @@ | ||
| package walker | ||
|
|
||
| import ( | ||
| "context" | ||
| "slices" | ||
|
|
||
| blockstore "github.com/ipfs/boxo/blockstore" | ||
| "github.com/ipfs/boxo/ipld/unixfs" | ||
| cid "github.com/ipfs/go-cid" | ||
| ipld "github.com/ipld/go-ipld-prime" | ||
| cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
| basicnode "github.com/ipld/go-ipld-prime/node/basic" | ||
| mh "github.com/multiformats/go-multihash" | ||
| ) | ||
|
|
||
| // EntityType represents the semantic type of a DAG entity. | ||
| type EntityType int | ||
|
|
||
| const ( | ||
| EntityUnknown EntityType = iota | ||
| EntityFile // UnixFS file root (not its chunks) | ||
| EntityDirectory // UnixFS flat directory | ||
| EntityHAMTShard // UnixFS HAMT sharded directory bucket | ||
| EntitySymlink // UnixFS symbolic link | ||
| ) | ||
|
|
||
| // NodeFetcher returns child link CIDs and entity type for a given CID. | ||
| // Used by [WalkEntityRoots] which needs UnixFS type detection to decide | ||
| // whether to descend into children (directories, HAMT shards) or stop | ||
| // (files, symlinks). | ||
| type NodeFetcher func(ctx context.Context, c cid.Cid) (linkCIDs []cid.Cid, entityType EntityType, err error) | ||
|
|
||
| // NodeFetcherFromBlockstore creates a [NodeFetcher] backed by a local | ||
| // blockstore. Like [LinksFetcherFromBlockstore], it decodes blocks via | ||
| // ipld-prime's global multicodec registry (dag-pb, dag-cbor, raw, etc.) | ||
| // and handles identity CIDs transparently via [blockstore.NewIdStore]. | ||
| // | ||
| // Entity type detection: | ||
| // - dag-pb with valid UnixFS Data: file, directory, HAMT shard, or symlink | ||
| // - dag-pb without valid UnixFS Data: EntityUnknown | ||
| // - raw codec: EntityFile (small file stored as a single raw block) | ||
| // - all other codecs (dag-cbor, dag-json, etc.): EntityUnknown | ||
| func NodeFetcherFromBlockstore(bs blockstore.Blockstore) NodeFetcher { | ||
| ls := linkSystemForBlockstore(bs) | ||
|
|
||
| return func(ctx context.Context, c cid.Cid) ([]cid.Cid, EntityType, error) { | ||
| lnk := cidlink.Link{Cid: c} | ||
| nd, err := ls.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any) | ||
| if err != nil { | ||
| return nil, EntityUnknown, err | ||
| } | ||
|
|
||
| links := collectLinks(c, nd) | ||
| entityType := detectEntityType(c, nd) | ||
| return links, entityType, nil | ||
| } | ||
| } | ||
|
|
||
| // detectEntityType infers the UnixFS entity type from an ipld-prime | ||
| // decoded node. For dag-pb nodes, it reads the "Data" field and parses | ||
| // it as UnixFS protobuf. For raw codec nodes, it returns EntityFile. | ||
| // For everything else, it returns EntityUnknown. | ||
| func detectEntityType(c cid.Cid, nd ipld.Node) EntityType { | ||
| codec := c.Prefix().Codec | ||
|
|
||
| // raw codec: small file stored as a single block | ||
| if codec == cid.Raw { | ||
| return EntityFile | ||
| } | ||
|
|
||
| // only dag-pb has UnixFS semantics; other codecs are unknown | ||
| if codec != cid.DagProtobuf { | ||
| return EntityUnknown | ||
| } | ||
|
|
||
| // dag-pb: try to read the "Data" field for UnixFS type | ||
| dataField, err := nd.LookupByString("Data") | ||
| if err != nil || dataField.IsAbsent() || dataField.IsNull() { | ||
| return EntityUnknown | ||
| } | ||
|
|
||
| dataBytes, err := dataField.AsBytes() | ||
| if err != nil { | ||
| return EntityUnknown | ||
| } | ||
|
|
||
| fsn, err := unixfs.FSNodeFromBytes(dataBytes) | ||
| if err != nil { | ||
| return EntityUnknown | ||
| } | ||
|
|
||
| switch fsn.Type() { | ||
| case unixfs.TFile, unixfs.TRaw: | ||
| return EntityFile | ||
| case unixfs.TDirectory: | ||
| return EntityDirectory | ||
| case unixfs.THAMTShard: | ||
| return EntityHAMTShard | ||
| case unixfs.TSymlink: | ||
| return EntitySymlink | ||
| default: | ||
| return EntityUnknown | ||
| } | ||
| } | ||
|
|
||
| // WalkEntityRoots traverses a DAG calling emit for each entity root. | ||
| // | ||
| // Entity roots are semantic boundaries in the DAG: | ||
| // - File/symlink roots: emitted, children (chunks) NOT traversed | ||
| // - Directory roots: emitted, children recursed | ||
| // - HAMT shard nodes: emitted (needed for directory enumeration), | ||
| // children recursed | ||
| // - Non-UnixFS nodes (dag-cbor, dag-json, etc.): emitted AND children | ||
| // recursed to discover further content. The +entities optimization | ||
| // (skip chunks) only applies to UnixFS files; for all other codecs, | ||
| // every reachable CID is emitted. | ||
| // - Raw leaf nodes: emitted (no children to recurse) | ||
| // | ||
| // Same traversal order as [WalkDAG]: pre-order DFS with left-to-right | ||
| // sibling visiting. Uses the same option types: [WithVisitedTracker] | ||
| // for bloom/map dedup across walks, [WithLocality] for MFS locality | ||
| // checks. | ||
| func WalkEntityRoots( | ||
| ctx context.Context, | ||
| root cid.Cid, | ||
| fetch NodeFetcher, | ||
| emit func(cid.Cid) bool, | ||
| opts ...Option, | ||
| ) error { | ||
| cfg := &walkConfig{} | ||
| for _, o := range opts { | ||
| o(cfg) | ||
| } | ||
|
|
||
| stack := []cid.Cid{root} | ||
|
|
||
| for len(stack) > 0 { | ||
| if ctx.Err() != nil { | ||
| return ctx.Err() | ||
| } | ||
|
|
||
| // pop | ||
| c := stack[len(stack)-1] | ||
| stack = stack[:len(stack)-1] | ||
|
|
||
| // dedup via tracker | ||
| if cfg.tracker != nil && !cfg.tracker.Visit(c) { | ||
| continue | ||
| } | ||
|
|
||
| // locality check | ||
| if cfg.locality != nil { | ||
| local, err := cfg.locality(ctx, c) | ||
| if err != nil { | ||
| log.Errorf("entity walk: locality check %s: %s", c, err) | ||
| continue | ||
| } | ||
| if !local { | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| // fetch block and detect entity type | ||
| children, entityType, err := fetch(ctx, c) | ||
| if err != nil { | ||
| log.Errorf("entity walk: fetch %s: %s", c, err) | ||
| continue | ||
| } | ||
|
|
||
| // decide whether to descend into children | ||
| descend := entityType != EntityFile && entityType != EntitySymlink | ||
| if descend { | ||
| // reverse so first link is popped next (left-to-right | ||
| // sibling order, matching WalkDAG and legacy BlockAll) | ||
| slices.Reverse(children) | ||
| stack = append(stack, children...) | ||
| } | ||
|
|
||
| // skip identity CIDs: content is inline, no need to provide. | ||
| // we still descend (above) so an inlined dag-pb directory's | ||
| // normal children get provided. | ||
| if c.Prefix().MhType == mh.IDENTITY { | ||
| continue | ||
| } | ||
|
|
||
| if !emit(c) { | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.