From 388c3c1a158a197d5b9e3d203d9216df5de4f9de Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 1 Sep 2025 15:05:51 +0800 Subject: [PATCH 1/7] pause at height --- server/itx/pauseable.go | 70 +++++++++++++++++++++++++++++++++++++++-- server/itx/server.go | 5 +++ 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/server/itx/pauseable.go b/server/itx/pauseable.go index 62b490ce67..742c9df13d 100644 --- a/server/itx/pauseable.go +++ b/server/itx/pauseable.go @@ -2,35 +2,99 @@ package itx import ( "net/http" + "path" + "strconv" + "sync/atomic" + + "github.com/iotexproject/iotex-core/v2/blockchain/block" ) type ( + // Pauseable interface defines components that can be paused Pauseable interface { Pause(bool) } + // PauseMgr manages pause operations for blockchain components PauseMgr struct { - pauses []Pauseable + pauses []Pauseable + height uint64 + pauseAtHeight uint64 } ) +// NewPauseMgr creates a new pause manager func NewPauseMgr(pauses ...Pauseable) *PauseMgr { return &PauseMgr{ pauses: pauses, } } +// Pause pauses or unpauses all pauseable components func (pm *PauseMgr) Pause(pause bool) { for _, p := range pm.pauses { p.Pause(pause) } } -func (pm *PauseMgr) HandlePause(w http.ResponseWriter, r *http.Request) { +// HandlePause handles the immediate pause request +func (pm *PauseMgr) HandlePause(w http.ResponseWriter, _ *http.Request) { pm.Pause(true) w.WriteHeader(http.StatusOK) } -func (pm *PauseMgr) HandleUnPause(w http.ResponseWriter, r *http.Request) { +// HandleUnPause handles the unpause request +func (pm *PauseMgr) HandleUnPause(w http.ResponseWriter, _ *http.Request) { pm.Pause(false) w.WriteHeader(http.StatusOK) } + +// ReceiveBlock receives block notifications and checks for pause conditions +func (pm *PauseMgr) ReceiveBlock(block *block.Block) error { + currentHeight := block.Height() + atomic.StoreUint64(&pm.height, currentHeight) + + // Check if we should pause at the target height + pauseAtHeight := atomic.LoadUint64(&pm.pauseAtHeight) + if pauseAtHeight > 0 && currentHeight >= pauseAtHeight { + pm.Pause(true) + // Reset the pause height after pausing + atomic.StoreUint64(&pm.pauseAtHeight, 0) + } + return nil +} + +// HandlePauseAtHeight handles the pause at specific height request +func (pm *PauseMgr) HandlePauseAtHeight(w http.ResponseWriter, r *http.Request) { + // Extract height from URL path like /pause/12345 + pathStr := r.URL.Path + heightStr := path.Base(pathStr) + + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + http.Error(w, "Invalid height parameter", http.StatusBadRequest) + return + } + + currentHeight := atomic.LoadUint64(&pm.height) + if height <= currentHeight { + http.Error(w, "Height must be greater than current block height", http.StatusBadRequest) + return + } + + // Check if pauseAtHeight was previously set + oldPauseAtHeight := atomic.LoadUint64(&pm.pauseAtHeight) + + // Set the target pause height + atomic.StoreUint64(&pm.pauseAtHeight, height) + + // Return different status codes based on whether this is a new setting or an update + if oldPauseAtHeight == 0 { + // First time setting pause height - return 201 Created + w.WriteHeader(http.StatusCreated) + w.Write([]byte("Pause scheduled at height " + heightStr)) + } else { + // Updating existing pause height - return 200 OK + w.WriteHeader(http.StatusOK) + w.Write([]byte("Pause height updated to " + heightStr)) + } +} diff --git a/server/itx/server.go b/server/itx/server.go index f0ec49dc8f..f1c3e64f23 100644 --- a/server/itx/server.go +++ b/server/itx/server.go @@ -132,6 +132,10 @@ func newServer(cfg config.Config, testing bool) (*Server, error) { return nil, errors.Wrap(err, "failed to add api server as subscriber") } } + // Add pauseMgr as a block subscriber to receive block notifications + if err := cs.Blockchain().AddSubscriber(pauseMgr); err != nil { + return nil, errors.Wrap(err, "failed to add pause manager as subscriber") + } // TODO: explorer dependency deleted here at #1085, need to revive by migrating to api chains[cs.ChainID()] = cs dispatcher.AddSubscriber(cs.ChainID(), cs) @@ -307,6 +311,7 @@ func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg c mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) mux.Handle("/pause", http.HandlerFunc(svr.pauseMgr.HandlePause)) mux.Handle("/unpause", http.HandlerFunc(svr.pauseMgr.HandleUnPause)) + mux.Handle("/pause/", http.HandlerFunc(svr.pauseMgr.HandlePauseAtHeight)) port := fmt.Sprintf(":%d", cfg.System.HTTPAdminPort) adminserv = httputil.NewServer(port, mux) From ad73a713c153227a61ba5ba9266e5e5c7282333b Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 1 Sep 2025 16:18:15 +0800 Subject: [PATCH 2/7] block indexer target height --- blockchain/blockdao/blockdao.go | 34 ++++++--------------- blockchain/blockdao/blockdao_test.go | 4 +-- blockchain/blockdao/blockindexer.go | 39 ++++++++++++++++++++++-- blockchain/blockdao/blockindexer_test.go | 6 ++-- blockchain/config.go | 2 ++ blockindex/sync_indexers.go | 11 +++++++ chainservice/builder.go | 3 ++ 7 files changed, 66 insertions(+), 33 deletions(-) diff --git a/blockchain/blockdao/blockdao.go b/blockchain/blockdao/blockdao.go index 0e837505e7..7507bc7de7 100644 --- a/blockchain/blockdao/blockdao.go +++ b/blockchain/blockdao/blockdao.go @@ -16,13 +16,11 @@ import ( "github.com/iotexproject/iotex-proto/golang/iotextypes" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "github.com/iotexproject/iotex-core/v2/action" "github.com/iotexproject/iotex-core/v2/blockchain/block" "github.com/iotexproject/iotex-core/v2/db" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" - "github.com/iotexproject/iotex-core/v2/pkg/log" "github.com/iotexproject/iotex-core/v2/pkg/prometheustimer" ) @@ -74,6 +72,7 @@ type ( blockCache cache.LRUCache txLogCache cache.LRUCache tipHeight uint64 + checker *BlockIndexerChecker } ) @@ -85,6 +84,12 @@ func WithBlobStore(bs BlobStore) Option { } } +func WithIndexerTargetHeight(target uint64) Option { + return func(dao *blockDAO) { + dao.checker = NewBlockIndexerChecker(dao, target) + } +} + // NewBlockDAOWithIndexersAndCache returns a BlockDAO with indexers which will consume blocks appended, and // caches which will speed up reading func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexer, cacheSize int, opts ...Option) BlockDAO { @@ -96,6 +101,7 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe blockStore: blkStore, indexers: indexers, } + blockDAO.checker = NewBlockIndexerChecker(blockDAO, 0) for _, opt := range opts { opt(blockDAO) } @@ -139,29 +145,7 @@ func (dao *blockDAO) Start(ctx context.Context) error { return err } atomic.StoreUint64(&dao.tipHeight, tipHeight) - return dao.checkIndexers(ctx) -} - -func (dao *blockDAO) checkIndexers(ctx context.Context) error { - checker := NewBlockIndexerChecker(dao) - for i, indexer := range dao.indexers { - if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) { - if height%5000 == 0 { - log.L().Info( - "indexer is catching up.", - zap.Int("indexer", i), - zap.Uint64("height", height), - ) - } - }); err != nil { - return err - } - log.L().Info( - "indexer is up to date.", - zap.Int("indexer", i), - ) - } - return nil + return dao.checker.CheckIndexers(ctx, dao.indexers) } func (dao *blockDAO) Stop(ctx context.Context) error { diff --git a/blockchain/blockdao/blockdao_test.go b/blockchain/blockdao/blockdao_test.go index 302b15d673..b9c1405c51 100644 --- a/blockchain/blockdao/blockdao_test.go +++ b/blockchain/blockdao/blockdao_test.go @@ -137,7 +137,7 @@ func Test_blockDAO_checkIndexers(t *testing.T) { p.ApplyMethodReturn(&BlockIndexerChecker{}, "CheckIndexer", errors.New(t.Name())) - err := blockdao.checkIndexers(context.Background()) + err := blockdao.checker.CheckIndexers(context.Background(), blockdao.indexers) r.ErrorContains(err, t.Name()) }) @@ -158,7 +158,7 @@ func Test_blockDAO_checkIndexers(t *testing.T) { mockblockdao.EXPECT().Height().Return(daoTip, nil).Times(1) mockblockdao.EXPECT().GetBlockByHeight(gomock.Any()).Return(&block.Block{}, nil).Times(1) - err := blockdao.checkIndexers(ctx) + err := blockdao.checker.CheckIndexers(ctx, blockdao.indexers) r.NoError(err) }) } diff --git a/blockchain/blockdao/blockindexer.go b/blockchain/blockdao/blockindexer.go index 7b3d40c3a1..ea4cdbae4c 100644 --- a/blockchain/blockdao/blockindexer.go +++ b/blockchain/blockdao/blockindexer.go @@ -7,6 +7,7 @@ package blockdao import ( "context" + "fmt" "time" "github.com/pkg/errors" @@ -36,13 +37,14 @@ type ( // BlockIndexerChecker defines a checker of block indexer BlockIndexerChecker struct { - dao BlockDAO + dao BlockDAO + targetHeight uint64 } ) // NewBlockIndexerChecker creates a new block indexer checker -func NewBlockIndexerChecker(dao BlockDAO) *BlockIndexerChecker { - return &BlockIndexerChecker{dao: dao} +func NewBlockIndexerChecker(dao BlockDAO, target uint64) *BlockIndexerChecker { + return &BlockIndexerChecker{dao: dao, targetHeight: target} } // CheckIndexer checks a block indexer against block dao @@ -142,3 +144,34 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI } return nil } + +func (bic *BlockIndexerChecker) CheckIndexers(ctx context.Context, indexers []BlockIndexer) error { + for i, indexer := range indexers { + if err := bic.CheckIndexer(ctx, indexer, bic.targetHeight, func(height uint64) { + if height%5000 == 0 { + log.L().Info( + "indexer is catching up.", + zap.Int("indexer", i), + zap.Uint64("height", height), + ) + } + }); err != nil { + return err + } + log.L().Info( + "indexer is up to date.", + zap.Int("indexer", i), + ) + } + for _, indexer := range indexers { + height, err := indexer.Height() + if err != nil { + return errors.Wrap(err, "failed to get indexer height") + } + log.L().Info("indexer height", zap.Uint64("height", height), zap.String("indexer", fmt.Sprintf("%T", indexer)), zap.Any("content", indexer)) + } + if bic.targetHeight > 0 { + return errors.Errorf("indexers are up to target height %d", bic.targetHeight) + } + return nil +} diff --git a/blockchain/blockdao/blockindexer_test.go b/blockchain/blockdao/blockindexer_test.go index 3fc39b1a8b..1b6cf4d575 100644 --- a/blockchain/blockdao/blockindexer_test.go +++ b/blockchain/blockdao/blockindexer_test.go @@ -49,7 +49,7 @@ func TestCheckIndexer(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockDao := mock_blockdao.NewMockBlockDAO(ctrl) - checker := NewBlockIndexerChecker(mockDao) + checker := NewBlockIndexerChecker(mockDao, 0) indexer := mock_blockdao.NewMockBlockIndexer(ctrl) putBlocks := make([]*block.Block, 0) @@ -121,7 +121,7 @@ func TestCheckIndexerWithStart(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockDao := mock_blockdao.NewMockBlockDAO(ctrl) - checker := NewBlockIndexerChecker(mockDao) + checker := NewBlockIndexerChecker(mockDao, 0) indexer := mock_blockdao.NewMockBlockIndexerWithStart(ctrl) putBlocks := make([]*block.Block, 0) @@ -179,7 +179,7 @@ func TestBlockIndexerChecker_CheckIndexer(t *testing.T) { ctx := context.Background() store := mock_blockdao.NewMockBlockDAO(ctrl) dao := &blockDAO{blockStore: store} - bic := NewBlockIndexerChecker(dao) + bic := NewBlockIndexerChecker(dao, 0) indexer := mock_blockdao.NewMockBlockIndexer(ctrl) t.Run("WithoutBlockchainContext", func(t *testing.T) { diff --git a/blockchain/config.go b/blockchain/config.go index c14396ba8d..34825c963d 100644 --- a/blockchain/config.go +++ b/blockchain/config.go @@ -87,6 +87,8 @@ type ( FactoryDBType string `yaml:"factoryDBType"` // MintTimeout is the timeout for minting MintTimeout time.Duration `yaml:"-"` + // BlockIndexerTargetHeight is the target height for block indexer + BlockIndexerTargetHeight uint64 `yaml:"blockIndexerTargetHeight"` } ) diff --git a/blockindex/sync_indexers.go b/blockindex/sync_indexers.go index 903f5d7e92..fa57105869 100644 --- a/blockindex/sync_indexers.go +++ b/blockindex/sync_indexers.go @@ -7,6 +7,8 @@ package blockindex import ( "context" + "fmt" + "strings" "go.uber.org/zap" @@ -121,3 +123,12 @@ func (ig *SyncIndexers) initStartHeight() error { } return nil } + +func (ig *SyncIndexers) String() string { + var sb strings.Builder + for i, indexer := range ig.indexers { + height, _ := indexer.Height() + sb.WriteString(fmt.Sprintf("Indexer %d: %T, Height: %d\n", i, indexer, height)) + } + return sb.String() +} diff --git a/chainservice/builder.go b/chainservice/builder.go index a3ec0ff691..0eaf7dc8cd 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -336,6 +336,9 @@ func (builder *Builder) buildBlockDAO(forTest bool) error { if err != nil { return err } + if cfg.Chain.BlockIndexerTargetHeight > 0 { + opts = append(opts, blockdao.WithIndexerTargetHeight(cfg.Chain.BlockIndexerTargetHeight)) + } builder.cs.blockdao = blockdao.NewBlockDAOWithIndexersAndCache( store, indexers, cfg.DB.MaxCacheSize, opts...) From 083b02855ff6087f6522007db8015111ef1759ee Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 1 Sep 2025 17:08:17 +0800 Subject: [PATCH 3/7] fix check indexer --- blockchain/blockdao/blockindexer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/blockchain/blockdao/blockindexer.go b/blockchain/blockdao/blockindexer.go index ea4cdbae4c..cc3bf0f3b8 100644 --- a/blockchain/blockdao/blockindexer.go +++ b/blockchain/blockdao/blockindexer.go @@ -68,10 +68,6 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI if tipHeight > daoTip { return errors.New("indexer tip height cannot by higher than dao tip height") } - tipBlk, err := bic.dao.GetBlockByHeight(tipHeight) - if err != nil { - return err - } if targetHeight == 0 || targetHeight > daoTip { targetHeight = daoTip } @@ -82,6 +78,10 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI startHeight = indexStartHeight } } + tipBlk, err := bic.dao.GetBlockByHeight(startHeight - 1) + if err != nil { + return err + } for i := startHeight; i <= targetHeight; i++ { // ternimate if context is done if err := ctx.Err(); err != nil { From 9a219d37e33f52c878ebab606a03cb4acf3276a5 Mon Sep 17 00:00:00 2001 From: envestcc Date: Mon, 1 Sep 2025 21:31:28 +0800 Subject: [PATCH 4/7] start admin server first --- server/itx/server.go | 80 ++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/server/itx/server.go b/server/itx/server.go index f1c3e64f23..667a4f5cd3 100644 --- a/server/itx/server.go +++ b/server/itx/server.go @@ -266,6 +266,15 @@ func (s *Server) Dispatcher() dispatcher.Dispatcher { // StartServer starts a node server func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg config.Config) { + + if adminSvr := startAdminServer(ctx, svr, cfg); adminSvr != nil { + defer func() { + if err := adminSvr.Shutdown(ctx); err != nil { + log.L().Error("Error when serving metrics data.", zap.Error(err)) + } + }() + } + if err := svr.Start(ctx); err != nil { log.L().Fatal("Failed to start server.", zap.Error(err)) return @@ -298,44 +307,43 @@ func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg c }() } - var adminserv http.Server - if cfg.System.HTTPAdminPort > 0 { - mux := http.NewServeMux() - log.RegisterLevelConfigMux(mux) - haCtl := ha.New(svr.rootChainService.Consensus()) - mux.Handle("/ha", http.HandlerFunc(haCtl.Handle)) - mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) - mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) - mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) - mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) - mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) - mux.Handle("/pause", http.HandlerFunc(svr.pauseMgr.HandlePause)) - mux.Handle("/unpause", http.HandlerFunc(svr.pauseMgr.HandleUnPause)) - mux.Handle("/pause/", http.HandlerFunc(svr.pauseMgr.HandlePauseAtHeight)) - - port := fmt.Sprintf(":%d", cfg.System.HTTPAdminPort) - adminserv = httputil.NewServer(port, mux) - defer func() { - if err := adminserv.Shutdown(ctx); err != nil { - log.L().Error("Error when serving metrics data.", zap.Error(err)) - } - }() - go func() { - runtime.SetMutexProfileFraction(1) - runtime.SetBlockProfileRate(1) - ln, err := httputil.LimitListener(adminserv.Addr) - if err != nil { - log.L().Error("Error when listen to profiling port.", zap.Error(err)) - return - } - if err := adminserv.Serve(ln); err != nil { - log.L().Error("Error when serving performance profiling data.", zap.Error(err)) - } - }() - } - <-ctx.Done() if err := probeSvr.TurnOff(); err != nil { log.L().Panic("Failed to turn off probe server.", zap.Error(err)) } } + +func startAdminServer(ctx context.Context, svr *Server, cfg config.Config) *http.Server { + if cfg.System.HTTPAdminPort == 0 { + return nil + } + mux := http.NewServeMux() + log.RegisterLevelConfigMux(mux) + haCtl := ha.New(svr.rootChainService.Consensus()) + mux.Handle("/ha", http.HandlerFunc(haCtl.Handle)) + mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + mux.Handle("/pause", http.HandlerFunc(svr.pauseMgr.HandlePause)) + mux.Handle("/unpause", http.HandlerFunc(svr.pauseMgr.HandleUnPause)) + mux.Handle("/pause/", http.HandlerFunc(svr.pauseMgr.HandlePauseAtHeight)) + + port := fmt.Sprintf(":%d", cfg.System.HTTPAdminPort) + adminserv := httputil.NewServer(port, mux) + + go func() { + runtime.SetMutexProfileFraction(1) + runtime.SetBlockProfileRate(1) + ln, err := httputil.LimitListener(adminserv.Addr) + if err != nil { + log.L().Error("Error when listen to profiling port.", zap.Error(err)) + return + } + if err := adminserv.Serve(ln); err != nil { + log.L().Error("Error when serving performance profiling data.", zap.Error(err)) + } + }() + return &adminserv +} From 6d859e8e5a8d87caf0d1e2107bce2165521ee441 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 2 Sep 2025 13:02:45 +0800 Subject: [PATCH 5/7] auto flush kvstore --- blockchain/config.go | 2 ++ chainservice/builder.go | 6 +++++ db/kvstorewithbuffer.go | 53 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/blockchain/config.go b/blockchain/config.go index 34825c963d..f43d3aad08 100644 --- a/blockchain/config.go +++ b/blockchain/config.go @@ -89,6 +89,8 @@ type ( MintTimeout time.Duration `yaml:"-"` // BlockIndexerTargetHeight is the target height for block indexer BlockIndexerTargetHeight uint64 `yaml:"blockIndexerTargetHeight"` + // KVStoreAutoFlushThreshold is the threshold for auto flushing the KVStore + TrieDBAutoFlushThreshold int `yaml:"trieDBAutoFlushThreshold"` } ) diff --git a/chainservice/builder.go b/chainservice/builder.go index 0eaf7dc8cd..4618486e07 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -179,6 +179,12 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) { if err != nil { return nil, err } + if builder.cfg.Chain.TrieDBAutoFlushThreshold > 0 { + dao, err = db.NewKVStoreAutoFlush(dao, builder.cfg.Chain.TrieDBAutoFlushThreshold) + if err != nil { + return nil, err + } + } return factory.NewStateDB(factoryCfg, dao, opts...) } diff --git a/db/kvstorewithbuffer.go b/db/kvstorewithbuffer.go index 69952d62c4..5bdd7d903b 100644 --- a/db/kvstorewithbuffer.go +++ b/db/kvstorewithbuffer.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/pkg/errors" + "go.uber.org/zap" "github.com/iotexproject/iotex-core/v2/db/batch" + "github.com/iotexproject/iotex-core/v2/pkg/log" ) type ( @@ -32,6 +34,11 @@ type ( buffer batch.CachedBatch } + kvStoreAutoWriteBatch struct { + *kvStoreWithBuffer + threshold int + } + // KVStoreFlusher is a wrapper of KVStoreWithBuffer, which has flush api KVStoreFlusher interface { SerializeQueue() []byte @@ -110,6 +117,25 @@ func NewKVStoreFlusher(store KVStore, buffer batch.CachedBatch, opts ...KVStoreF return f, nil } +// NewKVStoreAutoFlush creates a new KVStoreAutoFlusher +func NewKVStoreAutoFlush(store KVStore, threshold int) (KVStore, error) { + if store == nil { + return nil, errors.New("store cannot be nil") + } + if threshold <= 0 { + return nil, errors.New("invalid threshold") + } + buffer := batch.NewCachedBatch() + kva := &kvStoreAutoWriteBatch{ + kvStoreWithBuffer: &kvStoreWithBuffer{ + store: store, + buffer: buffer, + }, + threshold: threshold, + } + return kva, nil +} + func (f *flusher) Flush() error { if err := f.kvb.store.WriteBatch(f.kvb.buffer.Translate(f.flushTranslate)); err != nil { return err @@ -250,3 +276,30 @@ func (kvb *kvStoreWithBuffer) WriteBatch(b batch.KVStoreBatch) (err error) { kvb.buffer.Append(b) return nil } + +func (kva *kvStoreAutoWriteBatch) WriteBatch(b batch.KVStoreBatch) (err error) { + log.L().Debug("writing batch", zap.Int("size", kva.buffer.Size())) + kva.buffer.Append(b) + if kva.buffer.Size() >= kva.threshold { + kva.flush() + } + return nil +} + +func (kva *kvStoreAutoWriteBatch) Stop(ctx context.Context) (err error) { + log.L().Debug("stopping kv store with final flush") + if err := kva.flush(); err != nil { + return err + } + return kva.kvStoreWithBuffer.Stop(ctx) +} + +func (kva *kvStoreAutoWriteBatch) flush() (err error) { + log.L().Debug("flushing batch", zap.Int("size", kva.buffer.Size())) + if err := kva.store.WriteBatch(kva.buffer.Translate(func(wi *batch.WriteInfo) *batch.WriteInfo { return wi })); err != nil { + return err + } + kva.buffer.Lock() + kva.buffer.ClearAndUnlock() + return nil +} From 1d64e77abf5077e709af6bb8260d840c59247742 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 2 Sep 2025 15:41:18 +0800 Subject: [PATCH 6/7] still stop if start error --- server/itx/server.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/server/itx/server.go b/server/itx/server.go index 667a4f5cd3..ad1f5da81c 100644 --- a/server/itx/server.go +++ b/server/itx/server.go @@ -183,7 +183,11 @@ func (s *Server) Start(ctx context.Context) error { // Stop stops the server func (s *Server) Stop(ctx context.Context) error { - defer s.subModuleCancel() + defer func() { + if s.subModuleCancel != nil { + s.subModuleCancel() + } + }() if err := s.nodeStats.Stop(ctx); err != nil { return errors.Wrap(err, "error when stopping node stats") } @@ -275,15 +279,16 @@ func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg c }() } - if err := svr.Start(ctx); err != nil { - log.L().Fatal("Failed to start server.", zap.Error(err)) - return - } defer func() { if err := svr.Stop(context.Background()); err != nil { log.L().Panic("Failed to stop server.", zap.Error(err)) } }() + if err := svr.Start(ctx); err != nil { + log.L().Fatal("Failed to start server.", zap.Error(err)) + return + } + if _, isGateway := cfg.Plugins[config.GatewayPlugin]; isGateway && cfg.API.ReadyDuration > 0 { // wait for a while to make sure the server is ready // The original intention was to ensure that all transactions that were not received during the restart were included in block, thereby avoiding inconsistencies in the state of the API node. From 9593fbebc822a53c106f1b079342002ccbb67086 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 2 Sep 2025 15:44:33 +0800 Subject: [PATCH 7/7] fix stop if not start --- actpool/actionstore.go | 3 +++ actsync/actionsync.go | 3 +++ blockindex/bloomfilterindexer.go | 15 ++++++++++++++- blockindex/contractstaking/indexer.go | 3 +++ blockindex/indexer.go | 11 +++++++++++ consensus/consensus.go | 2 +- dispatcher/dispatcher.go | 3 +++ nodeinfo/manager.go | 13 ++++++++++++- pkg/messagebatcher/batchwriter.go | 3 +++ server/itx/nodestats/nodestats.go | 3 +++ state/factory/daoretrofitter.go | 16 +++++++++++++++- systemcontractindex/stakingindex/index.go | 3 +++ 12 files changed, 74 insertions(+), 4 deletions(-) diff --git a/actpool/actionstore.go b/actpool/actionstore.go index 6dfcd54d5d..89f9c48b2f 100644 --- a/actpool/actionstore.go +++ b/actpool/actionstore.go @@ -144,6 +144,9 @@ func (s *actionStore) Open(onData onAction) error { func (s *actionStore) Close() error { s.lock.Lock() defer s.lock.Unlock() + if !s.IsReady() { + return nil + } if err := s.TurnOff(); err != nil { return err diff --git a/actsync/actionsync.go b/actsync/actionsync.go index ffe3313854..f0e63e9ccb 100644 --- a/actsync/actionsync.go +++ b/actsync/actionsync.go @@ -90,6 +90,9 @@ func (as *ActionSync) Start(ctx context.Context) error { // Stop stops the action syncer func (as *ActionSync) Stop(ctx context.Context) error { + if !as.IsReady() { + return nil + } log.L().Info("stopping action sync") if err := as.TurnOff(); err != nil { return err diff --git a/blockindex/bloomfilterindexer.go b/blockindex/bloomfilterindexer.go index 37309566ed..99be0cea2c 100644 --- a/blockindex/bloomfilterindexer.go +++ b/blockindex/bloomfilterindexer.go @@ -20,6 +20,7 @@ import ( "github.com/iotexproject/iotex-core/v2/blockchain/blockdao" "github.com/iotexproject/iotex-core/v2/db" "github.com/iotexproject/iotex-core/v2/db/batch" + "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -60,6 +61,7 @@ type ( // bloomfilterIndexer is a struct for bloomfilter indexer bloomfilterIndexer struct { + lifecycle.Readiness mutex sync.RWMutex // mutex for curRangeBloomfilter kvStore db.KVStore rangeSize uint64 @@ -95,6 +97,9 @@ func (bfx *bloomfilterIndexer) Start(ctx context.Context) error { if err := bfx.kvStore.Start(ctx); err != nil { return err } + if err := bfx.TurnOn(); err != nil { + return err + } bfx.mutex.Lock() defer bfx.mutex.Unlock() @@ -142,7 +147,15 @@ func (bfx *bloomfilterIndexer) initRangeBloomFilter(height uint64) error { // Stop stops the bloomfilter indexer func (bfx *bloomfilterIndexer) Stop(ctx context.Context) error { - bfx.totalRange.Close() + if bfx.totalRange != nil { + bfx.totalRange.Close() + } + if !bfx.IsReady() { + return nil + } + if err := bfx.TurnOff(); err != nil { + return err + } return bfx.kvStore.Stop(ctx) } diff --git a/blockindex/contractstaking/indexer.go b/blockindex/contractstaking/indexer.go index 8bb29e76a2..12358d0182 100644 --- a/blockindex/contractstaking/indexer.go +++ b/blockindex/contractstaking/indexer.go @@ -112,6 +112,9 @@ func (s *Indexer) start(ctx context.Context) error { // Stop stops the indexer func (s *Indexer) Stop(ctx context.Context) error { + if !s.IsReady() { + return nil + } if err := s.kvstore.Stop(ctx); err != nil { return err } diff --git a/blockindex/indexer.go b/blockindex/indexer.go index c7cd07d845..ce010c397f 100644 --- a/blockindex/indexer.go +++ b/blockindex/indexer.go @@ -21,6 +21,7 @@ import ( "github.com/iotexproject/iotex-core/v2/blockchain/block" "github.com/iotexproject/iotex-core/v2/db" "github.com/iotexproject/iotex-core/v2/db/batch" + "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -62,6 +63,7 @@ type ( // blockIndexer implements the Indexer interface blockIndexer struct { + lifecycle.Readiness mutex sync.RWMutex genesisHash hash.Hash256 kvStore db.KVStoreWithRange @@ -95,6 +97,9 @@ func (x *blockIndexer) Start(ctx context.Context) error { if err := x.kvStore.Start(ctx); err != nil { return err } + if err := x.TurnOn(); err != nil { + return err + } // create the total block and action index var err error if x.tbk, err = db.NewCountingIndexNX(x.kvStore, _totalBlocksBucket); err != nil { @@ -115,6 +120,12 @@ func (x *blockIndexer) Start(ctx context.Context) error { // Stop stops the indexer func (x *blockIndexer) Stop(ctx context.Context) error { + if !x.IsReady() { + return nil + } + if err := x.TurnOff(); err != nil { + return err + } return x.kvStore.Stop(ctx) } diff --git a/consensus/consensus.go b/consensus/consensus.go index b1505f260d..51bd59ed3d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -225,7 +225,7 @@ func (c *IotxConsensus) Stop(ctx context.Context) error { err := c.scheme.Stop(ctx) if err != nil { - return errors.Wrapf(err, "failed to stop scheme %s", c.cfg.Scheme) + log.Logger("consensus").Error("Failed to stop the consensus scheme.", zap.Error(err)) } return nil } diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 67ee89fc74..913e0a446b 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -164,6 +164,9 @@ func (d *IotxDispatcher) Start(ctx context.Context) error { // Stop gracefully shuts down the dispatcher by stopping all handlers and waiting for them to finish. func (d *IotxDispatcher) Stop(ctx context.Context) error { + if !d.IsReady() { + return nil + } if err := d.TurnOff(); err != nil { log.L().Warn("Dispatcher already in the process of shutting down.") return err diff --git a/nodeinfo/manager.go b/nodeinfo/manager.go index 69cc3e8379..d2013b8f24 100644 --- a/nodeinfo/manager.go +++ b/nodeinfo/manager.go @@ -52,6 +52,7 @@ type ( // InfoManager manage delegate node info InfoManager struct { lifecycle.Lifecycle + lifecycle.Readiness version string broadcastList atomic.Value // []string, whitelist to force enable broadcast nodeMap *lru.Cache @@ -121,11 +122,21 @@ func NewInfoManager(cfg *Config, t transmitter, ch chain, broadcastListFunc getB // Start start delegate broadcast task func (dm *InfoManager) Start(ctx context.Context) error { dm.updateBroadcastList() - return dm.OnStart(ctx) + err := dm.OnStart(ctx) + if err != nil { + return err + } + return dm.TurnOn() } // Stop stop delegate broadcast task func (dm *InfoManager) Stop(ctx context.Context) error { + if !dm.IsReady() { + return nil + } + if err := dm.TurnOff(); err != nil { + return err + } return dm.OnStop(ctx) } diff --git a/pkg/messagebatcher/batchwriter.go b/pkg/messagebatcher/batchwriter.go index e03a2409a4..0e7fad92ec 100644 --- a/pkg/messagebatcher/batchwriter.go +++ b/pkg/messagebatcher/batchwriter.go @@ -83,6 +83,9 @@ func (bm *Manager) Start() error { // Stop stops the Manager func (bm *Manager) Stop() error { + if bm.cancelHanlders == nil { + return nil + } bm.cancelHanlders() return nil } diff --git a/server/itx/nodestats/nodestats.go b/server/itx/nodestats/nodestats.go index ee06a5ca55..f8586782c5 100644 --- a/server/itx/nodestats/nodestats.go +++ b/server/itx/nodestats/nodestats.go @@ -40,6 +40,9 @@ func (s *NodeStats) Start(ctx context.Context) error { // Stop stops the node stats func (s *NodeStats) Stop(ctx context.Context) error { + if s.task == nil { + return nil + } return s.task.Stop(ctx) } diff --git a/state/factory/daoretrofitter.go b/state/factory/daoretrofitter.go index 7170fcabed..ef5955f69f 100644 --- a/state/factory/daoretrofitter.go +++ b/state/factory/daoretrofitter.go @@ -11,10 +11,12 @@ import ( "github.com/pkg/errors" "github.com/iotexproject/iotex-core/v2/db" + "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) type daoRTF struct { + lifecycle.Readiness dao db.KVStore } @@ -25,10 +27,22 @@ func newDaoRetrofitter(dao db.KVStore) *daoRTF { } func (rtf *daoRTF) Start(ctx context.Context) error { - return rtf.dao.Start(ctx) + if err := rtf.dao.Start(ctx); err != nil { + return err + } + if err := rtf.TurnOn(); err != nil { + return err + } + return nil } func (rtf *daoRTF) Stop(ctx context.Context) error { + if !rtf.IsReady() { + return nil + } + if err := rtf.TurnOff(); err != nil { + return err + } return rtf.dao.Stop(ctx) } diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index 7f69c9fca0..f52a4bd38a 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -146,6 +146,9 @@ func (s *Indexer) start(ctx context.Context) error { func (s *Indexer) Stop(ctx context.Context) error { s.mutex.Lock() defer s.mutex.Unlock() + if !s.common.Started() { + return nil + } return s.common.Stop(ctx) }