diff --git a/actpool/actionstore.go b/actpool/actionstore.go
index 6dfcd54d5d..89f9c48b2f 100644
--- a/actpool/actionstore.go
+++ b/actpool/actionstore.go
@@ -144,6 +144,9 @@ func (s *actionStore) Open(onData onAction) error {
 
 func (s *actionStore) Close() error {
 	s.lock.Lock()
 	defer s.lock.Unlock()
+	if !s.IsReady() {
+		return nil
+	}
 	if err := s.TurnOff(); err != nil {
 		return err
diff --git a/actsync/actionsync.go b/actsync/actionsync.go
index ffe3313854..f0e63e9ccb 100644
--- a/actsync/actionsync.go
+++ b/actsync/actionsync.go
@@ -90,6 +90,9 @@ func (as *ActionSync) Start(ctx context.Context) error {
 
 // Stop stops the action syncer
 func (as *ActionSync) Stop(ctx context.Context) error {
+	if !as.IsReady() {
+		return nil
+	}
 	log.L().Info("stopping action sync")
 	if err := as.TurnOff(); err != nil {
 		return err
diff --git a/blockchain/blockdao/blockdao.go b/blockchain/blockdao/blockdao.go
index 0e837505e7..7507bc7de7 100644
--- a/blockchain/blockdao/blockdao.go
+++ b/blockchain/blockdao/blockdao.go
@@ -16,13 +16,11 @@ import (
 	"github.com/iotexproject/iotex-proto/golang/iotextypes"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"go.uber.org/zap"
 
 	"github.com/iotexproject/iotex-core/v2/action"
 	"github.com/iotexproject/iotex-core/v2/blockchain/block"
 	"github.com/iotexproject/iotex-core/v2/db"
 	"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
-	"github.com/iotexproject/iotex-core/v2/pkg/log"
 	"github.com/iotexproject/iotex-core/v2/pkg/prometheustimer"
 )
 
@@ -74,6 +72,7 @@ type (
 		blockCache cache.LRUCache
 		txLogCache cache.LRUCache
 		tipHeight  uint64
+		checker    *BlockIndexerChecker
 	}
 )
 
@@ -85,6 +84,13 @@ func WithBlobStore(bs BlobStore) Option {
 	}
 }
 
+// WithIndexerTargetHeight sets the target height for checking indexers
+func WithIndexerTargetHeight(target uint64) Option {
+	return func(dao *blockDAO) {
+		dao.checker = NewBlockIndexerChecker(dao, target)
+	}
+}
+
 // NewBlockDAOWithIndexersAndCache returns a BlockDAO with indexers which will consume blocks appended, and
 // caches which will speed up reading
 func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexer, cacheSize int, opts ...Option) BlockDAO {
@@ -96,6 +102,7 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe
 		blockStore: blkStore,
 		indexers:   indexers,
 	}
+	blockDAO.checker = NewBlockIndexerChecker(blockDAO, 0)
 	for _, opt := range opts {
 		opt(blockDAO)
 	}
@@ -139,29 +146,7 @@ func (dao *blockDAO) Start(ctx context.Context) error {
 		return err
 	}
 	atomic.StoreUint64(&dao.tipHeight, tipHeight)
-	return dao.checkIndexers(ctx)
-}
-
-func (dao *blockDAO) checkIndexers(ctx context.Context) error {
-	checker := NewBlockIndexerChecker(dao)
-	for i, indexer := range dao.indexers {
-		if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) {
-			if height%5000 == 0 {
-				log.L().Info(
-					"indexer is catching up.",
-					zap.Int("indexer", i),
-					zap.Uint64("height", height),
-				)
-			}
-		}); err != nil {
-			return err
-		}
-		log.L().Info(
-			"indexer is up to date.",
-			zap.Int("indexer", i),
-		)
-	}
-	return nil
+	return dao.checker.CheckIndexers(ctx, dao.indexers)
 }
 
 func (dao *blockDAO) Stop(ctx context.Context) error {
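With the option above, Start changes behavior when a target is set: CheckIndexers deliberately returns an error once every indexer reaches the target height, so the node halts right after catch-up instead of entering normal operation. A minimal sketch of wiring it up; the cache size is an arbitrary example value, and blkStore/indexers are assumed to be built by the surrounding chain service:

```go
package main

import (
	"context"

	"go.uber.org/zap"

	"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
	"github.com/iotexproject/iotex-core/v2/pkg/log"
)

// catchUpTo replays blocks into the indexers up to the given target height.
func catchUpTo(ctx context.Context, blkStore blockdao.BlockStore, indexers []blockdao.BlockIndexer, target uint64) {
	dao := blockdao.NewBlockDAOWithIndexersAndCache(
		blkStore, indexers, 8192, // cache size: illustrative only
		blockdao.WithIndexerTargetHeight(target),
	)
	// With target > 0, CheckIndexers errors out after the catch-up, so a
	// failed Start here is the expected "done" signal rather than a crash.
	if err := dao.Start(ctx); err != nil {
		log.L().Info("indexer catch-up stopped.", zap.Error(err))
	}
}
```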
diff --git a/blockchain/blockdao/blockdao_test.go b/blockchain/blockdao/blockdao_test.go
index 302b15d673..b9c1405c51 100644
--- a/blockchain/blockdao/blockdao_test.go
+++ b/blockchain/blockdao/blockdao_test.go
@@ -137,7 +137,7 @@ func Test_blockDAO_checkIndexers(t *testing.T) {
 
 		p.ApplyMethodReturn(&BlockIndexerChecker{}, "CheckIndexer", errors.New(t.Name()))
 
-		err := blockdao.checkIndexers(context.Background())
+		err := blockdao.checker.CheckIndexers(context.Background(), blockdao.indexers)
 		r.ErrorContains(err, t.Name())
 	})
 
@@ -158,7 +158,7 @@ func Test_blockDAO_checkIndexers(t *testing.T) {
 		mockblockdao.EXPECT().Height().Return(daoTip, nil).Times(1)
 		mockblockdao.EXPECT().GetBlockByHeight(gomock.Any()).Return(&block.Block{}, nil).Times(1)
 
-		err := blockdao.checkIndexers(ctx)
+		err := blockdao.checker.CheckIndexers(ctx, blockdao.indexers)
 		r.NoError(err)
 	})
 }
diff --git a/blockchain/blockdao/blockindexer.go b/blockchain/blockdao/blockindexer.go
index 7b3d40c3a1..cc3bf0f3b8 100644
--- a/blockchain/blockdao/blockindexer.go
+++ b/blockchain/blockdao/blockindexer.go
@@ -7,6 +7,7 @@ package blockdao
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/pkg/errors"
@@ -36,13 +37,14 @@ type (
 
 	// BlockIndexerChecker defines a checker of block indexer
 	BlockIndexerChecker struct {
-		dao BlockDAO
+		dao          BlockDAO
+		targetHeight uint64
 	}
 )
 
 // NewBlockIndexerChecker creates a new block indexer checker
-func NewBlockIndexerChecker(dao BlockDAO) *BlockIndexerChecker {
-	return &BlockIndexerChecker{dao: dao}
+func NewBlockIndexerChecker(dao BlockDAO, target uint64) *BlockIndexerChecker {
+	return &BlockIndexerChecker{dao: dao, targetHeight: target}
 }
 
 // CheckIndexer checks a block indexer against block dao
@@ -66,10 +68,6 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
 	if tipHeight > daoTip {
 		return errors.New("indexer tip height cannot by higher than dao tip height")
 	}
-	tipBlk, err := bic.dao.GetBlockByHeight(tipHeight)
-	if err != nil {
-		return err
-	}
 	if targetHeight == 0 || targetHeight > daoTip {
 		targetHeight = daoTip
 	}
@@ -80,6 +78,10 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
 			startHeight = indexStartHeight
 		}
 	}
+	tipBlk, err := bic.dao.GetBlockByHeight(startHeight - 1)
+	if err != nil {
+		return err
+	}
 	for i := startHeight; i <= targetHeight; i++ {
 		// ternimate if context is done
 		if err := ctx.Err(); err != nil {
@@ -142,3 +144,35 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
 	}
 	return nil
 }
+
+// CheckIndexers checks all block indexers against block dao
+func (bic *BlockIndexerChecker) CheckIndexers(ctx context.Context, indexers []BlockIndexer) error {
+	for i, indexer := range indexers {
+		if err := bic.CheckIndexer(ctx, indexer, bic.targetHeight, func(height uint64) {
+			if height%5000 == 0 {
+				log.L().Info(
+					"indexer is catching up.",
+					zap.Int("indexer", i),
+					zap.Uint64("height", height),
+				)
+			}
+		}); err != nil {
+			return err
+		}
+		log.L().Info(
+			"indexer is up to date.",
+			zap.Int("indexer", i),
+		)
+	}
+	for _, indexer := range indexers {
+		height, err := indexer.Height()
+		if err != nil {
+			return errors.Wrap(err, "failed to get indexer height")
+		}
+		log.L().Info("indexer height", zap.Uint64("height", height), zap.String("indexer", fmt.Sprintf("%T", indexer)), zap.Any("content", indexer))
+	}
+	if bic.targetHeight > 0 {
+		return errors.Errorf("indexers are up to target height %d", bic.targetHeight)
+	}
+	return nil
+}
diff --git a/blockchain/blockdao/blockindexer_test.go b/blockchain/blockdao/blockindexer_test.go
index 3fc39b1a8b..1b6cf4d575 100644
--- a/blockchain/blockdao/blockindexer_test.go
+++ b/blockchain/blockdao/blockindexer_test.go
@@ -49,7 +49,7 @@ func TestCheckIndexer(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 	mockDao := mock_blockdao.NewMockBlockDAO(ctrl)
-	checker := NewBlockIndexerChecker(mockDao)
+	checker := NewBlockIndexerChecker(mockDao, 0)
 	indexer := mock_blockdao.NewMockBlockIndexer(ctrl)
 	putBlocks := make([]*block.Block, 0)
@@ -121,7 +121,7 @@ func TestCheckIndexerWithStart(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 	mockDao := mock_blockdao.NewMockBlockDAO(ctrl)
-	checker := NewBlockIndexerChecker(mockDao)
+	checker := NewBlockIndexerChecker(mockDao, 0)
 	indexer := mock_blockdao.NewMockBlockIndexerWithStart(ctrl)
 	putBlocks := make([]*block.Block, 0)
@@ -179,7 +179,7 @@ func TestBlockIndexerChecker_CheckIndexer(t *testing.T) {
 	ctx := context.Background()
 	store := mock_blockdao.NewMockBlockDAO(ctrl)
 	dao := &blockDAO{blockStore: store}
-	bic := NewBlockIndexerChecker(dao)
+	bic := NewBlockIndexerChecker(dao, 0)
 	indexer := mock_blockdao.NewMockBlockIndexer(ctrl)
 
 	t.Run("WithoutBlockchainContext", func(t *testing.T) {
diff --git a/blockchain/config.go b/blockchain/config.go
index c14396ba8d..f43d3aad08 100644
--- a/blockchain/config.go
+++ b/blockchain/config.go
@@ -87,6 +87,10 @@ type (
 		FactoryDBType string `yaml:"factoryDBType"`
 		// MintTimeout is the timeout for minting
 		MintTimeout time.Duration `yaml:"-"`
+		// BlockIndexerTargetHeight is the target height for block indexer
+		BlockIndexerTargetHeight uint64 `yaml:"blockIndexerTargetHeight"`
+		// TrieDBAutoFlushThreshold is the threshold for auto flushing the trie KVStore
+		TrieDBAutoFlushThreshold int `yaml:"trieDBAutoFlushThreshold"`
 	}
 )
diff --git a/blockindex/bloomfilterindexer.go b/blockindex/bloomfilterindexer.go
index 37309566ed..99be0cea2c 100644
--- a/blockindex/bloomfilterindexer.go
+++ b/blockindex/bloomfilterindexer.go
@@ -20,6 +20,7 @@ import (
 	"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
 	"github.com/iotexproject/iotex-core/v2/db"
 	"github.com/iotexproject/iotex-core/v2/db/batch"
+	"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
 	"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
 )
 
@@ -60,6 +61,7 @@ type (
 
 	// bloomfilterIndexer is a struct for bloomfilter indexer
 	bloomfilterIndexer struct {
+		lifecycle.Readiness
 		mutex     sync.RWMutex // mutex for curRangeBloomfilter
 		kvStore   db.KVStore
 		rangeSize uint64
@@ -95,6 +97,9 @@ func (bfx *bloomfilterIndexer) Start(ctx context.Context) error {
 	if err := bfx.kvStore.Start(ctx); err != nil {
 		return err
 	}
+	if err := bfx.TurnOn(); err != nil {
+		return err
+	}
 	bfx.mutex.Lock()
 	defer bfx.mutex.Unlock()
 
@@ -142,7 +147,15 @@ func (bfx *bloomfilterIndexer) initRangeBloomFilter(height uint64) error {
 
 // Stop stops the bloomfilter indexer
 func (bfx *bloomfilterIndexer) Stop(ctx context.Context) error {
-	bfx.totalRange.Close()
+	if bfx.totalRange != nil {
+		bfx.totalRange.Close()
+	}
+	if !bfx.IsReady() {
+		return nil
+	}
+	if err := bfx.TurnOff(); err != nil {
+		return err
+	}
 	return bfx.kvStore.Stop(ctx)
 }
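Both knobs added to blockchain/config.go are ordinary YAML-tagged fields, so they can also be set programmatically before the chain service is built. A sketch, assuming the usual config.Config wrapper around the Chain section; the literal values are illustrative, not recommendations:

```go
package main

import "github.com/iotexproject/iotex-core/v2/config"

func tuneChain(cfg *config.Config) {
	// Halt the node once all indexers catch up to this height;
	// 0 (the default) keeps the old sync-to-DAO-tip behavior.
	cfg.Chain.BlockIndexerTargetHeight = 20_000_000
	// Wrap the trie working store in the auto-flush KVStore once the
	// buffered batch holds this many entries; 0 disables the wrapper.
	cfg.Chain.TrieDBAutoFlushThreshold = 100_000
}
```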
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -62,6 +63,7 @@ type ( // blockIndexer implements the Indexer interface blockIndexer struct { + lifecycle.Readiness mutex sync.RWMutex genesisHash hash.Hash256 kvStore db.KVStoreWithRange @@ -95,6 +97,9 @@ func (x *blockIndexer) Start(ctx context.Context) error { if err := x.kvStore.Start(ctx); err != nil { return err } + if err := x.TurnOn(); err != nil { + return err + } // create the total block and action index var err error if x.tbk, err = db.NewCountingIndexNX(x.kvStore, _totalBlocksBucket); err != nil { @@ -115,6 +120,12 @@ func (x *blockIndexer) Start(ctx context.Context) error { // Stop stops the indexer func (x *blockIndexer) Stop(ctx context.Context) error { + if !x.IsReady() { + return nil + } + if err := x.TurnOff(); err != nil { + return err + } return x.kvStore.Stop(ctx) } diff --git a/blockindex/sync_indexers.go b/blockindex/sync_indexers.go index 903f5d7e92..fa57105869 100644 --- a/blockindex/sync_indexers.go +++ b/blockindex/sync_indexers.go @@ -7,6 +7,8 @@ package blockindex import ( "context" + "fmt" + "strings" "go.uber.org/zap" @@ -121,3 +123,12 @@ func (ig *SyncIndexers) initStartHeight() error { } return nil } + +func (ig *SyncIndexers) String() string { + var sb strings.Builder + for i, indexer := range ig.indexers { + height, _ := indexer.Height() + sb.WriteString(fmt.Sprintf("Indexer %d: %T, Height: %d\n", i, indexer, height)) + } + return sb.String() +} diff --git a/chainservice/builder.go b/chainservice/builder.go index a3ec0ff691..4618486e07 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -179,6 +179,12 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) { if err != nil { return nil, err } + if builder.cfg.Chain.TrieDBAutoFlushThreshold > 0 { + dao, err = db.NewKVStoreAutoFlush(dao, builder.cfg.Chain.TrieDBAutoFlushThreshold) + if err != nil { + return nil, err + } + } return factory.NewStateDB(factoryCfg, dao, opts...) } @@ -336,6 +342,9 @@ func (builder *Builder) buildBlockDAO(forTest bool) error { if err != nil { return err } + if cfg.Chain.BlockIndexerTargetHeight > 0 { + opts = append(opts, blockdao.WithIndexerTargetHeight(cfg.Chain.BlockIndexerTargetHeight)) + } builder.cs.blockdao = blockdao.NewBlockDAOWithIndexersAndCache( store, indexers, cfg.DB.MaxCacheSize, opts...) 
diff --git a/consensus/consensus.go b/consensus/consensus.go
index b1505f260d..51bd59ed3d 100644
--- a/consensus/consensus.go
+++ b/consensus/consensus.go
@@ -225,7 +225,7 @@ func (c *IotxConsensus) Stop(ctx context.Context) error {
 
 	err := c.scheme.Stop(ctx)
 	if err != nil {
-		return errors.Wrapf(err, "failed to stop scheme %s", c.cfg.Scheme)
+		log.Logger("consensus").Error("Failed to stop the consensus scheme.", zap.Error(err))
 	}
 	return nil
 }
diff --git a/db/kvstorewithbuffer.go b/db/kvstorewithbuffer.go
index 69952d62c4..5bdd7d903b 100644
--- a/db/kvstorewithbuffer.go
+++ b/db/kvstorewithbuffer.go
@@ -6,8 +6,10 @@ import (
 	"fmt"
 
 	"github.com/pkg/errors"
+	"go.uber.org/zap"
 
 	"github.com/iotexproject/iotex-core/v2/db/batch"
+	"github.com/iotexproject/iotex-core/v2/pkg/log"
 )
 
 type (
@@ -32,6 +34,11 @@ type (
 		buffer batch.CachedBatch
 	}
 
+	kvStoreAutoWriteBatch struct {
+		*kvStoreWithBuffer
+		threshold int
+	}
+
 	// KVStoreFlusher is a wrapper of KVStoreWithBuffer, which has flush api
 	KVStoreFlusher interface {
 		SerializeQueue() []byte
@@ -110,6 +117,25 @@ func NewKVStoreFlusher(store KVStore, buffer batch.CachedBatch, opts ...KVStoreF
 	return f, nil
 }
 
+// NewKVStoreAutoFlush creates a new KVStore which flushes its write batch once it grows past threshold
+func NewKVStoreAutoFlush(store KVStore, threshold int) (KVStore, error) {
+	if store == nil {
+		return nil, errors.New("store cannot be nil")
+	}
+	if threshold <= 0 {
+		return nil, errors.New("invalid threshold")
+	}
+	buffer := batch.NewCachedBatch()
+	kva := &kvStoreAutoWriteBatch{
+		kvStoreWithBuffer: &kvStoreWithBuffer{
+			store:  store,
+			buffer: buffer,
+		},
+		threshold: threshold,
+	}
+	return kva, nil
+}
+
 func (f *flusher) Flush() error {
 	if err := f.kvb.store.WriteBatch(f.kvb.buffer.Translate(f.flushTranslate)); err != nil {
 		return err
@@ -250,3 +276,30 @@ func (kvb *kvStoreWithBuffer) WriteBatch(b batch.KVStoreBatch) (err error) {
 	kvb.buffer.Append(b)
 	return nil
 }
+
+func (kva *kvStoreAutoWriteBatch) WriteBatch(b batch.KVStoreBatch) (err error) {
+	log.L().Debug("writing batch", zap.Int("size", kva.buffer.Size()))
+	kva.buffer.Append(b)
+	if kva.buffer.Size() >= kva.threshold {
+		return kva.flush()
+	}
+	return nil
+}
+
+func (kva *kvStoreAutoWriteBatch) Stop(ctx context.Context) (err error) {
+	log.L().Debug("stopping kv store with final flush")
+	if err := kva.flush(); err != nil {
+		return err
+	}
+	return kva.kvStoreWithBuffer.Stop(ctx)
+}
+
+func (kva *kvStoreAutoWriteBatch) flush() (err error) {
+	log.L().Debug("flushing batch", zap.Int("size", kva.buffer.Size()))
+	if err := kva.store.WriteBatch(kva.buffer.Translate(func(wi *batch.WriteInfo) *batch.WriteInfo { return wi })); err != nil {
+		return err
+	}
+	kva.buffer.Lock()
+	kva.buffer.ClearAndUnlock()
+	return nil
+}
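The wrapper buffers writes like kvStoreWithBuffer, but writes them through to the backing store on its own once the buffer crosses the threshold, and Stop performs a final flush. A self-contained sketch against an in-memory store; the threshold, namespace, and key are illustrative:

```go
package main

import (
	"context"

	"github.com/iotexproject/iotex-core/v2/db"
	"github.com/iotexproject/iotex-core/v2/db/batch"
)

func runAutoFlush(ctx context.Context) error {
	// Auto-flush once the internal buffer accumulates 1000 entries.
	store, err := db.NewKVStoreAutoFlush(db.NewMemKVStore(), 1000)
	if err != nil {
		return err
	}
	if err := store.Start(ctx); err != nil {
		return err
	}
	b := batch.NewBatch()
	b.Put("ns", []byte("key"), []byte("value"), "failed to put key")
	if err := store.WriteBatch(b); err != nil { // buffered, not yet persisted
		return err
	}
	return store.Stop(ctx) // final flush happens here
}
```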
diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go
index 67ee89fc74..913e0a446b 100644
--- a/dispatcher/dispatcher.go
+++ b/dispatcher/dispatcher.go
@@ -164,6 +164,9 @@ func (d *IotxDispatcher) Start(ctx context.Context) error {
 
 // Stop gracefully shuts down the dispatcher by stopping all handlers and waiting for them to finish.
 func (d *IotxDispatcher) Stop(ctx context.Context) error {
+	if !d.IsReady() {
+		return nil
+	}
 	if err := d.TurnOff(); err != nil {
 		log.L().Warn("Dispatcher already in the process of shutting down.")
 		return err
diff --git a/nodeinfo/manager.go b/nodeinfo/manager.go
index 69cc3e8379..d2013b8f24 100644
--- a/nodeinfo/manager.go
+++ b/nodeinfo/manager.go
@@ -52,6 +52,7 @@ type (
 	// InfoManager manage delegate node info
 	InfoManager struct {
 		lifecycle.Lifecycle
+		lifecycle.Readiness
 		version       string
 		broadcastList atomic.Value // []string, whitelist to force enable broadcast
 		nodeMap       *lru.Cache
@@ -121,11 +122,21 @@ func NewInfoManager(cfg *Config, t transmitter, ch chain, broadcastListFunc getB
 // Start start delegate broadcast task
 func (dm *InfoManager) Start(ctx context.Context) error {
 	dm.updateBroadcastList()
-	return dm.OnStart(ctx)
+	err := dm.OnStart(ctx)
+	if err != nil {
+		return err
+	}
+	return dm.TurnOn()
 }
 
 // Stop stop delegate broadcast task
 func (dm *InfoManager) Stop(ctx context.Context) error {
+	if !dm.IsReady() {
+		return nil
+	}
+	if err := dm.TurnOff(); err != nil {
+		return err
+	}
 	return dm.OnStop(ctx)
 }
diff --git a/pkg/messagebatcher/batchwriter.go b/pkg/messagebatcher/batchwriter.go
index e03a2409a4..0e7fad92ec 100644
--- a/pkg/messagebatcher/batchwriter.go
+++ b/pkg/messagebatcher/batchwriter.go
@@ -83,6 +83,9 @@ func (bm *Manager) Start() error {
 
 // Stop stops the Manager
 func (bm *Manager) Stop() error {
+	if bm.cancelHanlders == nil {
+		return nil
+	}
 	bm.cancelHanlders()
 	return nil
 }
diff --git a/server/itx/nodestats/nodestats.go b/server/itx/nodestats/nodestats.go
index ee06a5ca55..f8586782c5 100644
--- a/server/itx/nodestats/nodestats.go
+++ b/server/itx/nodestats/nodestats.go
@@ -40,6 +40,9 @@ func (s *NodeStats) Start(ctx context.Context) error {
 
 // Stop stops the node stats
 func (s *NodeStats) Stop(ctx context.Context) error {
+	if s.task == nil {
+		return nil
+	}
 	return s.task.Stop(ctx)
 }
diff --git a/server/itx/pauseable.go b/server/itx/pauseable.go
index 62b490ce67..742c9df13d 100644
--- a/server/itx/pauseable.go
+++ b/server/itx/pauseable.go
@@ -2,35 +2,99 @@ package itx
 
 import (
 	"net/http"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/iotexproject/iotex-core/v2/blockchain/block"
 )
 
 type (
+	// Pauseable interface defines components that can be paused
 	Pauseable interface {
 		Pause(bool)
 	}
 
+	// PauseMgr manages pause operations for blockchain components
 	PauseMgr struct {
-		pauses []Pauseable
+		pauses        []Pauseable
+		height        uint64
+		pauseAtHeight uint64
 	}
 )
 
+// NewPauseMgr creates a new pause manager
 func NewPauseMgr(pauses ...Pauseable) *PauseMgr {
 	return &PauseMgr{
 		pauses: pauses,
 	}
 }
 
+// Pause pauses or unpauses all pauseable components
 func (pm *PauseMgr) Pause(pause bool) {
 	for _, p := range pm.pauses {
 		p.Pause(pause)
 	}
 }
 
-func (pm *PauseMgr) HandlePause(w http.ResponseWriter, r *http.Request) {
+// HandlePause handles the immediate pause request
+func (pm *PauseMgr) HandlePause(w http.ResponseWriter, _ *http.Request) {
 	pm.Pause(true)
 	w.WriteHeader(http.StatusOK)
 }
 
-func (pm *PauseMgr) HandleUnPause(w http.ResponseWriter, r *http.Request) {
+// HandleUnPause handles the unpause request
+func (pm *PauseMgr) HandleUnPause(w http.ResponseWriter, _ *http.Request) {
 	pm.Pause(false)
 	w.WriteHeader(http.StatusOK)
 }
+
+// ReceiveBlock receives block notifications and checks for pause conditions
+func (pm *PauseMgr) ReceiveBlock(blk *block.Block) error {
+	currentHeight := blk.Height()
+	atomic.StoreUint64(&pm.height, currentHeight)
+
+	// Check if we should pause at the target height
+	pauseAtHeight := atomic.LoadUint64(&pm.pauseAtHeight)
+	if pauseAtHeight > 0 && currentHeight >= pauseAtHeight {
+		pm.Pause(true)
+		// Reset the pause height after pausing
+		atomic.StoreUint64(&pm.pauseAtHeight, 0)
+	}
+	return nil
+}
+
+// HandlePauseAtHeight handles the request to pause at a specific height
+func (pm *PauseMgr) HandlePauseAtHeight(w http.ResponseWriter, r *http.Request) {
+	// Extract height from URL path like /pause/12345
+	pathStr := r.URL.Path
+	heightStr := path.Base(pathStr)
+
+	height, err := strconv.ParseUint(heightStr, 10, 64)
+	if err != nil {
+		http.Error(w, "Invalid height parameter", http.StatusBadRequest)
+		return
+	}
+
+	currentHeight := atomic.LoadUint64(&pm.height)
+	if height <= currentHeight {
+		http.Error(w, "Height must be greater than current block height", http.StatusBadRequest)
+		return
+	}
+
+	// Check if pauseAtHeight was previously set
+	oldPauseAtHeight := atomic.LoadUint64(&pm.pauseAtHeight)
+
+	// Set the target pause height
+	atomic.StoreUint64(&pm.pauseAtHeight, height)
+
+	// Return different status codes based on whether this is a new setting or an update
+	if oldPauseAtHeight == 0 {
+		// First time setting pause height - return 201 Created
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte("Pause scheduled at height " + heightStr))
+	} else {
+		// Updating existing pause height - return 200 OK
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte("Pause height updated to " + heightStr))
+	}
+}
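These handlers are mounted on the admin mux in server/itx/server.go below, next to /pause and /unpause. A client-side sketch using only the standard library; the port is whatever System.HTTPAdminPort is configured to, and 9009 here is only an example:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func schedulePause() error {
	// Schedule a pause once the node reaches block height 12345.
	resp, err := http.Get("http://localhost:9009/pause/12345")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	// 201 Created on first scheduling, 200 OK when updating an existing one,
	// 400 for a height at or below the current tip.
	fmt.Println(resp.StatusCode, string(body))
	return nil
}
```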
diff --git a/server/itx/server.go b/server/itx/server.go
index f0ec49dc8f..ad1f5da81c 100644
--- a/server/itx/server.go
+++ b/server/itx/server.go
@@ -132,6 +132,10 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
 			return nil, errors.Wrap(err, "failed to add api server as subscriber")
 		}
 	}
+	// Add pauseMgr as a block subscriber to receive block notifications
+	if err := cs.Blockchain().AddSubscriber(pauseMgr); err != nil {
+		return nil, errors.Wrap(err, "failed to add pause manager as subscriber")
+	}
 	// TODO: explorer dependency deleted here at #1085, need to revive by migrating to api
 	chains[cs.ChainID()] = cs
 	dispatcher.AddSubscriber(cs.ChainID(), cs)
@@ -179,7 +183,11 @@ func (s *Server) Start(ctx context.Context) error {
 
 // Stop stops the server
 func (s *Server) Stop(ctx context.Context) error {
-	defer s.subModuleCancel()
+	defer func() {
+		if s.subModuleCancel != nil {
+			s.subModuleCancel()
+		}
+	}()
 	if err := s.nodeStats.Stop(ctx); err != nil {
 		return errors.Wrap(err, "error when stopping node stats")
 	}
@@ -262,15 +270,25 @@ func (s *Server) Dispatcher() dispatcher.Dispatcher {
 
 // StartServer starts a node server
 func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg config.Config) {
-	if err := svr.Start(ctx); err != nil {
-		log.L().Fatal("Failed to start server.", zap.Error(err))
-		return
+
+	if adminSvr := startAdminServer(ctx, svr, cfg); adminSvr != nil {
+		defer func() {
+			if err := adminSvr.Shutdown(ctx); err != nil {
+				log.L().Error("Error when shutting down the admin server.", zap.Error(err))
+			}
+		}()
 	}
+
 	defer func() {
 		if err := svr.Stop(context.Background()); err != nil {
 			log.L().Panic("Failed to stop server.", zap.Error(err))
 		}
 	}()
+	if err := svr.Start(ctx); err != nil {
+		log.L().Fatal("Failed to start server.", zap.Error(err))
+		return
+	}
 	if _, isGateway := cfg.Plugins[config.GatewayPlugin]; isGateway && cfg.API.ReadyDuration > 0 {
 		// wait for a while to make sure the server is ready
 		// The original intention was to ensure that all transactions that were not received during the restart were included in block, thereby avoiding inconsistencies in the state of the API node.
@@ -294,43 +312,44 @@ func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg c
 		}()
 	}
 
-	var adminserv http.Server
-	if cfg.System.HTTPAdminPort > 0 {
-		mux := http.NewServeMux()
-		log.RegisterLevelConfigMux(mux)
-		haCtl := ha.New(svr.rootChainService.Consensus())
-		mux.Handle("/ha", http.HandlerFunc(haCtl.Handle))
-		mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
-		mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
-		mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
-		mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
-		mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
-		mux.Handle("/pause", http.HandlerFunc(svr.pauseMgr.HandlePause))
-		mux.Handle("/unpause", http.HandlerFunc(svr.pauseMgr.HandleUnPause))
-
-		port := fmt.Sprintf(":%d", cfg.System.HTTPAdminPort)
-		adminserv = httputil.NewServer(port, mux)
-		defer func() {
-			if err := adminserv.Shutdown(ctx); err != nil {
-				log.L().Error("Error when serving metrics data.", zap.Error(err))
-			}
-		}()
-		go func() {
-			runtime.SetMutexProfileFraction(1)
-			runtime.SetBlockProfileRate(1)
-			ln, err := httputil.LimitListener(adminserv.Addr)
-			if err != nil {
-				log.L().Error("Error when listen to profiling port.", zap.Error(err))
-				return
-			}
-			if err := adminserv.Serve(ln); err != nil {
-				log.L().Error("Error when serving performance profiling data.", zap.Error(err))
-			}
-		}()
-	}
-
 	<-ctx.Done()
 	if err := probeSvr.TurnOff(); err != nil {
 		log.L().Panic("Failed to turn off probe server.", zap.Error(err))
 	}
 }
+
+// startAdminServer starts the admin HTTP server if HTTPAdminPort is configured, and returns it for shutdown
+func startAdminServer(ctx context.Context, svr *Server, cfg config.Config) *http.Server {
+	if cfg.System.HTTPAdminPort == 0 {
+		return nil
+	}
+	mux := http.NewServeMux()
+	log.RegisterLevelConfigMux(mux)
+	haCtl := ha.New(svr.rootChainService.Consensus())
+	mux.Handle("/ha", http.HandlerFunc(haCtl.Handle))
+	mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
+	mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
+	mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
+	mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
+	mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
+	mux.Handle("/pause", http.HandlerFunc(svr.pauseMgr.HandlePause))
+	mux.Handle("/unpause", http.HandlerFunc(svr.pauseMgr.HandleUnPause))
+	mux.Handle("/pause/", http.HandlerFunc(svr.pauseMgr.HandlePauseAtHeight))
+
+	port := fmt.Sprintf(":%d", cfg.System.HTTPAdminPort)
+	adminserv := httputil.NewServer(port, mux)
+
+	go func() {
+		runtime.SetMutexProfileFraction(1)
+		runtime.SetBlockProfileRate(1)
+		ln, err := httputil.LimitListener(adminserv.Addr)
+		if err != nil {
+			log.L().Error("Error when listening to profiling port.", zap.Error(err))
+			return
+		}
+		if err := adminserv.Serve(ln); err != nil {
+			log.L().Error("Error when serving performance profiling data.", zap.Error(err))
+		}
+	}()
+	return &adminserv
+}
diff --git a/state/factory/daoretrofitter.go b/state/factory/daoretrofitter.go
index 7170fcabed..ef5955f69f 100644
--- a/state/factory/daoretrofitter.go
+++ b/state/factory/daoretrofitter.go
@@ -11,10 +11,12 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/iotexproject/iotex-core/v2/db"
+	"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
 	"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
 )
 
 type daoRTF struct {
+	lifecycle.Readiness
 	dao db.KVStore
 }
 
@@ -25,10 +27,22 @@ func newDaoRetrofitter(dao db.KVStore) *daoRTF {
 }
 
 func (rtf *daoRTF) Start(ctx context.Context) error {
-	return rtf.dao.Start(ctx)
+	if err := rtf.dao.Start(ctx); err != nil {
+		return err
+	}
+	if err := rtf.TurnOn(); err != nil {
+		return err
+	}
+	return nil
 }
 
 func (rtf *daoRTF) Stop(ctx context.Context) error {
+	if !rtf.IsReady() {
+		return nil
+	}
+	if err := rtf.TurnOff(); err != nil {
+		return err
+	}
 	return rtf.dao.Stop(ctx)
 }
diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go
index 7f69c9fca0..f52a4bd38a 100644
--- a/systemcontractindex/stakingindex/index.go
+++ b/systemcontractindex/stakingindex/index.go
@@ -146,6 +146,9 @@ func (s *Indexer) start(ctx context.Context) error {
 
 func (s *Indexer) Stop(ctx context.Context) error {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
+	if !s.common.Started() {
+		return nil
+	}
 	return s.common.Stop(ctx)
 }
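Across the files above, the recurring change is the same idempotent-Stop guard built on lifecycle.Readiness: TurnOn at the end of Start, then an IsReady check plus TurnOff at the top of Stop, so stopping a component that never started becomes a no-op instead of an error. Distilled into a sketch; the component type is hypothetical:

```go
package main

import (
	"context"

	"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
)

// component stands in for any of the services patched above.
type component struct {
	lifecycle.Readiness
	// ... underlying resources ...
}

func (c *component) Start(ctx context.Context) error {
	// Open underlying resources first, then flip readiness on.
	return c.TurnOn()
}

func (c *component) Stop(ctx context.Context) error {
	if !c.IsReady() {
		return nil // never started (or already stopped): nothing to do
	}
	if err := c.TurnOff(); err != nil {
		return err // lost a concurrent race to stop
	}
	// Release underlying resources only after readiness is flipped off.
	return nil
}
```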