Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions actpool/actionstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (s *actionStore) Open(onData onAction) error {
func (s *actionStore) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
if !s.IsReady() {
return nil
}

if err := s.TurnOff(); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions actsync/actionsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func (as *ActionSync) Start(ctx context.Context) error {

// Stop stops the action syncer
func (as *ActionSync) Stop(ctx context.Context) error {
if !as.IsReady() {
return nil
}
log.L().Info("stopping action sync")
if err := as.TurnOff(); err != nil {
return err
Expand Down
34 changes: 9 additions & 25 deletions blockchain/blockdao/blockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ import (
"github.com/iotexproject/iotex-proto/golang/iotextypes"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/log"
"github.com/iotexproject/iotex-core/v2/pkg/prometheustimer"
)

Expand Down Expand Up @@ -74,6 +72,7 @@ type (
blockCache cache.LRUCache
txLogCache cache.LRUCache
tipHeight uint64
checker *BlockIndexerChecker
}
)

Expand All @@ -85,6 +84,12 @@ func WithBlobStore(bs BlobStore) Option {
}
}

func WithIndexerTargetHeight(target uint64) Option {
return func(dao *blockDAO) {
dao.checker = NewBlockIndexerChecker(dao, target)
}
}

// NewBlockDAOWithIndexersAndCache returns a BlockDAO with indexers which will consume blocks appended, and
// caches which will speed up reading
func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexer, cacheSize int, opts ...Option) BlockDAO {
Expand All @@ -96,6 +101,7 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe
blockStore: blkStore,
indexers: indexers,
}
blockDAO.checker = NewBlockIndexerChecker(blockDAO, 0)
for _, opt := range opts {
opt(blockDAO)
}
Expand Down Expand Up @@ -139,29 +145,7 @@ func (dao *blockDAO) Start(ctx context.Context) error {
return err
}
atomic.StoreUint64(&dao.tipHeight, tipHeight)
return dao.checkIndexers(ctx)
}

func (dao *blockDAO) checkIndexers(ctx context.Context) error {
checker := NewBlockIndexerChecker(dao)
for i, indexer := range dao.indexers {
if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) {
if height%5000 == 0 {
log.L().Info(
"indexer is catching up.",
zap.Int("indexer", i),
zap.Uint64("height", height),
)
}
}); err != nil {
return err
}
log.L().Info(
"indexer is up to date.",
zap.Int("indexer", i),
)
}
return nil
return dao.checker.CheckIndexers(ctx, dao.indexers)
}

func (dao *blockDAO) Stop(ctx context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions blockchain/blockdao/blockdao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func Test_blockDAO_checkIndexers(t *testing.T) {

p.ApplyMethodReturn(&BlockIndexerChecker{}, "CheckIndexer", errors.New(t.Name()))

err := blockdao.checkIndexers(context.Background())
err := blockdao.checker.CheckIndexers(context.Background(), blockdao.indexers)
r.ErrorContains(err, t.Name())
})

Expand All @@ -158,7 +158,7 @@ func Test_blockDAO_checkIndexers(t *testing.T) {
mockblockdao.EXPECT().Height().Return(daoTip, nil).Times(1)
mockblockdao.EXPECT().GetBlockByHeight(gomock.Any()).Return(&block.Block{}, nil).Times(1)

err := blockdao.checkIndexers(ctx)
err := blockdao.checker.CheckIndexers(ctx, blockdao.indexers)
r.NoError(err)
})
}
Expand Down
47 changes: 40 additions & 7 deletions blockchain/blockdao/blockindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package blockdao

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -36,13 +37,14 @@ type (

// BlockIndexerChecker defines a checker of block indexer
BlockIndexerChecker struct {
dao BlockDAO
dao BlockDAO
targetHeight uint64
}
)

