diff --git a/rbdeal/deal_tracker.go b/rbdeal/deal_tracker.go index 5f7a3e4..ae3570c 100644 --- a/rbdeal/deal_tracker.go +++ b/rbdeal/deal_tracker.go @@ -32,6 +32,12 @@ func (r *ribs) dealTracker(ctx context.Context) { } defer closer() + fcdu, err := ributil.NewFullCDU(ctx, gw) + if err != nil { + log.Errorw("failed to create full CDU", "error", err) + return + } + for { checkStart := time.Now() select { @@ -40,7 +46,7 @@ func (r *ribs) dealTracker(ctx context.Context) { default: } - err := r.runDealCheckLoop(ctx, gw) + err := r.runDealCheckLoop(ctx, gw, fcdu) if err != nil { log.Errorw("deal check loop failed", "error", err) } @@ -59,13 +65,7 @@ func (r *ribs) dealTracker(ctx context.Context) { } } -func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway) error { - gw, closer, err := client.NewGatewayRPCV1(ctx, r.lotusRPCAddr, nil) - if err != nil { - return xerrors.Errorf("creating gateway rpc client: %w", err) - } - defer closer() - +func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway, fcdu *ributil.FullCDU) error { /* PUBLISHED DEAL CHECKS */ /* Wait for published deals to become active (or expire) */ @@ -111,7 +111,7 @@ func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway) error { /* Wait for publish at "good-enough" finality */ { - cdm := ributil.CurrentDealInfoManager{CDAPI: gw} + cdm := ributil.CurrentDealInfoManager{CDAPI: fcdu} toCheck, err := r.db.PublishingDeals() if err != nil { @@ -205,7 +205,7 @@ func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway) error { // check market deal state - // Inactive, expired deal cleanup + // Inactive, unpublished, expired deal cleanup { head, err := gw.ChainHead(ctx) // todo lookback if err != nil { diff --git a/ributil/arcbs.go b/ributil/arcbs.go new file mode 100644 index 0000000..c5195eb --- /dev/null +++ b/ributil/arcbs.go @@ -0,0 +1,124 @@ +package ributil + +import ( + "context" + "github.com/filecoin-project/lotus/blockstore" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + block "github.com/ipfs/go-libipfs/blocks" + "strings" +) + +type ArcBlockstore struct { + write blockstore.Blockstore + cache *lru.ARCCache[cid.Cid, block.Block] +} + +var CacheBstoreSize = (2048 << 20) / 16000 // 2G with average block size of 16KB + +func ARCStore(base blockstore.Blockstore) *ArcBlockstore { + c, _ := lru.NewARC[cid.Cid, block.Block](CacheBstoreSize) + + bs := &ArcBlockstore{ + write: base, + + cache: c, + } + return bs +} + +var ( + _ blockstore.Blockstore = (*ArcBlockstore)(nil) + _ blockstore.Viewer = (*ArcBlockstore)(nil) +) + +func (bs *ArcBlockstore) Flush(ctx context.Context) error { + return bs.write.Flush(ctx) +} + +func (bs *ArcBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return bs.write.AllKeysChan(ctx) +} + +func (bs *ArcBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + bs.cache.Remove(c) + return bs.write.DeleteBlock(ctx, c) +} + +func (bs *ArcBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + for _, c := range cids { + bs.cache.Remove(c) + } + return bs.write.DeleteMany(ctx, cids) +} + +func (bs *ArcBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error { + if blk, ok := bs.cache.Get(c); ok { + return callback(blk.RawData()) + } + + return bs.write.View(ctx, c, func(bytes []byte) error { + blk, err := block.NewBlockWithCid(bytes, c) + if err != nil { + return err + } + bs.cache.Add(c, blk) + + return callback(bytes) + }) +} + +func (bs *ArcBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { + if blk, ok := bs.cache.Get(c); ok { + return blk, nil + } + + return bs.write.Get(ctx, c) +} + +func (bs *ArcBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + if blk, ok := bs.cache.Get(c); ok { + return len(blk.RawData()), nil + } + + b, err := bs.Get(ctx, c) + if err != nil { + if strings.Contains(err.Error(), "ipld: could not find") { + return 0, &ipld.ErrNotFound{Cid: c} + } + return 0, err + } + + return len(b.RawData()), nil +} + +func (bs *ArcBlockstore) Put(ctx context.Context, blk block.Block) error { + bs.cache.Add(blk.Cid(), blk) + + return nil +} + +func (bs *ArcBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + if bs.cache.Contains(c) { + return true, nil + } + + return bs.write.Has(ctx, c) +} + +func (bs *ArcBlockstore) HashOnRead(hor bool) { + bs.write.HashOnRead(hor) +} + +func (bs *ArcBlockstore) PutMany(ctx context.Context, blks []block.Block) error { + for _, blk := range blks { + if bs.cache.Contains(blk.Cid()) { + continue + } + + bs.cache.Add(blk.Cid(), blk) + } + + return nil +} diff --git a/ributil/chainbootstrap/mainnet.pi b/ributil/chainbootstrap/mainnet.pi new file mode 100644 index 0000000..4511339 --- /dev/null +++ b/ributil/chainbootstrap/mainnet.pi @@ -0,0 +1,7 @@ +/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN +/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa +/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb +/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt +/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ +/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ +/dns4/elastic.dag.house/tcp/443/wss/p2p/QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC \ No newline at end of file diff --git a/ributil/currentdealinfo.go b/ributil/currentdealinfo.go index a68578a..0e9bed7 100644 --- a/ributil/currentdealinfo.go +++ b/ributil/currentdealinfo.go @@ -101,7 +101,7 @@ func (mgr *CurrentDealInfoManager) FindCloseMsgTipset(ctx context.Context, tsk t return headTs.Key(), nil } - // load 15 tipsets back to curTs + // load /step/ tipsets back to curTs headTs = curTs toLoad := curTs.Height() - step curTs, err = mgr.CDAPI.ChainGetTipSetByHeight(ctx, toLoad, curTs.Key()) diff --git a/ributil/fallbackstore.go b/ributil/fallbackstore.go new file mode 100644 index 0000000..a6f9751 --- /dev/null +++ b/ributil/fallbackstore.go @@ -0,0 +1,108 @@ +package ributil + +import ( + "context" + "github.com/filecoin-project/lotus/blockstore" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "golang.org/x/xerrors" + "strings" + "sync" + "time" +) + +type FallbackStore struct { + blockstore.Blockstore + + lk sync.RWMutex + // missFn is the function that will be invoked on a local miss to pull the + // block from elsewhere. + missFn func(context.Context, cid.Cid) (blocks.Block, error) +} + +func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blocks.Block, error)) { + fbs.lk.Lock() + defer fbs.lk.Unlock() + + fbs.missFn = missFn +} + +func (fbs *FallbackStore) getFallback(ctx context.Context, c cid.Cid) (blocks.Block, error) { + log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c) + fbs.lk.RLock() + defer fbs.lk.RUnlock() + + if fbs.missFn == nil { + // FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet) + // Wait for a bit and retry + fbs.lk.RUnlock() + time.Sleep(5 * time.Second) + fbs.lk.RLock() + + if fbs.missFn == nil { + log.Errorw("fallbackstore: missFn not configured yet") + return nil, ipld.ErrNotFound{Cid: c} + } + } + + ctx, cancel := context.WithTimeout(ctx, 120*time.Second) + defer cancel() + + b, err := fbs.missFn(ctx, c) + if err != nil { + return nil, err + } + + // chain bitswap puts blocks in temp blockstore which is cleaned up + // every few min (to drop any messages we fetched but don't want) + // in this case we want to keep this block around + if err := fbs.Put(ctx, b); err != nil { + return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err) + } + return b, nil +} + +func (fbs *FallbackStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + b, err := fbs.Blockstore.Get(ctx, c) + switch { + case err == nil: + return b, nil + case ipld.IsNotFound(err) || strings.Contains(err.Error(), "ipld: could not find"): + return fbs.getFallback(ctx, c) + default: + return b, xerrors.Errorf("fbs get: %w", err) + } +} + +func (fbs *FallbackStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + sz, err := fbs.Blockstore.GetSize(ctx, c) + switch { + case err == nil: + return sz, nil + case ipld.IsNotFound(err) || strings.Contains(err.Error(), "ipld: could not find"): + b, err := fbs.getFallback(ctx, c) + if err != nil { + return 0, err + } + return len(b.RawData()), nil + default: + return sz, xerrors.Errorf("fbs getsize: %w", err) + } +} + +func (fbs *FallbackStore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error { + err := fbs.Blockstore.View(ctx, c, callback) + switch { + case err == nil: + return nil + case ipld.IsNotFound(err) || strings.Contains(err.Error(), "ipld: could not find"): + b, err := fbs.getFallback(ctx, c) + if err != nil { + return err + } + return callback(b.RawData()) + default: + return xerrors.Errorf("fbs view: %w", err) + } +} diff --git a/ributil/fullchain_cduapi.go b/ributil/fullchain_cduapi.go new file mode 100644 index 0000000..38904f5 --- /dev/null +++ b/ributil/fullchain_cduapi.go @@ -0,0 +1,826 @@ +package ributil + +import ( + "context" + "embed" + "errors" + "fmt" + "github.com/filecoin-project/lotus/lib/addrutil" + "github.com/filecoin-project/lotus/lib/result" + "github.com/ipfs/kubo/core/node" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/muxer/mplex" + "path" + "runtime/debug" + "strings" + "sync" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin/v11/util/adt" + "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/must" + blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ipfs/go-cid" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/ipfs/go-libipfs/bitswap" + bsnetwork "github.com/ipfs/go-libipfs/bitswap/network" + "github.com/ipfs/go-libipfs/blocks" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" +) + +//go:embed chainbootstrap +var bootstrapfs embed.FS + +func builtinBootstrap() ([]peer.AddrInfo, error) { + spi, err := bootstrapfs.ReadFile(path.Join("chainbootstrap", "mainnet.pi")) + if err != nil { + return nil, err + } + if len(spi) == 0 { + return nil, nil + } + + return addrutil.ParseAddresses(context.TODO(), strings.Split(strings.TrimSpace(string(spi)), "\n")) +} + +type FullCDU struct { + gw api.Gateway + + bstore blockstore.Blockstore + + cindex *store.ChainIndex + + mmCache *lru.Cache[cid.Cid, mmCids] +} + +// NewFullCDU wraps gateway, which usually has limited lookback, and provides full-node like impl for CDU. +func NewFullCDU(ctx context.Context, gw api.Gateway) (*FullCDU, error) { + chainBitswapHost, err := libp2p.New( + libp2p.DefaultTransports, libp2p.DefaultMuxers, + libp2p.Muxer(mplex.ID, mplex.DefaultTransport), + ) + if err != nil { + return nil, xerrors.Errorf("creating chain bitswap host: %w", err) + } + + bspeers, err := builtinBootstrap() + if err != nil { + return nil, xerrors.Errorf("getting bootstrap peers: %w", err) + } + + rtopts := []dht.Option{dht.Mode(dht.ModeClient), + dht.Validator(node.RecordValidator(chainBitswapHost.Peerstore())), + //dht.ProtocolPrefix(build.DhtProtocolName(nn)), + dht.QueryFilter(dht.PublicQueryFilter), + dht.RoutingTableFilter(dht.PublicRoutingTableFilter), + //dht.DisableProviders(), + //dht.DisableValues(), + dht.BootstrapPeers(bspeers...), + } + d, err := dht.New( + ctx, chainBitswapHost, rtopts..., + ) + if err != nil { + return nil, xerrors.Errorf("creating chain dht client: %w", err) + } + + bitswapNetwork := bsnetwork.NewFromIpfsHost(chainBitswapHost, d) + bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} + + apiBs := blockstore.NewAPIBlockstore(gw) + + bstore := ARCStore(apiBs) + + bswap := bitswap.New(ctx, bitswapNetwork, bstore, bitswapOptions...) + + fbs := &FallbackStore{ + Blockstore: bstore, + } + + var bootstrapOnce sync.Once + + fbs.SetFallback(func(ctx context.Context, c cid.Cid) (blocks.Block, error) { + bootstrapOnce.Do(func() { + for _, p := range bspeers { + go func(p peer.AddrInfo) { + if err := chainBitswapHost.Connect(context.TODO(), p); err != nil { + log.Errorw("failed to connect to bootstrap peer", "error", err) + } + }(p) + } + + go func() { + time.Sleep(1 * time.Second) + err := d.Bootstrap(context.TODO()) + if err != nil { + log.Errorw("failed to bootstrap dht", "error", err) + } + }() + + time.Sleep(3 * time.Second) + }) + + debug.PrintStack() + + log.Errorw("falling back to bitswap", "cid", c) + return bswap.GetBlock(ctx, c) + }) + + fc := &FullCDU{gw: gw, bstore: fbs, + mmCache: must.One(lru.New[cid.Cid, mmCids](2048)), + } + fc.cindex = store.NewChainIndex(fc.LoadTipSet) + + go func() { + head, err := fc.gw.ChainHead(ctx) + if err != nil { + log.Errorw("FullCDU cache warmup: failed to get head", "error", err) + return + } + + to := head.Height() - builtin.EpochsInDay*14 + start := time.Now() + _, err = fc.cindex.GetTipsetByHeight(ctx, head, to) + if err != nil { + log.Errorw("FullCDU cache warmup: failed to load tipset", "error", err) + return + } + + log.Infow("FullCDU cache warmup: done", "took", time.Since(start)) + }() + + return fc, nil +} + +func (f *FullCDU) ChainGetMessage(ctx context.Context, cid cid.Cid) (*types.Message, error) { + cm, err := f.GetCMessage(ctx, cid) + if err != nil { + return nil, xerrors.Errorf("fullcdu get message: %w", err) + } + + return cm.VMMessage(), nil +} + +func (f *FullCDU) StateLookupID(ctx context.Context, a address.Address, tsk types.TipSetKey) (address.Address, error) { + ts, err := f.GetTipSetFromKey(ctx, tsk) + if err != nil { + return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err) + } + + cst := cbor.NewCborStore(f.bstore) + st, err := state.LoadStateTree(cst, ts.ParentState()) + if err != nil { + return address.Undef, xerrors.Errorf("load state tree: %w", err) + } + return st.LookupID(a) +} + +func (f *FullCDU) StateMarketStorageDeal(ctx context.Context, id abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error) { + act, err := f.StateGetActor(ctx, market.Address, tsk) + if err != nil { + return nil, xerrors.Errorf("failed to load market actor: %w", err) + } + + state, err := market.Load(ActorStore(ctx, f.bstore), act) + if err != nil { + return nil, xerrors.Errorf("failed to load market actor state: %w", err) + } + + proposals, err := state.Proposals() + if err != nil { + return nil, xerrors.Errorf("failed to get proposals from state : %w", err) + } + + proposal, found, err := proposals.Get(id) + + if err != nil { + return nil, xerrors.Errorf("failed to get proposal : %w", err) + } else if !found { + return nil, xerrors.Errorf( + "deal %d not found "+ + "- deal may not have completed sealing before deal proposal "+ + "start epoch, or deal may have been slashed", + id) + } + + states, err := state.States() + if err != nil { + return nil, xerrors.Errorf("failed to get states : %w", err) + } + + st, found, err := states.Get(id) + if err != nil { + return nil, xerrors.Errorf("failed to get state : %w", err) + } + + if !found { + st = market.EmptyDealState() + } + + return &api.MarketDeal{ + Proposal: *proposal, + State: *st, + }, nil +} + +func (f *FullCDU) StateSearchMsg(ctx context.Context, tsk types.TipSetKey, msg cid.Cid, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { + fromTs, err := f.GetTipSetFromKey(ctx, tsk) + if err != nil { + return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) + } + + ts, recpt, found, err := f.SearchForMessage(ctx, fromTs, msg, lookbackLimit, allowReplaced) + if err != nil { + return nil, err + } + + if ts != nil { + return &api.MsgLookup{ + Message: found, + Receipt: *recpt, + TipSet: ts.Key(), + Height: ts.Height(), + }, nil + } + return nil, nil +} + +func (f *FullCDU) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) { + return f.StateNetworkVersion(ctx, tsk) +} + +func (f *FullCDU) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + return f.GetTipSetFromKey(ctx, tsk) +} + +func (f *FullCDU) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + ts, err := f.GetTipSetFromKey(ctx, tsk) + if err != nil { + return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) + } + + if h > ts.Height() { + return nil, xerrors.Errorf("looking for tipset with height greater than start point") + } + + if h == ts.Height() { + return ts, nil + } + + lbts, err := f.cindex.GetTipsetByHeight(ctx, ts, h) + if err != nil { + return nil, err + } + + if lbts.Height() < h { + log.Warnf("chain index returned the wrong tipset at height %d, using slow retrieval", h) + lbts, err = f.cindex.GetTipsetByHeightWithoutCache(ctx, ts, h) + if err != nil { + return nil, err + } + } + + prev := true + + if lbts.Height() == h || !prev { + return lbts, nil + } + + return f.LoadTipSet(ctx, lbts.Parents()) +} + +func (f *FullCDU) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + ts, err := f.GetTipSetFromKey(ctx, tsk) + if err != nil { + return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) + } + + state, err := f.ParentState(ts) + if err != nil { + return nil, err + } + return state.GetActor(actor) +} + +// borrowed from lotus chainstore / statestore + +func ActorStore(ctx context.Context, bs bstore.Blockstore) adt.Store { + return adt.WrapStore(ctx, cbor.NewCborStore(bs)) +} + +func (f *FullCDU) ParentState(ts *types.TipSet) (*state.StateTree, error) { + cst := cbor.NewCborStore(f.bstore) + state, err := state.LoadStateTree(cst, ts.ParentState()) + if err != nil { + return nil, xerrors.Errorf("load state tree: %w", err) + } + + return state, nil +} + +func (f *FullCDU) GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + if tsk.IsEmpty() { + return f.gw.ChainHead(ctx) + } + return f.LoadTipSet(ctx, tsk) +} + +func (f *FullCDU) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + // Fetch tipset block headers from blockstore in parallel + var eg errgroup.Group + cids := tsk.Cids() + blks := make([]*types.BlockHeader, len(cids)) + for i, c := range cids { + i, c := i, c + eg.Go(func() error { + var blk *types.BlockHeader + err := f.bstore.View(ctx, c, func(b []byte) (err error) { + blk, err = types.DecodeBlock(b) + return err + }) + if err != nil { + return err + } + + blks[i] = blk + return nil + }) + } + err := eg.Wait() + if err != nil { + return nil, err + } + + ts, err := types.NewTipSet(blks) + if err != nil { + return nil, err + } + + return ts, nil +} + +func (f *FullCDU) GetCMessage(ctx context.Context, c cid.Cid) (types.ChainMsg, error) { + res := make(chan result.Result[types.ChainMsg], 2) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + go func() { + res <- result.Wrap[types.ChainMsg](f.GetMessage(ctx, c)) + }() + go func() { + res <- result.Wrap[types.ChainMsg](f.GetSignedMessage(ctx, c)) + }() + + r1, r2 := <-res, <-res + + if r1.Error != nil && r2.Error != nil { + return nil, xerrors.Errorf("failed to fetch message: %w, %w", r1.Error, r2.Error) + } + + if r1.Error != nil { + return r2.Value, nil + } + + return r1.Value, nil +} + +func (f *FullCDU) GetMessage(ctx context.Context, c cid.Cid) (*types.Message, error) { + var msg *types.Message + err := f.bstore.View(ctx, c, func(b []byte) (err error) { + msg, err = types.DecodeMessage(b) + return err + }) + return msg, err +} + +func (f *FullCDU) GetSignedMessage(ctx context.Context, c cid.Cid) (*types.SignedMessage, error) { + var msg *types.SignedMessage + err := f.bstore.View(ctx, c, func(b []byte) (err error) { + msg, err = types.DecodeSignedMessage(b) + return err + }) + return msg, err +} + +// message search, borrowed from lotus as well + +func (f *FullCDU) SearchForMessage(ctx context.Context, head *types.TipSet, mcid cid.Cid, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { + msg, err := f.GetCMessage(ctx, mcid) + if err != nil { + return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err) + } + + r, foundMsg, err := f.tipsetExecutedMessage(ctx, head, mcid, msg.VMMessage(), allowReplaced) + if err != nil { + return nil, nil, cid.Undef, err + } + + if r != nil { + return head, r, foundMsg, nil + } + + fts, r, foundMsg, err := f.searchBackForMsg(ctx, head, msg, lookbackLimit, allowReplaced) + + if err != nil { + log.Warnf("failed to look back through chain for message %s", mcid) + return nil, nil, cid.Undef, err + } + + if fts == nil { + return nil, nil, cid.Undef, nil + } + + return fts, r, foundMsg, nil +} + +const LookbackNoLimit = api.LookbackNoLimit + +func (f *FullCDU) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg, limit abi.ChainEpoch, allowReplaced bool) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { + limitHeight := from.Height() - limit + noLimit := limit == LookbackNoLimit + + cur := from + curActor, err := f.StateGetActor(ctx, m.VMMessage().From, cur.Key()) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("failed to load initital tipset") + } + + mFromId, err := f.StateLookupID(ctx, m.VMMessage().From, from.Key()) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("looking up From id address: %w", err) + } + + mNonce := m.VMMessage().Nonce + + for { + // If we've reached the genesis block, or we've reached the limit of + // how far back to look + if cur.Height() == 0 || !noLimit && cur.Height() <= limitHeight { + // it ain't here! + return nil, nil, cid.Undef, nil + } + + select { + case <-ctx.Done(): + return nil, nil, cid.Undef, nil + default: + } + + // we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for, + // either way, no reason to lookback, it ain't there + if curActor == nil || curActor.Nonce == 0 || curActor.Nonce < mNonce { + return nil, nil, cid.Undef, nil + } + + pts, err := f.LoadTipSet(ctx, cur.Parents()) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("failed to load tipset during msg wait searchback: %w", err) + } + + act, err := f.StateGetActor(ctx, mFromId, pts.Key()) + actorNoExist := errors.Is(err, types.ErrActorNotFound) + if err != nil && !actorNoExist { + return nil, nil, cid.Cid{}, xerrors.Errorf("failed to load the actor: %w", err) + } + + // check that between cur and parent tipset the nonce fell into range of our message + if actorNoExist || (curActor.Nonce > mNonce && act.Nonce <= mNonce) { + r, foundMsg, err := f.tipsetExecutedMessage(ctx, cur, m.Cid(), m.VMMessage(), allowReplaced) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("checking for message execution during lookback: %w", err) + } + + if r != nil { + return cur, r, foundMsg, nil + } + } + + cur = pts + curActor = act + } +} + +func (f *FullCDU) tipsetExecutedMessage(ctx context.Context, ts *types.TipSet, msg cid.Cid, vmm *types.Message, allowReplaced bool) (*types.MessageReceipt, cid.Cid, error) { + // The genesis block did not execute any messages + if ts.Height() == 0 { + return nil, cid.Undef, nil + } + + pts, err := f.LoadTipSet(ctx, ts.Parents()) + if err != nil { + return nil, cid.Undef, err + } + + cm, err := f.MessagesForTipset(ctx, pts) + if err != nil { + return nil, cid.Undef, err + } + + for ii := range cm { + // iterate in reverse because we going backwards through the chain + i := len(cm) - ii - 1 + m := cm[i] + + if m.VMMessage().From == vmm.From { // cheaper to just check origin first + if m.VMMessage().Nonce == vmm.Nonce { + if !m.VMMessage().EqualCall(vmm) { + // this is an entirely different message, fail + return nil, cid.Undef, xerrors.Errorf("found message with equal nonce as the one we are looking for that is NOT a valid replacement message (F:%s n %d, TS: %s n%d)", + msg, vmm.Nonce, m.Cid(), m.VMMessage().Nonce) + } + + if m.Cid() != msg { + if !allowReplaced { + log.Warnw("found message with equal nonce and call params but different CID", + "wanted", msg, "found", m.Cid(), "nonce", vmm.Nonce, "from", vmm.From) + return nil, cid.Undef, xerrors.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)", + msg, vmm.Nonce, m.Cid(), m.VMMessage().Nonce) + } + } + + pr, err := f.GetParentReceipt(ctx, ts.Blocks()[0], i) + if err != nil { + return nil, cid.Undef, err + } + + return pr, m.Cid(), nil + } + if m.VMMessage().Nonce < vmm.Nonce { + return nil, cid.Undef, nil // don't bother looking further + } + } + } + + return nil, cid.Undef, nil +} + +func (f *FullCDU) GetParentReceipt(ctx context.Context, b *types.BlockHeader, i int) (*types.MessageReceipt, error) { + // block headers use adt0, for now. + a, err := blockadt.AsArray(ActorStore(ctx, f.bstore), b.ParentMessageReceipts) + if err != nil { + return nil, xerrors.Errorf("amt load: %w", err) + } + + var r types.MessageReceipt + if found, err := a.Get(uint64(i), &r); err != nil { + return nil, err + } else if !found { + return nil, xerrors.Errorf("failed to find receipt %d", i) + } + + return &r, nil +} + +func (f *FullCDU) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) { + bmsgs, err := f.BlockMsgsForTipset(ctx, ts) + if err != nil { + return nil, err + } + + var out []types.ChainMsg + for _, bm := range bmsgs { + for _, blsm := range bm.BlsMessages { + out = append(out, blsm) + } + + for _, secm := range bm.SecpkMessages { + out = append(out, secm) + } + } + + return out, nil +} + +type BlockMessages struct { + Miner address.Address + BlsMessages []types.ChainMsg + SecpkMessages []types.ChainMsg +} + +func (f *FullCDU) BlockMsgsForTipset(ctx context.Context, ts *types.TipSet) ([]BlockMessages, error) { + // returned BlockMessages match block order in tipset + + applied := make(map[address.Address]uint64) + + cst := cbor.NewCborStore(f.bstore) + st, err := state.LoadStateTree(cst, ts.Blocks()[0].ParentStateRoot) + if err != nil { + return nil, xerrors.Errorf("failed to load state tree at tipset %s: %w", ts, err) + } + + useIds := false + selectMsg := func(m *types.Message) (bool, error) { + var sender address.Address + if ts.Height() >= build.UpgradeHyperdriveHeight { + if useIds { + sender, err = st.LookupID(m.From) + if err != nil { + return false, xerrors.Errorf("failed to resolve sender: %w", err) + } + } else { + if m.From.Protocol() != address.ID { + // we haven't been told to use IDs, just use the robust addr + sender = m.From + } else { + // uh-oh, we actually have an ID-sender! + useIds = true + for robust, nonce := range applied { + resolved, err := st.LookupID(robust) + if err != nil { + return false, xerrors.Errorf("failed to resolve sender: %w", err) + } + applied[resolved] = nonce + } + + sender, err = st.LookupID(m.From) + if err != nil { + return false, xerrors.Errorf("failed to resolve sender: %w", err) + } + } + } + } else { + sender = m.From + } + + // The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise + if _, ok := applied[sender]; !ok { + applied[sender] = m.Nonce + } + + if applied[sender] != m.Nonce { + return false, nil + } + + applied[sender]++ + + return true, nil + } + + var out []BlockMessages + for _, b := range ts.Blocks() { + + bms, sms, err := f.MessagesForBlock(ctx, b) + if err != nil { + return nil, xerrors.Errorf("failed to get messages for block: %w", err) + } + + bm := BlockMessages{ + Miner: b.Miner, + BlsMessages: make([]types.ChainMsg, 0, len(bms)), + SecpkMessages: make([]types.ChainMsg, 0, len(sms)), + } + + for _, bmsg := range bms { + b, err := selectMsg(bmsg.VMMessage()) + if err != nil { + return nil, xerrors.Errorf("failed to decide whether to select message for block: %w", err) + } + + if b { + bm.BlsMessages = append(bm.BlsMessages, bmsg) + } + } + + for _, smsg := range sms { + b, err := selectMsg(smsg.VMMessage()) + if err != nil { + return nil, xerrors.Errorf("failed to decide whether to select message for block: %w", err) + } + + if b { + bm.SecpkMessages = append(bm.SecpkMessages, smsg) + } + } + + out = append(out, bm) + } + + return out, nil +} + +func (f *FullCDU) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { + blscids, secpkcids, err := f.ReadMsgMetaCids(ctx, b.Messages) + if err != nil { + return nil, nil, err + } + + blsmsgs, err := f.LoadMessagesFromCids(ctx, blscids) + if err != nil { + return nil, nil, xerrors.Errorf("loading bls messages for block: %w", err) + } + + secpkmsgs, err := f.LoadSignedMessagesFromCids(ctx, secpkcids) + if err != nil { + return nil, nil, xerrors.Errorf("loading secpk messages for block: %w", err) + } + + return blsmsgs, secpkmsgs, nil +} + +type mmCids struct { + bls []cid.Cid + secpk []cid.Cid +} + +func (f *FullCDU) ReadMsgMetaCids(ctx context.Context, mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) { + if mmcids, ok := f.mmCache.Get(mmc); ok { + return mmcids.bls, mmcids.secpk, nil + } + + cst := cbor.NewCborStore(f.bstore) + var msgmeta types.MsgMeta + if err := cst.Get(ctx, mmc, &msgmeta); err != nil { + return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err) + } + + blscids, err := f.readAMTCids(msgmeta.BlsMessages) + if err != nil { + return nil, nil, xerrors.Errorf("loading bls message cids for block: %w", err) + } + + secpkcids, err := f.readAMTCids(msgmeta.SecpkMessages) + if err != nil { + return nil, nil, xerrors.Errorf("loading secpk message cids for block: %w", err) + } + + f.mmCache.Add(mmc, mmCids{ + bls: blscids, + secpk: secpkcids, + }) + + return blscids, secpkcids, nil +} + +func (f *FullCDU) readAMTCids(root cid.Cid) ([]cid.Cid, error) { + ctx := context.TODO() + // block headers use adt0, for now. + a, err := blockadt.AsArray(ActorStore(ctx, f.bstore), root) + if err != nil { + return nil, xerrors.Errorf("amt load: %w", err) + } + + var ( + cids []cid.Cid + cborCid cbg.CborCid + ) + if err := a.ForEach(&cborCid, func(i int64) error { + c := cid.Cid(cborCid) + cids = append(cids, c) + return nil + }); err != nil { + return nil, xerrors.Errorf("failed to traverse amt: %w", err) + } + + if uint64(len(cids)) != a.Length() { + return nil, xerrors.Errorf("found %d cids, expected %d", len(cids), a.Length()) + } + + return cids, nil +} + +func (f *FullCDU) LoadMessagesFromCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) { + msgs := make([]*types.Message, 0, len(cids)) + for i, c := range cids { + m, err := f.GetMessage(ctx, c) + if err != nil { + return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err) + } + + msgs = append(msgs, m) + } + + return msgs, nil +} + +func (f *FullCDU) LoadSignedMessagesFromCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) { + msgs := make([]*types.SignedMessage, 0, len(cids)) + for i, c := range cids { + m, err := f.GetSignedMessage(ctx, c) + if err != nil { + return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err) + } + + msgs = append(msgs, m) + } + + return msgs, nil +} + +var _ CurrentDealInfoAPI = (*FullCDU)(nil)