// NewBlockIndexerChecker creates a new block indexer checker
func NewBlockIndexerChecker(dao BlockDAO) *BlockIndexerChecker {
return &BlockIndexerChecker{dao: dao}
func NewBlockIndexerChecker(dao BlockDAO, target uint64) *BlockIndexerChecker {
return &BlockIndexerChecker{dao: dao, targetHeight: target}
}

// CheckIndexer checks a block indexer against block dao
Expand All @@ -66,10 +68,6 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
if tipHeight > daoTip {
return errors.New("indexer tip height cannot by higher than dao tip height")
}
tipBlk, err := bic.dao.GetBlockByHeight(tipHeight)
if err != nil {
return err
}
if targetHeight == 0 || targetHeight > daoTip {
targetHeight = daoTip
}
Expand All @@ -80,6 +78,10 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
startHeight = indexStartHeight
}
}
tipBlk, err := bic.dao.GetBlockByHeight(startHeight - 1)
if err != nil {
return err
}
for i := startHeight; i <= targetHeight; i++ {
// ternimate if context is done
if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -142,3 +144,34 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
}
return nil
}

func (bic *BlockIndexerChecker) CheckIndexers(ctx context.Context, indexers []BlockIndexer) error {
for i, indexer := range indexers {
if err := bic.CheckIndexer(ctx, indexer, bic.targetHeight, func(height uint64) {
if height%5000 == 0 {
log.L().Info(
"indexer is catching up.",
zap.Int("indexer", i),
zap.Uint64("height", height),
)
}
}); err != nil {
return err
}
log.L().Info(
"indexer is up to date.",
zap.Int("indexer", i),
)
}
for _, indexer := range indexers {
height, err := indexer.Height()
if err != nil {
return errors.Wrap(err, "failed to get indexer height")
}
log.L().Info("indexer height", zap.Uint64("height", height), zap.String("indexer", fmt.Sprintf("%T", indexer)), zap.Any("content", indexer))
}
if bic.targetHeight > 0 {
return errors.Errorf("indexers are up to target height %d", bic.targetHeight)
}
return nil
}
6 changes: 3 additions & 3 deletions blockchain/blockdao/blockindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestCheckIndexer(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDao := mock_blockdao.NewMockBlockDAO(ctrl)
checker := NewBlockIndexerChecker(mockDao)
checker := NewBlockIndexerChecker(mockDao, 0)
indexer := mock_blockdao.NewMockBlockIndexer(ctrl)

putBlocks := make([]*block.Block, 0)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestCheckIndexerWithStart(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDao := mock_blockdao.NewMockBlockDAO(ctrl)
checker := NewBlockIndexerChecker(mockDao)
checker := NewBlockIndexerChecker(mockDao, 0)
indexer := mock_blockdao.NewMockBlockIndexerWithStart(ctrl)

putBlocks := make([]*block.Block, 0)
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestBlockIndexerChecker_CheckIndexer(t *testing.T) {
ctx := context.Background()
store := mock_blockdao.NewMockBlockDAO(ctrl)
dao := &blockDAO{blockStore: store}
bic := NewBlockIndexerChecker(dao)
bic := NewBlockIndexerChecker(dao, 0)
indexer := mock_blockdao.NewMockBlockIndexer(ctrl)

t.Run("WithoutBlockchainContext", func(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions blockchain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type (
FactoryDBType string `yaml:"factoryDBType"`
// MintTimeout is the timeout for minting
MintTimeout time.Duration `yaml:"-"`
// BlockIndexerTargetHeight is the target height for block indexer
BlockIndexerTargetHeight uint64 `yaml:"blockIndexerTargetHeight"`
// KVStoreAutoFlushThreshold is the threshold for auto flushing the KVStore
TrieDBAutoFlushThreshold int `yaml:"trieDBAutoFlushThreshold"`
}
)

Expand Down
15 changes: 14 additions & 1 deletion blockindex/bloomfilterindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ type (

// bloomfilterIndexer is a struct for bloomfilter indexer
bloomfilterIndexer struct {
lifecycle.Readiness
mutex sync.RWMutex // mutex for curRangeBloomfilter
kvStore db.KVStore
rangeSize uint64
Expand Down Expand Up @@ -95,6 +97,9 @@ func (bfx *bloomfilterIndexer) Start(ctx context.Context) error {
if err := bfx.kvStore.Start(ctx); err != nil {
return err
}
if err := bfx.TurnOn(); err != nil {
return err
}

bfx.mutex.Lock()
defer bfx.mutex.Unlock()
Expand Down Expand Up @@ -142,7 +147,15 @@ func (bfx *bloomfilterIndexer) initRangeBloomFilter(height uint64) error {

// Stop stops the bloomfilter indexer
func (bfx *bloomfilterIndexer) Stop(ctx context.Context) error {
bfx.totalRange.Close()
if bfx.totalRange != nil {
bfx.totalRange.Close()
}
if !bfx.IsReady() {
return nil
}
if err := bfx.TurnOff(); err != nil {
return err
}
return bfx.kvStore.Stop(ctx)
}

Expand Down
3 changes: 3 additions & 0 deletions blockindex/contractstaking/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func (s *Indexer) start(ctx context.Context) error {

// Stop stops the indexer
func (s *Indexer) Stop(ctx context.Context) error {
if !s.IsReady() {
return nil
}
if err := s.kvstore.Stop(ctx); err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions blockindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand Down Expand Up @@ -62,6 +63,7 @@ type (

// blockIndexer implements the Indexer interface
blockIndexer struct {
lifecycle.Readiness
mutex sync.RWMutex
genesisHash hash.Hash256
kvStore db.KVStoreWithRange
Expand Down Expand Up @@ -95,6 +97,9 @@ func (x *blockIndexer) Start(ctx context.Context) error {
if err := x.kvStore.Start(ctx); err != nil {
return err
}
if err := x.TurnOn(); err != nil {
return err
}
// create the total block and action index
var err error
if x.tbk, err = db.NewCountingIndexNX(x.kvStore, _totalBlocksBucket); err != nil {
Expand All @@ -115,6 +120,12 @@ func (x *blockIndexer) Start(ctx context.Context) error {

// Stop stops the indexer
func (x *blockIndexer) Stop(ctx context.Context) error {
if !x.IsReady() {
return nil
}
if err := x.TurnOff(); err != nil {
return err
}
return x.kvStore.Stop(ctx)
}

Expand Down
11 changes: 11 additions & 0 deletions blockindex/sync_indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package blockindex

import (
"context"
"fmt"
"strings"

"go.uber.org/zap"

Expand Down Expand Up @@ -121,3 +123,12 @@ func (ig *SyncIndexers) initStartHeight() error {
}
return nil
}

func (ig *SyncIndexers) String() string {
var sb strings.Builder
for i, indexer := range ig.indexers {
height, _ := indexer.Height()
sb.WriteString(fmt.Sprintf("Indexer %d: %T, Height: %d\n", i, indexer, height))
}
return sb.String()
}
9 changes: 9 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) {
if err != nil {
return nil, err
}
if builder.cfg.Chain.TrieDBAutoFlushThreshold > 0 {
dao, err = db.NewKVStoreAutoFlush(dao, builder.cfg.Chain.TrieDBAutoFlushThreshold)
if err != nil {
return nil, err
}
}
return factory.NewStateDB(factoryCfg, dao, opts...)
}

Expand Down Expand Up @@ -336,6 +342,9 @@ func (builder *Builder) buildBlockDAO(forTest bool) error {
if err != nil {
return err
}
if cfg.Chain.BlockIndexerTargetHeight > 0 {
opts = append(opts, blockdao.WithIndexerTargetHeight(cfg.Chain.BlockIndexerTargetHeight))
}
builder.cs.blockdao = blockdao.NewBlockDAOWithIndexersAndCache(
store, indexers, cfg.DB.MaxCacheSize, opts...)

Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (c *IotxConsensus) Stop(ctx context.Context) error {

err := c.scheme.Stop(ctx)
if err != nil {
return errors.Wrapf(err, "failed to stop scheme %s", c.cfg.Scheme)
log.Logger("consensus").Error("Failed to stop the consensus scheme.", zap.Error(err))
}
return nil
}
Expand Down
Loading
Loading