diff --git a/cmd/run.go b/cmd/run.go index dffa2d6fe..6c008d890 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -628,6 +628,7 @@ func runL1MultiDownloaderIfNeeded( l1Client, // rpcClient nil, // storage nil, // blockNotifierManager + nil, // reorgProcessor ) if err != nil { return nil, nil, fmt.Errorf("failed to create L1 MultiDownloader: %w", err) diff --git a/common/block_range.go b/common/block_range.go index 9c486df88..b4929ffc1 100644 --- a/common/block_range.go +++ b/common/block_range.go @@ -191,3 +191,14 @@ func ChunkedRangeQuery[T any]( return all, nil } + +func (b BlockRange) ListBlockNumbers() []uint64 { + if b.IsEmpty() { + return []uint64{} + } + blockNumbers := make([]uint64, 0, b.CountBlocks()) + for i := b.FromBlock; i <= b.ToBlock; i++ { + blockNumbers = append(blockNumbers, i) + } + return blockNumbers +} diff --git a/common/block_range_test.go b/common/block_range_test.go index be22e95d4..86aa4d940 100644 --- a/common/block_range_test.go +++ b/common/block_range_test.go @@ -471,3 +471,12 @@ func TestChunkedRangeQuery_EmptyRange(t *testing.T) { require.NoError(t, err) require.Equal(t, empty, result) } + +func TestBlockRange_ListBlockNumbers(t *testing.T) { + bn1 := NewBlockRange(1, 1) + require.Equal(t, []uint64{1}, bn1.ListBlockNumbers()) + bn2 := NewBlockRange(3, 5) + require.Equal(t, []uint64{3, 4, 5}, bn2.ListBlockNumbers()) + bn3 := NewBlockRange(0, 0) + require.Equal(t, []uint64{}, bn3.ListBlockNumbers()) +} diff --git a/common/time_tracker.go b/common/time_tracker.go index 4775d3ced..286be69dd 100644 --- a/common/time_tracker.go +++ b/common/time_tracker.go @@ -18,7 +18,7 @@ type TimeTracker struct { func (t *TimeTracker) String() string { return "TimeTracker{times=" + strconv.Itoa(int(t.times)) + - "lastDuration=" + t.lastDuration.String() + + ", lastDuration=" + t.lastDuration.String() + ", accumulated=" + t.accumulated.String() + "}" } diff --git a/config/default.go b/config/default.go index 63f505ca0..226fb7b78 100644 --- a/config/default.go +++ b/config/default.go @@ -335,6 +335,7 @@ BlockFinalityForL1InfoTree = "{{AggSender.BlockFinalityForL1InfoTree}}" MaxParallelBlockHeaderRetrieval = 30 BlockFinality = "FinalizedBlock" WaitPeriodToCheckCatchUp = "10s" + PeriodToCheckReorgs = "5s" [L2Multidownloader] Enabled = false @@ -343,4 +344,5 @@ BlockFinalityForL1InfoTree = "{{AggSender.BlockFinalityForL1InfoTree}}" MaxParallelBlockHeaderRetrieval = 30 BlockFinality = "LatestBlock" WaitPeriodToCheckCatchUp = "10s" + PeriodToCheckReorgs = "5s" ` diff --git a/etherman/batch_requests.go b/etherman/batch_requests.go index d46033475..c6e5e9142 100644 --- a/etherman/batch_requests.go +++ b/etherman/batch_requests.go @@ -56,7 +56,7 @@ func RetrieveBlockHeaders(ctx context.Context, ethClient aggkittypes.BaseEthereumClienter, rpcClient aggkittypes.RPCClienter, blockNumbers []uint64, - maxConcurrency int) ([]*aggkittypes.BlockHeader, error) { + maxConcurrency int) (aggkittypes.ListBlockHeaders, error) { if rpcClient != nil { return RetrieveBlockHeadersBatch(ctx, log, rpcClient, blockNumbers, maxConcurrency) } @@ -69,11 +69,11 @@ func RetrieveBlockHeadersBatch(ctx context.Context, log aggkitcommon.Logger, rpcClient aggkittypes.RPCClienter, blockNumbers []uint64, - maxConcurrency int) ([]*aggkittypes.BlockHeader, error) { + maxConcurrency int) (aggkittypes.ListBlockHeaders, error) { return retrieveBlockHeadersInBatchParallel( ctx, log, - func(ctx context.Context, blocks []uint64) ([]*aggkittypes.BlockHeader, error) { + func(ctx context.Context, blocks []uint64) 
(aggkittypes.ListBlockHeaders, error) { return retrieveBlockHeadersInBatch(ctx, log, rpcClient, blocks) }, blockNumbers, batchRequestLimitHTTP, maxConcurrency) } @@ -88,8 +88,8 @@ func RetrieveBlockHeadersLegacy(ctx context.Context, return retrieveBlockHeadersInBatchParallel( ctx, log, - func(ctx context.Context, blocks []uint64) ([]*aggkittypes.BlockHeader, error) { - result := make([]*aggkittypes.BlockHeader, len(blocks)) + func(ctx context.Context, blocks []uint64) (aggkittypes.ListBlockHeaders, error) { + result := aggkittypes.NewListBlockHeadersEmpty(len(blocks)) for i, blockNumber := range blocks { header, err := ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNumber))) if err != nil { @@ -107,9 +107,9 @@ func retrieveBlockHeadersInBatch(ctx context.Context, log aggkitcommon.Logger, rpcClient aggkittypes.RPCClienter, blockNumbers []uint64, -) ([]*aggkittypes.BlockHeader, error) { +) (aggkittypes.ListBlockHeaders, error) { if len(blockNumbers) == 0 { - return make([]*aggkittypes.BlockHeader, 0), nil + return aggkittypes.NewListBlockHeadersEmpty(0), nil } headers := make([]*blockRawEth, len(blockNumbers)) timeTracker := aggkitcommon.NewTimeTracker() @@ -146,9 +146,9 @@ func retrieveBlockHeadersInBatch(ctx context.Context, func retrieveBlockHeadersInBatchParallel( ctx context.Context, logger aggkitcommon.Logger, - funcRetrieval func(context.Context, []uint64) ([]*aggkittypes.BlockHeader, error), + funcRetrieval func(context.Context, []uint64) (aggkittypes.ListBlockHeaders, error), blockNumbers []uint64, - chunckSize, maxConcurrency int) ([]*aggkittypes.BlockHeader, error) { + chunckSize, maxConcurrency int) (aggkittypes.ListBlockHeaders, error) { var mu sync.Mutex g, ctx := errgroup.WithContext(ctx) g.SetLimit(maxConcurrency) diff --git a/etherman/batch_requests_test.go b/etherman/batch_requests_test.go index d7b37179c..0f6b9d5a6 100644 --- a/etherman/batch_requests_test.go +++ b/etherman/batch_requests_test.go @@ -247,7 +247,7 @@ func TestRetrieveBlockHeadersInBatchParallel(t *testing.T) { result, err := retrieveBlockHeadersInBatchParallel( ctx, logger, - func(ctx context.Context, blocks []uint64) ([]*aggkittypes.BlockHeader, error) { + func(ctx context.Context, blocks []uint64) (aggkittypes.ListBlockHeaders, error) { t.Logf("Retrieving blocks in batch: %v", blocks) headers := make([]*aggkittypes.BlockHeader, len(blocks)) for i, bn := range blocks { diff --git a/etherman/block_notifier/block_notifier_polling.go b/etherman/block_notifier/block_notifier_polling.go index 51ed45dd8..7401b026a 100644 --- a/etherman/block_notifier/block_notifier_polling.go +++ b/etherman/block_notifier/block_notifier_polling.go @@ -161,7 +161,7 @@ func (b *BlockNotifierPolling) step(ctx context.Context, BlockFinalityType: b.config.BlockFinalityType, } if previousState.lastBlockSeen > currentBlock { - b.logger.Warnf("Block number decreased [finality:%s]: %d -> %d", + b.logger.Infof("Block number decreased [finality:%s]: %d -> %d", b.config.BlockFinalityType.String(), previousState.lastBlockSeen, currentBlock) // It start from scratch because something fails in calculation of block period newState := previousState.initialBlock(currentBlock) @@ -170,7 +170,7 @@ func (b *BlockNotifierPolling) step(ctx context.Context, if currentBlock-previousState.lastBlockSeen != 1 { if !b.config.BlockFinalityType.IsSafe() && !b.config.BlockFinalityType.IsFinalized() { - b.logger.Warnf("Missed block(s) [finality:%s]: %d -> %d", + b.logger.Infof("Missed block(s) [finality:%s]: %d -> %d", b.config.BlockFinalityType.String(), 
previousState.lastBlockSeen, currentBlock)
 		}
diff --git a/etherman/block_notifier/block_notifier_polling_test.go b/etherman/block_notifier/block_notifier_polling_test.go
index 343f82f7b..71fe7be42 100644
--- a/etherman/block_notifier/block_notifier_polling_test.go
+++ b/etherman/block_notifier/block_notifier_polling_test.go
@@ -21,7 +21,7 @@ import (
 )
 
 func TestExploratoryBlockNotifierPolling(t *testing.T) {
-	t.Skip()
+	t.Skip("this is an exploratory test that requires an external RPC")
 	urlRPCL1 := os.Getenv("L1URL")
 	fmt.Println("URL=", urlRPCL1)
 	cfg := &ethermanconfig.RPCClientConfig{
@@ -117,7 +117,8 @@ func TestBlockNotifierPollingStep(t *testing.T) {
 			},
 			mockLoggerFn: func() aggkitcommon.Logger {
 				mockLogger := commonmocks.NewLogger(t)
-				mockLogger.EXPECT().Warnf("Missed block(s) [finality:%s]: %d -> %d", aggkittypes.LatestBlock.String(), uint64(100), uint64(105)).Once()
+				mockLogger.EXPECT().Infof("Missed block(s) [finality:%s]: %d -> %d", aggkittypes.LatestBlock.String(), uint64(100), uint64(105)).Once()
+				mockLogger.EXPECT().Infof(mock.Anything, mock.Anything).Maybe()
 				return mockLogger
 			},
 			headerByNumberError: false,
diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go
index d9621346e..fa901d25c 100644
--- a/l1infotreesync/e2e_test.go
+++ b/l1infotreesync/e2e_test.go
@@ -93,6 +93,7 @@ func TestE2E(t *testing.T) {
 			nil, // rpcClient
 			nil,
 			nil,
+			nil, // reorgProcessor will be created internally
 		)
 		require.NoError(t, err)
 	} else {
@@ -175,8 +176,9 @@ func TestWithReorgs(t *testing.T) {
 			"testMD",
 			etherman.NewDefaultEthClient(client.Client(), nil, nil),
 			nil, // rpcClient
-			nil,
-			nil,
+			nil, // storage will be created internally
+			nil, // blockNotifierManager will be created internally
+			nil, // reorgProcessor will be created internally
 		)
 		require.NoError(t, err)
 	} else {
@@ -328,8 +330,9 @@ func TestStressAndReorgs(t *testing.T) {
 			"testMD",
 			etherman.NewDefaultEthClient(client.Client(), nil, nil),
 			nil, // rpcClient
-			nil,
-			nil,
+			nil, // storage will be created internally
+			nil, // blockNotifierManager will be created internally
+			nil, // reorgProcessor will be created internally
 		)
 		require.NoError(t, err)
 	} else {
diff --git a/multidownloader/config.go b/multidownloader/config.go
index 8e9a13847..442b57b5e 100644
--- a/multidownloader/config.go
+++ b/multidownloader/config.go
@@ -34,12 +34,16 @@ type Config struct {
 	BlockFinality aggkittypes.BlockNumberFinality
 	// WaitPeriodToCheckCatchUp is the duration to wait before checking again if logs are not yet available
 	WaitPeriodToCheckCatchUp types.Duration
+	// PeriodToCheckReorgs is the duration to wait before checking for reorgs.
+	// If it is 0, reorgs are checked only when a new block appears
+	PeriodToCheckReorgs types.Duration
 }
 
 const (
 	defaultBlockChunkSize                  = 10000
 	defaultMaxParallelBlockHeaderRetrieval = 30
 	defaultWaitPeriodToCheckCatchUp        = time.Second * 10
+	defaultPeriodToCheckReorgs             = time.Second * 5
 )
 
 func NewConfigDefault(name string, basePathDB string) Config {
@@ -54,6 +58,7 @@ func NewConfigDefault(name string, basePathDB string) Config {
 		MaxParallelBlockHeaderRetrieval: defaultMaxParallelBlockHeaderRetrieval,
 		BlockFinality:                   aggkittypes.FinalizedBlock,
 		WaitPeriodToCheckCatchUp:        types.NewDuration(defaultWaitPeriodToCheckCatchUp),
+		PeriodToCheckReorgs:             types.NewDuration(defaultPeriodToCheckReorgs),
 	}
 }
 
@@ -75,10 +80,12 @@ func (cfg *Config) Validate() error {
 
 func (cfg *Config) String() string {
 	return fmt.Sprintf("MultidownloaderConfig{Enabled:%t, BlockChunkSize:%d, "+
-		"MaxParallelBlockHeaderRetrieval:%d, BlockFinality:%s, 
WaitPeriodToCheckCatchUp:%s}", + "MaxParallelBlockHeaderRetrieval:%d, BlockFinality:%s, WaitPeriodToCheckCatchUp:%s, "+ + "PeriodToCheckReorgs:%s}", cfg.Enabled, cfg.BlockChunkSize, cfg.MaxParallelBlockHeaderRetrieval, cfg.BlockFinality.String(), - cfg.WaitPeriodToCheckCatchUp.String()) + cfg.WaitPeriodToCheckCatchUp.String(), + cfg.PeriodToCheckReorgs.String()) } diff --git a/multidownloader/config_test.go b/multidownloader/config_test.go index 2a628857a..a501a73b8 100644 --- a/multidownloader/config_test.go +++ b/multidownloader/config_test.go @@ -2,7 +2,6 @@ package multidownloader import ( "testing" - "time" "github.com/agglayer/aggkit/config/types" aggkittypes "github.com/agglayer/aggkit/types" @@ -13,10 +12,12 @@ func TestNewConfigDefault(t *testing.T) { cfg := NewConfigDefault("l1", "/tmp/aggkit/") require.Equal(t, false, cfg.Enabled) require.Equal(t, "/tmp/aggkit/l1_multidownloader.sqlite", cfg.StoragePath) - require.Equal(t, uint32(10000), cfg.BlockChunkSize, "BlockChunkSize should be 10000") - require.Equal(t, 30, cfg.MaxParallelBlockHeaderRetrieval, "MaxParallelBlockHeaderRetrieval should be 30") + require.Equal(t, uint32(defaultBlockChunkSize), cfg.BlockChunkSize, "BlockChunkSize should be 10000") + require.Equal(t, defaultMaxParallelBlockHeaderRetrieval, cfg.MaxParallelBlockHeaderRetrieval, "MaxParallelBlockHeaderRetrieval should be 30") require.Equal(t, aggkittypes.FinalizedBlock, cfg.BlockFinality, "BlockFinality should be FinalizedBlock") - require.Equal(t, types.NewDuration(time.Second*10), cfg.WaitPeriodToCheckCatchUp, "WaitPeriodToCheckCatchUp should be 10 seconds") + require.Equal(t, types.NewDuration(defaultWaitPeriodToCheckCatchUp), cfg.WaitPeriodToCheckCatchUp, "WaitPeriodToCheckCatchUp should be 10 seconds") + require.Equal(t, types.NewDuration(defaultPeriodToCheckReorgs), cfg.PeriodToCheckReorgs, "PeriodToCheckReorgs should be 5 seconds") + require.False(t, cfg.Enabled, "Enabled should be false by default") } @@ -102,5 +103,6 @@ func TestConfig_String(t *testing.T) { require.Contains(t, str, "MaxParallelBlockHeaderRetrieval", "String() should contain MaxParallelBlockHeaderRetrieval") require.Contains(t, str, "BlockFinality", "String() should contain BlockFinality") require.Contains(t, str, "WaitPeriodToCheckCatchUp", "String() should contain WaitPeriodToCheckCatchUp") + require.Contains(t, str, "PeriodToCheckReorgs", "String() should contain PeriodToCheckReorgs") require.Contains(t, str, "Enabled", "String() should contain Enabled") } diff --git a/multidownloader/evm_multidownloader.go b/multidownloader/evm_multidownloader.go index 653cbffac..44b20a687 100644 --- a/multidownloader/evm_multidownloader.go +++ b/multidownloader/evm_multidownloader.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "time" jRPC "github.com/0xPolygon/cdk-rpc/rpc" aggkitcommon "github.com/agglayer/aggkit/common" @@ -19,12 +20,14 @@ import ( "github.com/agglayer/aggkit/multidownloader/storage" mdrtypes "github.com/agglayer/aggkit/multidownloader/types" aggkittypes "github.com/agglayer/aggkit/types" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ethrpc "github.com/ethereum/go-ethereum/rpc" ) const ( - safeMode = true + safeMode = mdrtypes.Finalized + unsafeMode = mdrtypes.NotFinalized chunkSizeReductionFactor = 10 minChunkSize = 1 ) @@ -38,15 +41,12 @@ type EVMMultidownloader struct { blockNotifierManager ethermantypes.BlockNotifierManager name string syncersConfig mdrtypes.SetSyncerConfig + reorgProcessor mdrtypes.ReorgProcessor - mutex 
sync.Mutex
-	isInitialized bool
-	// These are the segments that we need to sync
-	pendingSync *mdrtypes.SetSyncSegment
-	// These are the segments that we have already synced
-	// when a syncer does a `FilterLogs`, it is used to check what is already synced
-	syncedSegments mdrtypes.SetSyncSegment
-	statistics *Statistics
+	mutex sync.Mutex
+	state *State // current state of synced and pending segments; nil means not initialized
+
+	statistics *Statistics
 }
 
 var _ aggkittypes.MultiDownloader = (*EVMMultidownloader)(nil)
@@ -59,6 +59,7 @@ func NewEVMMultidownloader(log aggkitcommon.Logger,
 	rpcClient aggkittypes.RPCClienter,
 	storageDB mdrtypes.Storager,
 	blockNotifierManager ethermantypes.BlockNotifierManager,
+	reorgProcessor mdrtypes.ReorgProcessor,
 ) (*EVMMultidownloader, error) {
 	if blockNotifierManager == nil {
 		blockNotifierManager = ethermanblocknotifier.NewBlockNotifierManager(log,
@@ -83,6 +84,11 @@ func NewEVMMultidownloader(log aggkitcommon.Logger,
 		}
 	}
 
+	if reorgProcessor == nil {
+		log.Infof("NewEVMMultidownloader: creating default ReorgProcessor for multidownloader (%s)", name)
+		reorgProcessor = NewReorgProcessor(log, ethClient, rpcClient, storageDB)
+	}
+
 	return &EVMMultidownloader{
 		log:       log,
 		ethClient: ethClient,
@@ -93,6 +99,7 @@ func NewEVMMultidownloader(log aggkitcommon.Logger,
 		syncersConfig: mdrtypes.NewSetSyncerConfig(),
 		statistics:    NewStatistics(),
 		name:          name,
+		reorgProcessor: reorgProcessor,
 	}, nil
 }
 
@@ -100,27 +107,90 @@ func (dh *EVMMultidownloader) RegisterSyncer(data aggkittypes.SyncerConfig) erro
 	dh.mutex.Lock()
 	defer dh.mutex.Unlock()
 
-	if dh.isInitialized {
+	if dh.isInitializedNoMutex() {
 		return fmt.Errorf("registerSyncer: cannot add new syncer config after initialization")
 	}
+
 	dh.syncersConfig.Add(data)
 	return nil
 }
 
-func (dh *EVMMultidownloader) Start(ctx context.Context) error {
-	err := dh.Initialize(ctx)
+func (dh *EVMMultidownloader) MoveUnsafeToSafeIfPossible(ctx context.Context) error {
+	dh.mutex.Lock()
+	defer dh.mutex.Unlock()
+
+	finalizedBlockNumber, err := dh.GetFinalizedBlockNumber(ctx)
 	if err != nil {
-		return err
+		return fmt.Errorf("MoveUnsafeToSafeIfPossible: cannot get finalized block number: %w", err)
 	}
-	err = dh.sync(ctx, dh.StepSafe, "safe")
+	committed := false
+	tx, err := dh.storage.NewTx(ctx)
 	if err != nil {
-		return err
+		return fmt.Errorf("MoveUnsafeToSafeIfPossible: cannot create new tx: %w", err)
+	}
+	defer func() {
+		if !committed {
+			dh.log.Debugf("MoveUnsafeToSafeIfPossible: rolling back tx")
+			if err := tx.Rollback(); err != nil {
+				dh.log.Errorf("MoveUnsafeToSafeIfPossible: error rolling back tx: %v", err)
+			}
+		}
+	}()
+
+	blocks, err := dh.storage.GetBlockHeadersNotFinalized(tx, finalizedBlockNumber)
+	if err != nil {
+		return fmt.Errorf("MoveUnsafeToSafeIfPossible: cannot get unsafe block bases: %w", err)
+	}
+	dh.log.Infof("MoveUnsafeToSafeIfPossible: finalizedBlockNumber=%d, "+
+		"unsafe blocks to finalize=%d", finalizedBlockNumber, len(blocks))
+	err = dh.detectReorgs(ctx, blocks)
+	if err != nil {
+		return fmt.Errorf("MoveUnsafeToSafeIfPossible: error detecting reorgs: %w", err)
+	}
+	err = dh.storage.UpdateBlockToFinalized(tx, blocks.BlockNumbers())
+	if err != nil {
+		return fmt.Errorf("MoveUnsafeToSafeIfPossible: cannot update is_final for block bases: %w", err)
+	}
+	committed = true
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("MoveUnsafeToSafeIfPossible: cannot commit tx: %w", err)
 	}
 	return nil
 }
 
+func (dh *EVMMultidownloader) detectReorgs(ctx context.Context,
+	blocks aggkittypes.ListBlockHeaders) 
error {
+	// TODO: optimize this to avoid checking all blocks
+	// TODO: Find the first block to reorg
+	blocksNumber := blocks.BlockNumbers()
+	currentBlockHeaders, err := etherman.RetrieveBlockHeaders(ctx, dh.log, dh.ethClient, dh.rpcClient,
+		blocksNumber, dh.cfg.MaxParallelBlockHeaderRetrieval)
+	if err != nil {
+		return fmt.Errorf("detectReorgs: cannot retrieve block headers: %w", err)
+	}
+	// check blocks vs currentBlockHeaders. Must match by number and hash
+	storageBlocks := blocks.ToMap()
+	rpcBlocks := currentBlockHeaders.ToMap()
+	for _, number := range blocksNumber {
+		rpcBlock, exists := rpcBlocks[number]
+		if !exists {
+			return fmt.Errorf("detectReorgs: block number %d not found in RPC", number)
+		}
+		storageBlock, exists := storageBlocks[number]
+		if !exists {
+			return fmt.Errorf("detectReorgs: block number %d not found in storage", number)
+		}
+		if storageBlock.Hash != rpcBlock.Hash {
+			return mdrtypes.NewReorgError(storageBlock.Number, storageBlock.Hash, rpcBlock.Hash,
+				fmt.Sprintf("detectReorgs: reorg detected at block number %d: storage hash %s != rpc hash %s",
+					number, storageBlock.Hash.String(), rpcBlock.Hash.String()))
+		}
+	}
+	return nil
+}
+
 func (dh *EVMMultidownloader) GetRPCServices() []jRPC.Service {
 	logger := log.WithFields("module", "multidownloader-rpc-"+dh.name)
 	return []jRPC.Service{
@@ -156,14 +226,16 @@ func (dh *EVMMultidownloader) CheckDatabase(ctx context.Context) error {
 func (dh *EVMMultidownloader) Initialize(ctx context.Context) error {
 	dh.mutex.Lock()
 	defer dh.mutex.Unlock()
-	if dh.isInitialized {
+	if dh.isInitializedNoMutex() {
 		return fmt.Errorf("initialize: already initialized")
 	}
+	dh.log.Infof("Initializing multidownloader...")
 	// Check DB compatibility
 	err := dh.CheckDatabase(ctx)
 	if err != nil {
 		return err
 	}
+	dh.log.Infof("Saving syncer configs to storage...")
 	// Save syncer configs to storage; it overrides previous ones but keeps
 	// the synced segments
 	err = dh.storage.UpsertSyncerConfigs(nil, dh.syncersConfig.ContractConfigs())
@@ -185,16 +257,108 @@ func (dh *EVMMultidownloader) Initialize(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	// What is pending to download?
-	dh.pendingSync = syncSegments.Clone()
-	err = dh.pendingSync.SubtractSegments(&storageSyncSegments)
+	newState, err := NewStateFromStorageSyncedBlocks(storageSyncSegments, *syncSegments)
 	if err != nil {
-		return fmt.Errorf("Initialize: cannot calculate pendingSync: %w", err)
+		return err
 	}
-	dh.syncedSegments = storageSyncSegments
-	dh.isInitialized = true
+	// What is pending to download?
+	dh.state = newState
+	dh.log.Infof("Initialization completed. state: %s",
+		dh.state.String())
 	return nil
 }
+func (dh *EVMMultidownloader) Start(ctx context.Context) error {
+	err := dh.Initialize(ctx)
+	if err != nil {
+		return err
+	}
+	for {
+		err = dh.StartStep(ctx)
+		if err != nil {
+			reorgErr := mdrtypes.CastReorgError(err)
+			if reorgErr == nil {
+				// TODO: Remove this panic and handle properly
+				panic("Error running multidownloader: " + err.Error())
+			}
+			dh.log.Warnf("Reorg detected: %s", reorgErr.Error())
+			err = dh.reorgProcessor.ProcessReorg(ctx, reorgErr.OffendingBlockNumber)
+			if err != nil {
+				panic("Error running multidownloader: " + err.Error())
+			}
+		}
+		// Breathing, just in case
+		dh.log.Infof("relaunching sync loop... 
(waiting 1 second)")
+		time.Sleep(1 * time.Second)
+	}
+}
+
+func (dh *EVMMultidownloader) StartStep(ctx context.Context) error {
+	dh.log.Infof("checking unsafe blocks on DB...")
+	var err error
+	if err = dh.MoveUnsafeToSafeIfPossible(ctx); err != nil {
+		return err
+	}
+	if err = dh.sync(ctx, dh.StepSafe, "safe"); err != nil {
+		return err
+	}
+	for {
+		dh.log.Infof("Unsafe sync iteration starting...")
+		if err = dh.sync(ctx, dh.StepUnsafe, "unsafe"); err != nil {
+			return err
+		}
+		dh.log.Infof("waiting new block...")
+		if err = dh.checkReorgUntilNewBlock(ctx); err != nil {
+			return err
+		}
+	}
+}
+
+// checkReorgUntilNewBlock checks the tip of the chain for reorgs while it
+// waits for a new block to arrive
+func (dh *EVMMultidownloader) checkReorgUntilNewBlock(ctx context.Context) error {
+	initialFinalizedBlockNumber, err := dh.GetFinalizedBlockNumber(ctx)
+	if err != nil {
+		return fmt.Errorf("checkReorgUntilNewBlock: cannot get finalized block number: %w", err)
+	}
+	lowestBlock, highestBlock, err := dh.storage.GetRangeBlockHeader(nil, mdrtypes.NotFinalized)
+	if err != nil {
+		return fmt.Errorf("checkReorgUntilNewBlock: cannot get highest unsafe block: %w", err)
+	}
+	if lowestBlock == nil || highestBlock == nil {
+		dh.log.Infof("checkReorgUntilNewBlock: no unsafe blocks to check for reorgs")
+		return nil
+	}
+
+	for {
+		select {
+		case <-time.After(dh.cfg.PeriodToCheckReorgs.Duration):
+			if err := dh.detectReorgs(ctx, []*aggkittypes.BlockHeader{highestBlock}); err != nil {
+				return fmt.Errorf("checkReorgUntilNewBlock: cannot check reorg on tip block %d: %w",
+					highestBlock.Number, err)
+			}
+			if err := dh.state.UpdateTargetBlockToNumber(ctx, dh.blockNotifierManager); err != nil {
+				return fmt.Errorf("checkReorgUntilNewBlock: cannot update TargetToBlock in pendingSync: %w", err)
+			}
+			highestBlockPendingToSync := dh.state.GetHighestBlockNumberPendingToSync()
+			if highestBlockPendingToSync > highestBlock.Number {
+				dh.log.Infof("checkReorgUntilNewBlock: new block to sync (old: %d, new: %d)",
+					highestBlock.Number, highestBlockPendingToSync)
+				return nil
+			}
+			finalizedBlockNumber, err := dh.GetFinalizedBlockNumber(ctx)
+			if err != nil {
+				return fmt.Errorf("checkReorgUntilNewBlock: cannot get finalized block number: %w", err)
+			}
+			if finalizedBlockNumber != initialFinalizedBlockNumber {
+				dh.log.Infof("checkReorgUntilNewBlock: finalized block advanced from %d to %d, re-checking reorgs",
+					initialFinalizedBlockNumber, finalizedBlockNumber)
+				return nil
+			}
+		case <-ctx.Done():
+			return fmt.Errorf("checkReorgUntilNewBlock: context done: %w", ctx.Err())
+		}
+	}
+}

 // sync is an internal function that executes the given stepFunc until it returns done=true or error
 func (dh *EVMMultidownloader) sync(ctx context.Context,
@@ -219,7 +383,6 @@ func (dh *EVMMultidownloader) sync(ctx context.Context,
 	}
 	dh.log.Infof("πŸŽ‰πŸŽ‰πŸŽ‰πŸŽ‰πŸŽ‰ sync %s completed after %d iterations.", name, iteration)
 	dh.statistics.FinishSyncing()
-	dh.ShowStatistics(iteration)
 	return nil
 }
@@ -235,11 +398,171 @@ func getBlockNumbers(logs []types.Log) []uint64 {
 	}
 	return result
 }
+func (dh *EVMMultidownloader) IsInitialized() bool {
+	dh.mutex.Lock()
+	defer dh.mutex.Unlock()
+	return dh.state != nil
+}
+
+func (dh *EVMMultidownloader) isInitializedNoMutex() bool {
+	return dh.state != nil
+}

 func (dh *EVMMultidownloader) IsAvailable(query mdrtypes.LogQuery) bool {
 	dh.mutex.Lock()
 	defer dh.mutex.Unlock()
-	return dh.syncedSegments.IsAvailable(query)
+	return dh.state.IsAvailable(query)
+}
+
+// 
getTotalPendingBlockRange returns the full pending block range without taking
+// addrs into consideration
+func (dh *EVMMultidownloader) getTotalPendingBlockRange() *aggkitcommon.BlockRange {
+	dh.mutex.Lock()
+	defer dh.mutex.Unlock()
+	br := dh.state.GetTotalPendingBlockRange()
+	return br
+}
+
+func (dh *EVMMultidownloader) getUnsafeLogQueries(blockHeaders []*aggkittypes.BlockHeader) []mdrtypes.LogQuery {
+	dh.mutex.Lock()
+	defer dh.mutex.Unlock()
+	logQueries := make([]mdrtypes.LogQuery, 0, len(blockHeaders))
+	for _, bh := range blockHeaders {
+		logQueries = append(logQueries, mdrtypes.NewLogQueryBlockHash(
+			bh.Number,
+			bh.Hash,
+			dh.state.GetAddressesToSyncForBlockNumber(bh.Number),
+		))
+	}
+	return logQueries
+}
+
+func (dh *EVMMultidownloader) newState(queries []mdrtypes.LogQuery) (*State, error) {
+	dh.mutex.Lock()
+	state := dh.state.Clone()
+	dh.mutex.Unlock()
+	for _, logQueryData := range queries {
+		err := state.Synced.AddLogQuery(&logQueryData)
+		if err != nil {
+			return nil, fmt.Errorf("newState: cannot extend synced segments: %w", err)
+		}
+		err = state.Pending.SubtractLogQuery(&logQueryData)
+		if err != nil {
+			return nil, fmt.Errorf("newState: cannot subtract log query from pending segments: %w", err)
+		}
+	}
+	return state, nil
+}
+func getContracts(logQueries []mdrtypes.LogQuery) []common.Address {
+	addressMap := make(map[common.Address]struct{})
+	for _, lq := range logQueries {
+		for _, addr := range lq.Addrs {
+			addressMap[addr] = struct{}{}
+		}
+	}
+	addresses := make([]common.Address, 0, len(addressMap))
+	for addr := range addressMap {
+		addresses = append(addresses, addr)
+	}
+	return addresses
+}
+
+func (dh *EVMMultidownloader) checkIntegrityNewLogsBlockHeaders(logs []types.Log,
+	blockHeaders aggkittypes.ListBlockHeaders) error {
+	blockMap := blockHeaders.ToMap()
+	for _, lg := range logs {
+		bh, exists := blockMap[lg.BlockNumber]
+		if !exists {
+			return fmt.Errorf("checkIntegrityNewLogsBlockHeaders: "+
+				"block header for log block number %d not found", lg.BlockNumber)
+		}
+		if bh.Hash != lg.BlockHash {
+			return fmt.Errorf("checkIntegrityNewLogsBlockHeaders: "+
+				"log block hash %s does not match block header hash %s for block number %d",
+				lg.BlockHash.String(), bh.Hash.String(), lg.BlockNumber)
+		}
+	}
+	return nil
+}
+
+// TODO: the purpose of the commented-out checkParent below is unclear
+// TODO: remove
+// func (dh *EVMMultidownloader) checkParent(blockHeader *aggkittypes.BlockHeader) error {
+// 	if blockHeader.Number == 0 {
+// 		return nil
+// 	}
+// 	parentHeader, isFinalized, err := dh.storage.GetBlockHeaderByNumber(nil, blockHeader.Number-1)
+// 	if err != nil {
+// 		return fmt.Errorf("checkParent: cannot get parent block header for block number %d: %w", blockHeader.Number, err)
+// 	}
+// 	if parentHeader == nil {
+// 		return fmt.Errorf("checkParent: parent block header for block number %d not found in storage",
+// 			blockHeader.Number-1)
+// 	}
+// 	// Parenthash (from DB) doesn't match parent Hash of first blockHeader, but parent is finalized
+// 	// so the discrepancy is the new block that is discarded without reorg (still not in DB)
+// 	if isFinalized && blockHeader.ParentHash != nil && parentHeader.Hash != *blockHeader.ParentHash {
+// 		return fmt.Errorf("checkParent: "+
+// 			"parent hash mismatch for block number %d: expected %s, got %s (but parent is finalized)",
+// 			blockHeader.Number, blockHeader.ParentHash.String(), parentHeader.Hash.String())
+// 	}
+// 	if blockHeader.ParentHash != nil && parentHeader.Hash != *blockHeader.ParentHash {
+// 		// Parenthash mismatch, reorg detected
+// 		return mdrtypes.NewReorgError(parentHeader.Number, parentHeader.Hash,
+// 			*blockHeader.ParentHash, fmt.Sprintf("checkParent: parent hash mismatch for block number %d: expected %s, got %s",
+// 			blockHeader.Number, blockHeader.ParentHash.String(), parentHeader.Hash.String()))
+// 	}
+// 	return nil
+// }
+
+func (dh *EVMMultidownloader) StepUnsafe(ctx context.Context) (bool, error) {
+	if err := ctx.Err(); err != nil {
+		return false, err
+	}
+	pendingBlockRange := dh.getTotalPendingBlockRange()
+	blocks := pendingBlockRange.ListBlockNumbers()
+	// TODO: Check that the blocks are all inside unsafe range
+	blockHeaders, err := etherman.RetrieveBlockHeaders(ctx, dh.log, dh.ethClient, dh.rpcClient,
+		blocks, dh.cfg.MaxParallelBlockHeaderRetrieval)
+	if err != nil {
+		return false, fmt.Errorf("Unsafe/Step: failed to retrieve %s block headers: %w", pendingBlockRange.String(), err)
+	}
+	dh.log.Debugf("Unsafe/Step: querying logs for %s", pendingBlockRange.String())
+	logQueries := dh.getUnsafeLogQueries(blockHeaders)
+	logs, err := dh.requestMultiplesLogs(ctx, logQueries)
+	if err != nil {
+		return false, fmt.Errorf("Unsafe/Step: failed to retrieve logs for %s: %w", pendingBlockRange.String(), err)
+	}
+	if err = dh.checkIntegrityNewLogsBlockHeaders(logs, blockHeaders); err != nil {
+		return false, err
+	}
+	newState, err := dh.newState(logQueries)
+	if err != nil {
+		return false, fmt.Errorf("Unsafe/Step: failed to create new state: %w", err)
+	}
+	updatedSegments := newState.Synced.SegmentsByContract(getContracts(logQueries))
+	// Store data in storage
+	dh.log.Debugf("Unsafe/Step: storing data for %s", pendingBlockRange.String())
+	err = dh.storeData(ctx, logs, blockHeaders,
+		updatedSegments, unsafeMode)
+	if err != nil {
+		return false, fmt.Errorf("Unsafe/Step: cannot store data: %w", err)
+	}
+
+	dh.mutex.Lock()
+	defer dh.mutex.Unlock()
+	dh.log.Debugf("Unsafe/Step: updating state in memory %s", pendingBlockRange.String())
+	dh.state = newState
+	finished := dh.state.IsSyncFinished()
+	totalBlocksPendingToSync := dh.state.TotalBlocksPendingToSync()
+	dh.log.Infof("Unsafe/Step: elapsed=%s finished br=%s logs=%d blocksHeaders=%d pendingBlocks=%d ETA=%s ",
+		dh.statistics.ElapsedSyncing().String(),
+		pendingBlockRange.String(),
+		len(logs),
+		len(blockHeaders),
+		totalBlocksPendingToSync,
+		
dh.statistics.ETA(totalBlocksPendingToSync))
+	return finished, nil
+}

 // StepSafe performs a safe step syncing logs and block headers from historical data
@@ -269,43 +592,39 @@ func (dh *EVMMultidownloader) StepSafe(ctx context.Context) (bool, error) {

 	// Calculate new state (not set in memory until commit is successful)
 	dh.mutex.Lock()
-	newSyncedSegments := dh.syncedSegments.Clone()
-	newPendingSegments := dh.pendingSync.Clone()
+	newState := dh.state.Clone()
 	dh.mutex.Unlock()
 	// Update synced segments
-	err = newSyncedSegments.AddLogQuery(logQueryData)
-	if err != nil {
-		return false, fmt.Errorf("Safe/Step: cannot extend synced segments: %w", err)
-	}
-	// from pending blocks remove current query
-	err = newPendingSegments.SubtractLogQuery(logQueryData)
+	err = newState.OnNewSyncedLogQuery(logQueryData)
 	if err != nil {
-		return false, fmt.Errorf("Safe/Step: cannot subtract log query from pending segments: %w", err)
+		return false, fmt.Errorf("Safe/Step: fails OnNewSyncedLogQuery(%s): %w",
+			logQueryData.String(), err)
 	}
+
 	// Update ToBlock in pending segments to be able to calculate if finished
-	err = newPendingSegments.UpdateTargetBlockToNumber(ctx, dh.blockNotifierManager)
+	err = newState.UpdateTargetBlockToNumber(ctx, dh.blockNotifierManager)
 	if err != nil {
 		return false, fmt.Errorf("Safe/Step: cannot update ToBlock in pendingSync: %w", err)
 	}
 	// Store data in storage
 	err = dh.storeData(ctx, logs, blockHeaders,
-		newSyncedSegments.SegmentsByContract(logQueryData.Addrs), true)
+		newState.SyncedSegmentsByContract(logQueryData.Addrs), safeMode)
 	if err != nil {
 		return false, fmt.Errorf("Safe/Step: cannot store data: %w", err)
 	}
 	// Update in-memory synced segments (after valid commit)
 	dh.mutex.Lock()
 	defer dh.mutex.Unlock()
-	dh.syncedSegments = *newSyncedSegments
-	dh.pendingSync = newPendingSegments
-	finished := dh.pendingSync.Finished()
+	dh.state = newState
+	finished := dh.state.IsSyncFinished()
+	totalBlocksPendingToSync := dh.state.TotalBlocksPendingToSync()
 	dh.log.Infof("Safe/Step: elapsed=%s finished br=%s logs=%d blocksHeaders=%d pendingBlocks=%d ETA=%s ",
 		dh.statistics.ElapsedSyncing().String(),
 		logQueryData.BlockRange.String(),
 		len(logs),
 		len(blockHeaders),
-		dh.pendingSync.TotalBlocks(),
-		dh.statistics.ETA(dh.pendingSync.TotalBlocks()))
+		totalBlocksPendingToSync,
+		dh.statistics.ETA(totalBlocksPendingToSync))
 	return finished, nil
 }
 
 func (dh *EVMMultidownloader) storeData(
@@ -400,7 +719,16 @@ func extractSuggestedBlockRangeFromErrorMsg(msg string) *aggkitcommon.BlockRange
 	return nil
 }
 
-func (dh *EVMMultidownloader) getFinalizedBlockNumber(ctx context.Context) (uint64, error) {
+func (dh *EVMMultidownloader) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
+	bn, err := dh.blockNotifierManager.GetCurrentBlockNumber(ctx, aggkittypes.LatestBlock)
+	if err != nil {
+		return 0, fmt.Errorf("GetLatestBlockNumber: cannot get latest block (%s): %w",
+			aggkittypes.LatestBlock.String(), err)
+	}
+	return bn, nil
+}
+
+func (dh *EVMMultidownloader) GetFinalizedBlockNumber(ctx context.Context) (uint64, error) {
 	bn, err := dh.blockNotifierManager.GetCurrentBlockNumber(ctx, dh.cfg.BlockFinality)
 	if err != nil {
 		return 0, fmt.Errorf("Safe/Step: cannot get finalized block (%s): %w",
@@ -415,20 +743,41 @@ func (dh *EVMMultidownloader) getNextQuery(ctx context.Context, chunk uint32, sa
 	var err error
 	var maxBlock uint64
 	if safe {
-		maxBlock, err = dh.getFinalizedBlockNumber(ctx)
+		maxBlock, err = dh.GetFinalizedBlockNumber(ctx)
 		if err != nil {
			return nil, fmt.Errorf("getNextQuery: 
cannot get finalized block number: %w", err) } } else { maxBlock = 0 } - logQueryData, err := dh.pendingSync.NextQuery(chunk, maxBlock) + logQueryData, err := dh.state.NextQueryToSync(chunk, maxBlock) if err != nil { return nil, fmt.Errorf("getNextQuery: cannot get NextQuery: %w", err) } return logQueryData, nil } +func (dh *EVMMultidownloader) requestMultiplesLogs( + ctx context.Context, + queries []mdrtypes.LogQuery) ([]types.Log, error) { + var allLogs []types.Log + for _, query := range queries { + dh.log.Debugf("request: querying logs for blockHash=%s", query.String()) + if err := ctx.Err(); err != nil { + return nil, fmt.Errorf("requestMultiplesLogs: context error: %w", err) + } + logs, err := dh.requestLogsSingleTry(ctx, &query) + if err != nil { + return nil, fmt.Errorf("requestMultiplesLogs: ethClient.FilterLogs(%v) failed: %w", + query.String(), err) + } + dh.log.Debugf("request: successfully queried logs for blockHash=%s: returned %d logs", + query.String(), len(logs)) + allLogs = append(allLogs, logs...) + } + return allLogs, nil +} + func (dh *EVMMultidownloader) requestLogs( ctx context.Context) ([]types.Log, *mdrtypes.LogQuery, error) { currentSyncBlockChunkSize := dh.cfg.BlockChunkSize diff --git a/multidownloader/evm_multidownloader_rpc.go b/multidownloader/evm_multidownloader_rpc.go index 5753bd23f..4d8492688 100644 --- a/multidownloader/evm_multidownloader_rpc.go +++ b/multidownloader/evm_multidownloader_rpc.go @@ -1,6 +1,8 @@ package multidownloader import ( + "context" + "github.com/0xPolygon/cdk-rpc/rpc" aggkitcommon "github.com/agglayer/aggkit/common" ) @@ -22,12 +24,32 @@ func NewEVMMultidownloaderRPC( // Status returns the status of the L1InfoTreeSync component // curl -X POST http://localhost:5576/ "Content-Type: application/json" \ -// -d '{"method":"l1infotreesync_status", "params":[], "id":1}' +// -d '{"method":"multidownloader-l1_status", "params":[], "id":1}' func (b *EVMMultidownloaderRPC) Status() (interface{}, rpc.Error) { + finalizedBlockNumber, err := b.downloader.GetFinalizedBlockNumber(context.Background()) + if err != nil { + return nil, rpc.NewRPCError(rpc.DefaultErrorCode, + "EVMMultidownloaderRPC.Status: getting finalized block number: %v", err) + } + latestBlockNumber, err := b.downloader.GetLatestBlockNumber(context.Background()) + if err != nil { + return nil, rpc.NewRPCError(rpc.DefaultErrorCode, + "EVMMultidownloaderRPC.Status: getting latest block number: %v", err) + } + b.downloader.mutex.Lock() + defer b.downloader.mutex.Unlock() + info := struct { - Status string `json:"status"` + Status string `json:"status"` + State string `json:"state,omitempty"` + Pending string `json:"pending,omitempty"` + FinalizedBlockNumber uint64 `json:"finalizedBlockNumber,omitempty"` + LatestBlockNumber uint64 `json:"latestBlockNumber,omitempty"` }{ - Status: "running", + Status: "running", + State: b.downloader.state.String(), + FinalizedBlockNumber: finalizedBlockNumber, + LatestBlockNumber: latestBlockNumber, } return info, nil } diff --git a/multidownloader/evm_multidownloader_rpc_test.go b/multidownloader/evm_multidownloader_rpc_test.go index a84983731..66baae50b 100644 --- a/multidownloader/evm_multidownloader_rpc_test.go +++ b/multidownloader/evm_multidownloader_rpc_test.go @@ -1,9 +1,11 @@ package multidownloader import ( + "fmt" "testing" "github.com/agglayer/aggkit/log" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -20,17 +22,16 @@ func TestNewEVMMultidownloaderRPC(t *testing.T) { func 
TestEVMMultidownloaderRPC_Status(t *testing.T) { logger := log.WithFields("module", "test") - downloader := &EVMMultidownloader{} - rpcService := NewEVMMultidownloaderRPC(logger, downloader) + testData := newEVMMultidownloaderTestData(t, false) + testData.mdr.state = NewEmptyState() + testData.mockBlockNotifierManager.EXPECT().GetCurrentBlockNumber(mock.Anything, + mock.Anything).Return(uint64(100), nil) + rpcService := NewEVMMultidownloaderRPC(logger, testData.mdr) result, err := rpcService.Status() require.Nil(t, err) require.NotNil(t, result) - statusInfo, ok := result.(struct { - Status string `json:"status"` - }) - require.True(t, ok) - require.Equal(t, "running", statusInfo.Status) + require.Contains(t, fmt.Sprintf("%+v", result), "Status") } diff --git a/multidownloader/evm_multidownloader_syncers.go b/multidownloader/evm_multidownloader_syncers.go index 76349cf49..a2c671296 100644 --- a/multidownloader/evm_multidownloader_syncers.go +++ b/multidownloader/evm_multidownloader_syncers.go @@ -45,6 +45,9 @@ func (dh *EVMMultidownloader) BlockHeader(ctx context.Context, // FilterLogs filters the logs. It gets them from storage or waits until they are available func (dh *EVMMultidownloader) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { + if !dh.IsInitialized() { + return nil, fmt.Errorf("EVMMultidownloader.FilterLogs: multidownloader not initialized") + } dh.log.Debugf("EVMMultidownloader.FilterLogs: received query: %+v", query) defer dh.log.Debugf("EVMMultidownloader.FilterLogs: finished query: %+v", query) logQuery := mdrtypes.NewLogQueryFromEthereumFilter(query) diff --git a/multidownloader/evm_multidownloader_syncers_test.go b/multidownloader/evm_multidownloader_syncers_test.go index e8b5a453b..143eabdb7 100644 --- a/multidownloader/evm_multidownloader_syncers_test.go +++ b/multidownloader/evm_multidownloader_syncers_test.go @@ -167,7 +167,7 @@ func TestEVMMultidownloader_FilterLogs(t *testing.T) { t.Run("FilterLogs context canceled waiting to catch up", func(t *testing.T) { // Setup testData := newEVMMultidownloaderTestData(t, true) - + testData.FakeInitialized(t) query := ethereum.FilterQuery{ Addresses: []common.Address{addr1}, FromBlock: big.NewInt(100), @@ -195,6 +195,9 @@ func TestEVMMultidownloader_FilterLogs(t *testing.T) { ToBlock: aggkittypes.LatestBlock, }) require.NoError(t, err) + testData.MockInitialize(t, 1) + err = testData.mdr.Initialize(t.Context()) + require.NoError(t, err) query := ethereum.FilterQuery{ Addresses: []common.Address{addr1}, @@ -203,7 +206,7 @@ func TestEVMMultidownloader_FilterLogs(t *testing.T) { } mdQuery := mdrtypes.NewLogQueryFromEthereumFilter(query) // It updated the syncedSegments with the new one to be available - err = testData.mdr.syncedSegments.AddLogQuery(&mdQuery) + err = testData.mdr.state.OnNewSyncedLogQuery(&mdQuery) require.NoError(t, err) testData.mockStorage.EXPECT().GetEthLogs(mock.Anything, mock.Anything). 
Return(nil, errStorageExample)
diff --git a/multidownloader/evm_multidownloader_test.go b/multidownloader/evm_multidownloader_test.go
index e3849a163..d2bf48d74 100644
--- a/multidownloader/evm_multidownloader_test.go
+++ b/multidownloader/evm_multidownloader_test.go
@@ -3,13 +3,16 @@ package multidownloader
 import (
 	"context"
 	"fmt"
+	"math/big"
 	"os"
 	"sync"
 	"testing"
 	"time"
 
+	jRPC "github.com/0xPolygon/cdk-rpc/rpc"
 	aggkitcommon "github.com/agglayer/aggkit/common"
 	"github.com/agglayer/aggkit/config/types"
+	"github.com/agglayer/aggkit/db"
 	"github.com/agglayer/aggkit/etherman"
 	mockethermantypes "github.com/agglayer/aggkit/etherman/types/mocks"
 	"github.com/agglayer/aggkit/l1infotreesync"
@@ -31,9 +34,10 @@ import (
 
 const runL1InfoTree = true
 const l1InfoTreeUseMultidownloader = true
+const storagePath = "../tmp/ut/"
 
 func TestEVMMultidownloader(t *testing.T) {
-	t.Skip("code to test/debug not real unittest")
+	t.Skip("code to test/debug manually, not a real unit test")
 	cfgLog := log.Config{
 		Environment: "development",
 		Level:       "info",
@@ -54,7 +58,7 @@ func TestEVMMultidownloader(t *testing.T) {
 	logger := log.WithFields("test", "test")
 	db, err := storage.NewMultidownloaderStorage(logger, storage.MultidownloaderStorageConfig{
-		DBPath: "/tmp/mdr_test.sqlite",
+		DBPath: storagePath + "mdr_test.sqlite",
 	})
 	require.NoError(t, err)
 	cfg := Config{
@@ -62,10 +66,12 @@
 		MaxParallelBlockHeaderRetrieval: 50,
 		BlockFinality:                   aggkittypes.FinalizedBlock,
 		WaitPeriodToCheckCatchUp:        types.NewDuration(time.Second),
+		PeriodToCheckReorgs:             types.NewDuration(time.Second * 10),
 	}
+	var rpcServices []jRPC.Service
 	mdr, err := NewEVMMultidownloader(logger, cfg, "l1", ethClient, ethRPCClient,
-		db, nil)
+		db, nil, nil)
 	require.NoError(t, err)
 	require.NotNil(t, mdr)
 	err = mdr.RegisterSyncer(aggkittypes.SyncerConfig{
@@ -78,6 +84,7 @@
 		ToBlock: aggkittypes.LatestBlock,
 	})
 	require.NoError(t, err)
+	rpcServices = append(rpcServices, mdr.GetRPCServices()...)
 	ctx := context.TODO()
 	var l1infotree *l1infotreesync.L1InfoTreeSync
 	if runL1InfoTree == true {
@@ -85,13 +92,13 @@
 		var dbPath string
 		if l1InfoTreeUseMultidownloader {
 			multidownloader = mdr
-			dbPath = "/tmp/l1infotree_md.sqlite"
+			dbPath = storagePath + "l1infotree_md.sqlite"
 		} else {
 			multidownloader = aggkitsync.NewAdapterEthClientToMultidownloader(ethClient)
-			dbPath = "/tmp/l1infotree_eth.sqlite"
+			dbPath = storagePath + "l1infotree_eth.sqlite"
 		}
 		reorgDetector, err := reorgdetector.New(ethClient, reorgdetector.Config{
-			DBPath:              "/tmp/l1_reorgdetector.sqlite",
+			DBPath:              storagePath + "l1_reorgdetector.sqlite",
 			CheckReorgsInterval: types.NewDuration(time.Second * 10),
 			FinalizedBlock:      aggkittypes.FinalizedBlock,
 		}, reorgdetector.L1)
@@ -112,9 +119,29 @@
 			},
 			multidownloader,
 			reorgDetector,
-			l1infotreesync.FlagStopOnFinalizedBlockReached,
+			// l1infotreesync.FlagStopOnFinalizedBlockReached,
+			l1infotreesync.FlagNone,
 		)
 		require.NoError(t, err)
+		rpcServices = append(rpcServices, l1infotree.GetRPCServices()...)
+ } + if len(rpcServices) > 0 { + log.Infof("Registering %d RPC services", len(rpcServices)) + logger := log.WithFields("module", "RPC") + jRPCServer := jRPC.NewServer( + jRPC.Config{ + Host: "127.0.0.1", + Port: 5576, + MaxRequestsPerIPAndSecond: 10000.0, + }, + rpcServices, + jRPC.WithLogger(logger.GetSugaredLogger()), + ) + go func() { + if err := jRPCServer.Start(); err != nil { + log.Fatal(err) + } + }() } var wg sync.WaitGroup @@ -232,7 +259,7 @@ func getBlockHeader(bn uint64, headers []*aggkittypes.BlockHeader) *aggkittypes. func TestEVMMultidownloader_NewEVMMultidownloader(t *testing.T) { logger := log.WithFields("test", "evm_multidownloader_test") cfg := NewConfigDefault("test.sqlite", t.TempDir()) - sut, err := NewEVMMultidownloader(logger, cfg, "test", nil, nil, nil, nil) + sut, err := NewEVMMultidownloader(logger, cfg, "test", nil, nil, nil, nil, nil) require.NoError(t, err) require.NotNil(t, sut) require.NotNil(t, sut.blockNotifierManager) @@ -306,7 +333,7 @@ func TestEVMMultidownloader_GetRPCServices(t *testing.T) { require.NoError(t, err) customName := "custom-name" - mdr, err := NewEVMMultidownloader(logger, cfg, customName, ethClient, nil, db, nil) + mdr, err := NewEVMMultidownloader(logger, cfg, customName, ethClient, nil, db, nil, nil) require.NoError(t, err) services := mdr.GetRPCServices() @@ -428,6 +455,26 @@ type testDataEVMMultidownloader struct { mockBlockNotifierManager *mockethermantypes.BlockNotifierManager } +func (td *testDataEVMMultidownloader) FakeInitialized(t *testing.T) { + t.Helper() + td.mdr.state = NewEmptyState() +} + +func (td *testDataEVMMultidownloader) MockInitialize(t *testing.T, chainID uint64) { + t.Helper() + chainIDBig := big.NewInt(0).SetUint64(chainID) + td.mockEthClient.EXPECT().ChainID(mock.Anything).Return(chainIDBig, nil).Maybe() + if td.mockStorage != nil { + td.mockStorage.EXPECT().GetValue(mock.Anything, mock.Anything, mock.Anything).Return("", db.ErrNotFound).Maybe() + td.mockStorage.EXPECT().InsertValue(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + td.mockStorage.EXPECT().UpsertSyncerConfigs(mock.Anything, mock.Anything).Return(nil).Maybe() + td.mockStorage.EXPECT().GetSyncedBlockRangePerContract(mock.Anything).Return(mdrtypes.NewSetSyncSegment(), nil).Maybe() + } + if td.mockBlockNotifierManager != nil { + td.mockBlockNotifierManager.EXPECT().GetCurrentBlockNumber(mock.Anything, mock.Anything).Return(uint64(200), nil).Maybe() + } +} + func newEVMMultidownloaderTestData(t *testing.T, mockStorage bool) *testDataEVMMultidownloader { t.Helper() logger := log.WithFields("test", "evm_multidownloader_test") @@ -453,8 +500,7 @@ func newEVMMultidownloaderTestData(t *testing.T, mockStorage bool) *testDataEVMM require.NoError(t, err) useDB = realDB } - // TODO: Add mock for ethRPCClient if needed - mdr, err := NewEVMMultidownloader(logger, cfg, "test", ethClient, nil, useDB, mockBlockNotifierManager) + mdr, err := NewEVMMultidownloader(logger, cfg, "test", ethClient, nil, useDB, mockBlockNotifierManager, nil) require.NoError(t, err) return &testDataEVMMultidownloader{ mockEthClient: ethClient, diff --git a/multidownloader/reorg_processor.go b/multidownloader/reorg_processor.go new file mode 100644 index 000000000..4464ebd19 --- /dev/null +++ b/multidownloader/reorg_processor.go @@ -0,0 +1,150 @@ +package multidownloader + +import ( + "context" + "fmt" + "time" + + aggkitcommon "github.com/agglayer/aggkit/common" + dbtypes "github.com/agglayer/aggkit/db/types" + mdtypes 
"github.com/agglayer/aggkit/multidownloader/types" + aggkittypes "github.com/agglayer/aggkit/types" +) + +type ReorgPorter interface { + NewTx(ctx context.Context) (dbtypes.Txer, error) + GetBlockStorageAndRPC(ctx context.Context, tx dbtypes.Querier, blockNumber uint64) (*compareBlockHeaders, error) + GetLastBlockNumberInStorage(tx dbtypes.Querier) (uint64, error) + // Return ChainID of the inserted reorg + MoveReorgedBlocks(tx dbtypes.Querier, reorgData mdtypes.ReorgData) (uint64, error) + // Return latest block number in RPC + GetLatestBlockNumberInRPC(ctx context.Context) (uint64, error) +} + +type ReorgProcessor struct { + log aggkitcommon.Logger + port ReorgPorter + funcNow func() uint64 +} + +func NewReorgProcessor(log aggkitcommon.Logger, + ethClient aggkittypes.BaseEthereumClienter, + rpcClient aggkittypes.RPCClienter, + storage mdtypes.Storager) *ReorgProcessor { + return &ReorgProcessor{ + log: log, + port: &ReorgPort{ + ethClient: ethClient, + rpcClient: rpcClient, + storage: storage, + }, + funcNow: func() uint64 { + return uint64(time.Now().Unix()) + }, + } +} + +// After detecting a reorg at offendingBlockNumber, +// - find affected blocks +// - store the reorg info in storage +func (rm *ReorgProcessor) ProcessReorg(ctx context.Context, + offendingBlockNumber uint64) error { + // We known that offendingBlockNumber is affected, so we go backwards until we find + // the first unaffected block + currentBlockNumber := offendingBlockNumber + tx, err := rm.port.NewTx(ctx) + if err != nil { + return fmt.Errorf("ProcessReorg: error starting new tx: %w", err) + } + committed := false + defer func() { + if !committed { + rm.log.Debugf("ProcessReorg: rolling back tx") + if err := tx.Rollback(); err != nil { + rm.log.Errorf("ProcessReorg: error rolling back tx: %v", err) + } + } + }() + + firstUnaffectedBlock, err := rm.findFirstUnaffectedBlock(ctx, tx, currentBlockNumber-1) + if err != nil { + return fmt.Errorf("ProcessReorg: error finding first unaffected block: %w", err) + } + lastBlockNumberInStorage, err := rm.port.GetLastBlockNumberInStorage(tx) + if err != nil { + return fmt.Errorf("ProcessReorg: error getting last block number in storage: %w", err) + } + latestBlockNumberInRPC, err := rm.port.GetLatestBlockNumberInRPC(ctx) + if err != nil { + return fmt.Errorf("ProcessReorg: error getting latest block number in RPC: %w", err) + } + rm.log.Infof("ProcessReorg: reorg detected from block %d to block %d", + currentBlockNumber+1, lastBlockNumberInStorage) + + reorgData := mdtypes.ReorgData{ + BlockRangeAffected: aggkitcommon.NewBlockRange(firstUnaffectedBlock+1, lastBlockNumberInStorage), + DetectedAtBlock: lastBlockNumberInStorage, + DetectedTimestamp: rm.funcNow(), + NetworkLatestBlock: latestBlockNumberInRPC, + NetworkFinalizedBlock: firstUnaffectedBlock, + NetworkFinalizedBlockName: aggkittypes.FinalizedBlock, + } + chainID, err := rm.port.MoveReorgedBlocks(tx, reorgData) + if err != nil { + return fmt.Errorf("ProcessReorg: error moving reorged blocks: %w", err) + } + reorgData.ChainID = chainID + committed = true + if err := tx.Commit(); err != nil { + return fmt.Errorf("ProcessReorg: cannot commit tx: %w", err) + } + rm.log.Warnf("ProcessReorg: finalized reorgProcess: %s", reorgData.String()) + return nil +} + +func (rm *ReorgProcessor) findFirstUnaffectedBlock(ctx context.Context, + tx dbtypes.Querier, + startBlockNumber uint64) (uint64, error) { + currentBlockNumber := startBlockNumber + for { + if currentBlockNumber == 0 { + // Genesis block reached, stop here + return 0, 
fmt.Errorf("findFirstUnaffectedBlock: genesis block reached while checking reorgs, "+
+				"cannot find unaffected block. First block checked: %d", startBlockNumber)
+		}
+		data, err := rm.port.GetBlockStorageAndRPC(ctx, tx, currentBlockNumber)
+		if err != nil {
+			return 0, err
+		}
+		match, err := rm.checkBlocks(data)
+		if err != nil {
+			return 0, err
+		}
+		if match {
+			// Found the first unaffected block
+			return currentBlockNumber, nil
+		}
+		currentBlockNumber--
+	}
+}
+
+// checkBlocks compares storage and rpc block headers and returns true if they match
+func (rm *ReorgProcessor) checkBlocks(blocks *compareBlockHeaders) (bool, error) {
+	if blocks == nil || blocks.StorageHeader == nil || blocks.RpcHeader == nil {
+		// Missing headers are an input error, not a reorg verdict
+		return false, fmt.Errorf("checkBlocks bad input data (nil)")
+	}
+	if blocks.StorageHeader.Number != blocks.RpcHeader.Number {
+		return false, fmt.Errorf("checkBlocks block numbers do not match: storage=%d rpc=%d",
+			blocks.StorageHeader.Number, blocks.RpcHeader.Number)
+	}
+	// Sanity check: this should never happen because we trust finalized blocks
+	if blocks.StorageHeader.Hash != blocks.RpcHeader.Hash {
+		if blocks.IsFinalized == mdtypes.Finalized {
+			rm.log.Warnf("checkBlocks: block %d is finalized and mismatch hash %s!=%s", blocks.StorageHeader.Number,
+				blocks.StorageHeader.Hash.Hex(), blocks.RpcHeader.Hash.Hex())
+		}
+		return false, nil
+	}
+	return true, nil
+}
diff --git a/multidownloader/reorg_processor_port.go b/multidownloader/reorg_processor_port.go
new file mode 100644
index 000000000..accaeb6bb
--- /dev/null
+++ b/multidownloader/reorg_processor_port.go
@@ -0,0 +1,66 @@
+package multidownloader
+
+import (
+	"context"
+	"fmt"
+
+	dbtypes "github.com/agglayer/aggkit/db/types"
+	mdtypes "github.com/agglayer/aggkit/multidownloader/types"
+	aggkittypes "github.com/agglayer/aggkit/types"
+)
+
+type compareBlockHeaders struct {
+	StorageHeader *aggkittypes.BlockHeader
+	IsFinalized   mdtypes.FinalizedType
+	RpcHeader     *aggkittypes.BlockHeader
+}
+
+type ReorgPort struct {
+	ethClient aggkittypes.BaseEthereumClienter
+	rpcClient aggkittypes.RPCClienter
+	storage   mdtypes.Storager
+}
+
+func (r *ReorgPort) NewTx(ctx context.Context) (dbtypes.Txer, error) {
+	return r.storage.NewTx(ctx)
+}
+
+func (r *ReorgPort) GetBlockStorageAndRPC(ctx context.Context, tx dbtypes.Querier,
+	blockNumber uint64) (*compareBlockHeaders, error) {
+	currentStorageBlock, finalized, err := r.storage.GetBlockHeaderByNumber(tx, blockNumber)
+	if err != nil {
+		return nil, err
+	}
+	rpcBlock, err := r.ethClient.CustomHeaderByNumber(ctx, aggkittypes.NewBlockNumber(blockNumber))
+	if err != nil {
+		return nil, err
+	}
+	return &compareBlockHeaders{
+		StorageHeader: currentStorageBlock,
+		IsFinalized:   finalized,
+		RpcHeader:     rpcBlock,
+	}, nil
+}
+
+func (r *ReorgPort) GetLastBlockNumberInStorage(tx dbtypes.Querier) (uint64, error) {
+	_, highestBlock, err := r.storage.GetRangeBlockHeader(tx, mdtypes.NotFinalized)
+	if err != nil {
+		return 0, fmt.Errorf("GetLastBlockNumberInStorage: error getting highest block from storage: %w", err)
+	}
+	if highestBlock == nil {
+		return 0, fmt.Errorf("GetLastBlockNumberInStorage: error getting highest block (=nil) from storage")
+	}
+	return highestBlock.Number, nil
+}
+
+func (r *ReorgPort) MoveReorgedBlocks(tx dbtypes.Querier, reorgData mdtypes.ReorgData) (uint64, error) {
+	return r.storage.InsertReorgAndMoveReorgedBlocksAndLogs(tx, reorgData)
+}
+
+func (r *ReorgPort) GetLatestBlockNumberInRPC(ctx 
context.Context) (uint64, error) {
+	latestBlockNumber, err := r.ethClient.BlockNumber(ctx)
+	if err != nil {
+		return 0, fmt.Errorf("GetLatestBlockNumberInRPC: error getting latest block number from RPC: %w", err)
+	}
+	return latestBlockNumber, nil
+}
diff --git a/multidownloader/state.go b/multidownloader/state.go
new file mode 100644
index 000000000..eaa45f8c8
--- /dev/null
+++ b/multidownloader/state.go
@@ -0,0 +1,100 @@
+package multidownloader
+
+import (
+	"context"
+	"fmt"
+
+	aggkitcommon "github.com/agglayer/aggkit/common"
+	"github.com/agglayer/aggkit/etherman/types"
+	mdrtypes "github.com/agglayer/aggkit/multidownloader/types"
+	"github.com/ethereum/go-ethereum/common"
+)
+
+type State struct {
+	// These are the segments that we have already synced
+	// when a syncer does a `FilterLogs`, it is used to check what is already synced
+	Synced mdrtypes.SetSyncSegment
+	// These are the segments that we need to sync
+	Pending mdrtypes.SetSyncSegment
+}
+
+func NewEmptyState() *State {
+	return &State{
+		Synced:  mdrtypes.NewSetSyncSegment(),
+		Pending: mdrtypes.NewSetSyncSegment(),
+	}
+}
+
+func NewState(synced *mdrtypes.SetSyncSegment, pending *mdrtypes.SetSyncSegment) *State {
+	return &State{
+		Synced:  *synced,
+		Pending: *pending,
+	}
+}
+
+func NewStateFromStorageSyncedBlocks(storageSynced mdrtypes.SetSyncSegment,
+	totalToSync mdrtypes.SetSyncSegment) (*State, error) {
+	err := totalToSync.SubtractSegments(&storageSynced)
+	if err != nil {
+		return nil, fmt.Errorf("NewStateFromStorageSyncedBlocks: cannot calculate pendingSync: %w", err)
+	}
+	return NewState(&storageSynced, &totalToSync), nil
+}
+
+func (s *State) Clone() *State {
+	return &State{
+		Synced:  *s.Synced.Clone(),
+		Pending: *s.Pending.Clone(),
+	}
+}
+func (s *State) String() string {
+	return "State{Synced: " + s.Synced.String() +
+		", Pending: " + s.Pending.String() + "}"
+}
+
+func (s *State) UpdateTargetBlockToNumber(ctx context.Context, blockNotifier types.BlockNotifierManager) error {
+	return s.Pending.UpdateTargetBlockToNumber(ctx, blockNotifier)
+}
+
+func (s *State) GetHighestBlockNumberPendingToSync() uint64 {
+	return s.Pending.GetHighestBlockNumber()
+}
+
+func (s *State) IsAvailable(query mdrtypes.LogQuery) bool {
+	return s.Synced.IsAvailable(query)
+}
+
+func (s *State) GetTotalPendingBlockRange() *aggkitcommon.BlockRange {
+	return s.Pending.GetTotalPendingBlockRange()
+}
+
+func (s *State) GetAddressesToSyncForBlockNumber(blockNumber uint64) []common.Address {
+	return s.Pending.GetAddressesForBlock(blockNumber)
+}
+func (s *State) IsSyncFinished() bool {
+	return s.Pending.Finished()
+}
+
+func (s *State) TotalBlocksPendingToSync() uint64 {
+	return s.Pending.TotalBlocks()
+}
+
+func (s *State) OnNewSyncedLogQuery(logQuery *mdrtypes.LogQuery) error {
+	err := s.Synced.AddLogQuery(logQuery)
+	if err != nil {
+		return fmt.Errorf("OnNewSyncedLogQuery: adding synced segment: %w", err)
+	}
+	err = s.Pending.SubtractLogQuery(logQuery)
+	if err != nil {
+		return fmt.Errorf("OnNewSyncedLogQuery: subtracting pending segment: %w", err)
+	}
+	return nil
+}
+
+func (s *State) SyncedSegmentsByContract(addrs []common.Address) []mdrtypes.SyncSegment {
+	return s.Synced.SegmentsByContract(addrs)
+}
+
+func (s *State) NextQueryToSync(syncBlockChunkSize uint32, maxBlockNumber uint64) (*mdrtypes.LogQuery, error) {
+	return s.Pending.NextQuery(syncBlockChunkSize, maxBlockNumber)
+}
diff --git a/multidownloader/state_test.go b/multidownloader/state_test.go
new file mode 100644
index 000000000..10a54f179
--- /dev/null
+++ b/multidownloader/state_test.go
@@ 
diff --git a/multidownloader/state_test.go b/multidownloader/state_test.go
new file mode 100644
index 000000000..10a54f179
--- /dev/null
+++ b/multidownloader/state_test.go
@@ -0,0 +1,46 @@
+package multidownloader
+
+import (
+	"testing"
+
+	aggkitcommon "github.com/agglayer/aggkit/common"
+	mdtypes "github.com/agglayer/aggkit/multidownloader/types"
+	aggkittypes "github.com/agglayer/aggkit/types"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/stretchr/testify/require"
+)
+
+func TestStateInitial(t *testing.T) {
+	addr1 := common.HexToAddress("0x10")
+	addr2 := common.HexToAddress("0x20")
+	storageData := mdtypes.NewSetSyncSegment()
+	storageData.Add(mdtypes.NewSyncSegment(addr1,
+		aggkitcommon.BlockRangeZero, aggkittypes.FinalizedBlock,
+		false))
+	storageData.Add(mdtypes.NewSyncSegment(addr2,
+		aggkitcommon.BlockRangeZero, aggkittypes.LatestBlock,
+		false))
+	configData := mdtypes.NewSetSyncSegment()
+	segment1 := mdtypes.NewSyncSegment(addr1,
+		aggkitcommon.NewBlockRange(0, 1000), aggkittypes.FinalizedBlock,
+		false)
+	segment2 := mdtypes.NewSyncSegment(addr2,
+		aggkitcommon.NewBlockRange(0, 2000), aggkittypes.LatestBlock,
+		false)
+	configData.Add(segment1)
+	configData.Add(segment2)
+
+	state, err := NewStateFromStorageSyncedBlocks(storageData, configData)
+	require.NoError(t, err)
+	require.NotNil(t, state)
+	logQuery := mdtypes.NewLogQuery(
+		1, 456, []common.Address{addr1})
+
+	err = state.OnNewSyncedLogQuery(&logQuery)
+	require.NoError(t, err)
+	pendingSegments := state.SyncedSegmentsByContract([]common.Address{addr1})
+	require.Equal(t, 1, len(pendingSegments))
+	require.Equal(t, addr1, pendingSegments[0].ContractAddr)
+	require.Equal(t, aggkitcommon.NewBlockRange(0, 456), pendingSegments[0].BlockRange)
+	require.Equal(t, aggkittypes.FinalizedBlock, pendingSegments[0].TargetToBlock)
+}
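// Illustrative sketch, not part of the patch: a quick way to sanity-check the 0002
// schema that follows by applying a trimmed version of its Up statements to an
// in-memory SQLite database. The mattn/go-sqlite3 driver and the trimmed column
// lists are assumptions; any database/sql SQLite driver behaves the same way.
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Parent table first: reorgs <- blocks_reorged (<- logs_reorged in the real schema).
	stmts := []string{
		`CREATE TABLE reorgs (chain_id BIGINT PRIMARY KEY, detected_at_block BIGINT NOT NULL)`,
		`CREATE TABLE blocks_reorged (
			chain_id     BIGINT NOT NULL REFERENCES reorgs(chain_id),
			block_number BIGINT NOT NULL,
			PRIMARY KEY (chain_id, block_number))`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
	if _, err := db.Exec(`INSERT INTO reorgs VALUES (1, 5020)`); err != nil {
		log.Fatal(err)
	}
	if _, err := db.Exec(`INSERT INTO blocks_reorged VALUES (1, 5005)`); err != nil {
		log.Fatal(err)
	}
	log.Println("trimmed 0002 schema applies cleanly")
}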
diff --git a/multidownloader/storage/migrations/0002.sql b/multidownloader/storage/migrations/0002.sql
new file mode 100644
index 000000000..db16b81c0
--- /dev/null
+++ b/multidownloader/storage/migrations/0002.sql
@@ -0,0 +1,41 @@
+-- +migrate Down
+DROP TABLE IF EXISTS logs_reorged;
+DROP TABLE IF EXISTS blocks_reorged;
+DROP TABLE IF EXISTS reorgs;
+-- +migrate Up
+
+CREATE TABLE logs_reorged (
+	chain_id     BIGINT NOT NULL,
+	block_number BIGINT NOT NULL,
+	address      TEXT NOT NULL,
+	topics       TEXT NOT NULL, -- list of hashes in JSON
+	data         BLOB,
+	tx_hash      TEXT NOT NULL,
+	tx_index     INTEGER NOT NULL,
+	log_index    INTEGER NOT NULL, -- "index" is a reserved keyword
+	PRIMARY KEY (address, chain_id, block_number, log_index),
+	FOREIGN KEY (chain_id, block_number) REFERENCES blocks_reorged(chain_id, block_number)
+);
+
+CREATE INDEX idx_logs_reorged_block_number ON logs_reorged(block_number);
+
+CREATE TABLE blocks_reorged (
+	chain_id          BIGINT NOT NULL REFERENCES reorgs(chain_id),
+	block_number      BIGINT NOT NULL,
+	block_hash        TEXT NOT NULL,
+	block_timestamp   INTEGER NOT NULL,
+	block_parent_hash TEXT NOT NULL,
+	PRIMARY KEY (chain_id, block_number)
+);
+
+CREATE TABLE reorgs (
+	chain_id                     BIGINT PRIMARY KEY,
+	detected_at_block            BIGINT NOT NULL,
+	reorged_from_block           BIGINT NOT NULL,
+	reorged_to_block             BIGINT NOT NULL,
+	detected_timestamp           INTEGER NOT NULL,
+	network_latest_block         INTEGER NOT NULL, -- the latest block at the moment of detection
+	network_finalized_block      INTEGER NOT NULL, -- the finalized block at the moment of detection
+	network_finalized_block_name TEXT NOT NULL, -- name of the finality tag (e.g., "finalized", "safe", etc.)
+	description                  TEXT -- extra information, can be NULL
+);
\ No newline at end of file
diff --git a/multidownloader/storage/migrations/migrations.go b/multidownloader/storage/migrations/migrations.go
index 679c8ffde..dab2e080d 100644
--- a/multidownloader/storage/migrations/migrations.go
+++ b/multidownloader/storage/migrations/migrations.go
@@ -12,11 +12,18 @@ import (
 //go:embed 0001.sql
 var mig001 string
 
+//go:embed 0002.sql
+var mig002 string
+
 var Migrations = []types.Migration{
 	{
 		ID:  "0001",
 		SQL: mig001,
 	},
+	{
+		ID:  "0002",
+		SQL: mig002,
+	},
 }
 
 func RunMigrations(logger aggkitcommon.Logger, database *sql.DB) error {
diff --git a/multidownloader/storage/storage.go b/multidownloader/storage/storage.go
index b38fc78e8..e650cb4c6 100644
--- a/multidownloader/storage/storage.go
+++ b/multidownloader/storage/storage.go
@@ -60,15 +60,6 @@ func NewLogRowsFromEthLogs(logs []types.Log) []*logRow {
 	return rows
 }
 
-type syncStatusRow struct {
-	Address         common.Address `meddler:"contract_address,address"`
-	TargetFromBlock uint64         `meddler:"target_from_block"`
-	TargetToBlock   string         `meddler:"target_to_block"`
-	SyncedFromBlock uint64         `meddler:"synced_from_block"`
-	SyncedToBlock   uint64         `meddler:"synced_to_block"`
-	SyncersIDs      string         `meddler:"syncers_id"`
-}
-
 func NewLogRowFromEthLog(log types.Log) *logRow {
 	topicsJSON, err := json.Marshal(log.Topics)
 	if err != nil {
@@ -142,7 +133,7 @@ func NewBlockRowsFromLogs(logs []types.Log, isFinal bool) map[uint64]*blockRow {
 	return blockMap
 }
 
-func NewBlockRowsFromAggkitBlock(blockHeaders []*aggkittypes.BlockHeader, isFinal bool) map[uint64]*blockRow {
+func NewBlockRowsFromAggkitBlock(blockHeaders aggkittypes.ListBlockHeaders, isFinal bool) map[uint64]*blockRow {
 	blockMap := make(map[uint64]*blockRow)
 	for _, header := range blockHeaders {
 		blockMap[header.Number] = newBlockRowFromAggkitBlock(header, isFinal)
@@ -247,7 +238,7 @@ func (a *MultidownloaderStorage) SaveEthLogs(tx dbtypes.Querier, logs []types.Lo
 }
 
 func (a *MultidownloaderStorage) SaveEthLogsWithHeaders(tx dbtypes.Querier,
-	blockHeaders []*aggkittypes.BlockHeader, logs []types.Log, isFinal bool) error {
+	blockHeaders aggkittypes.ListBlockHeaders, logs []types.Log, isFinal bool) error {
 	return a.saveLogsAndBlocks(tx,
 		NewBlockRowsFromAggkitBlock(blockHeaders, isFinal),
 		NewLogRowsFromEthLogs(logs))
@@ -293,104 +284,3 @@ func (a *MultidownloaderStorage) saveLogsNoMutex(tx dbtypes.Querier, logRows []*
 	}
 	return nil
 }
-
-func (r *syncStatusRow) ToSyncSegment() (mdrtypes.SyncSegment, error) {
-	targetToBlock, err := aggkittypes.NewBlockNumberFinality(r.TargetToBlock)
-	if err != nil {
-		return mdrtypes.SyncSegment{}, fmt.Errorf("ToSyncSegment: error parsing target to block finality (%s): %w",
-			r.TargetToBlock, err)
-	}
-	return mdrtypes.SyncSegment{
-		ContractAddr:  r.Address,
-		TargetToBlock: *targetToBlock,
-		BlockRange:    aggkitcommon.NewBlockRange(r.SyncedFromBlock, r.SyncedToBlock),
-	}, nil
-}
-
-func (a *MultidownloaderStorage) GetSyncedBlockRangePerContract(tx dbtypes.Querier) (mdrtypes.SetSyncSegment, error) {
-	a.mutex.RLock()
-	defer a.mutex.RUnlock()
-	result := make([]*syncStatusRow, 0)
-	if tx == nil {
-		tx = a.db
-	}
-	err := meddler.QueryAll(tx, &result, "SELECT * FROM sync_status")
-	if err != nil {
-		return mdrtypes.SetSyncSegment{}, fmt.Errorf("error querying sync status: %w", err)
-	}
-	setSegments := mdrtypes.NewSetSyncSegment()
-	for _, row := range result {
-		segment, err := row.ToSyncSegment()
-		if err != nil {
-			return mdrtypes.SetSyncSegment{},
-
fmt.Errorf("GetSyncedBlockRangePerContract: error converting row to sync segment: %w", err) - } - setSegments.Add(segment) - } - return setSegments, nil -} - -func (a *MultidownloaderStorage) UpdateSyncedStatus(tx dbtypes.Querier, - segments []mdrtypes.SyncSegment) error { - if tx == nil { - tx = a.db - } - query := ` - UPDATE sync_status SET - synced_from_block = ?, - synced_to_block = ? - WHERE contract_address = ?; - ` - a.mutex.Lock() - defer a.mutex.Unlock() - for _, segment := range segments { - result, err := tx.Exec(query, segment.BlockRange.FromBlock, - segment.BlockRange.ToBlock, segment.ContractAddr.Hex()) - if err != nil { - return fmt.Errorf("error updating %s sync status: %w", segment.String(), err) - } - rowsAffected, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("error getting rows affected for contract %s: %w", - segment.ContractAddr.Hex(), err) - } - if rowsAffected == 0 { - return fmt.Errorf("no rows updated for contract %s", segment.ContractAddr.Hex()) - } - } - return nil -} - -func (a *MultidownloaderStorage) UpsertSyncerConfigs(tx dbtypes.Querier, configs []mdrtypes.ContractConfig) error { - if tx == nil { - tx = a.db - } - a.mutex.Lock() - defer a.mutex.Unlock() - for _, config := range configs { - row := syncStatusRow{ - Address: config.Address, - TargetFromBlock: config.FromBlock, - TargetToBlock: config.ToBlock.String(), - SyncedFromBlock: 0, - SyncedToBlock: 0, - SyncersIDs: fmt.Sprintf("%v", config.Syncers), - } - // Upsert logic - query := ` - INSERT INTO sync_status (contract_address, target_from_block, - target_to_block, synced_from_block, synced_to_block, syncers_id) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(contract_address) DO UPDATE SET - target_from_block = excluded.target_from_block, - target_to_block = excluded.target_to_block, - syncers_id = excluded.syncers_id - ` - _, err := tx.Exec(query, row.Address.Hex(), row.TargetFromBlock, row.TargetToBlock, - row.SyncedFromBlock, row.SyncedToBlock, row.SyncersIDs) - if err != nil { - return fmt.Errorf("error updating sync status: %w", err) - } - } - return nil -} diff --git a/multidownloader/storage/storage_block.go b/multidownloader/storage/storage_block.go index 352ce504a..cf6293872 100644 --- a/multidownloader/storage/storage_block.go +++ b/multidownloader/storage/storage_block.go @@ -6,6 +6,7 @@ import ( "fmt" dbtypes "github.com/agglayer/aggkit/db/types" + mdtypes "github.com/agglayer/aggkit/multidownloader/types" aggkittypes "github.com/agglayer/aggkit/types" "github.com/jmoiron/sqlx" "github.com/russross/meddler" @@ -40,8 +41,8 @@ func (b *Blocks) Get(number uint64) (*aggkittypes.BlockHeader, bool, error) { return header, isFinal, nil } -func (b *Blocks) ListHeaders() []*aggkittypes.BlockHeader { - headers := make([]*aggkittypes.BlockHeader, 0, len(b.Headers)) +func (b *Blocks) ListHeaders() aggkittypes.ListBlockHeaders { + headers := aggkittypes.NewListBlockHeadersEmpty(len(b.Headers)) for _, header := range b.Headers { headers = append(headers, header) } @@ -51,8 +52,11 @@ func (b *Blocks) ListHeaders() []*aggkittypes.BlockHeader { func (b *Blocks) IsEmpty() bool { return len(b.Headers) == 0 } +func (b *Blocks) Len() int { + return len(b.Headers) +} -func (a *MultidownloaderStorage) saveAggkitBlock(tx dbtypes.Querier, +func (a *MultidownloaderStorage) saveAggkitBlock(tx dbtypes.Querier, //nolint:unparam header *aggkittypes.BlockHeader, isFinal bool) error { blockRows := map[uint64]*blockRow{ header.Number: newBlockRowFromAggkitBlock(header, isFinal), @@ -62,7 +66,10 @@ func (a 
*MultidownloaderStorage) saveAggkitBlock(tx dbtypes.Querier,
 	return a.saveBlocksNoMutex(tx, blockRows)
 }
 
-func (a *MultidownloaderStorage) updateIsFinal(tx dbtypes.Querier, blockNumbers []uint64) error {
+func (a *MultidownloaderStorage) UpdateBlockToFinalized(tx dbtypes.Querier, blockNumbers []uint64) error {
+	if len(blockNumbers) == 0 {
+		return nil
+	}
 	if tx == nil {
 		tx = a.db
 	}
@@ -81,11 +88,39 @@ func (a *MultidownloaderStorage) updateIsFinal(tx dbtypes.Querier, blockNumbers
 	}
 	return nil
 }
-func (a *MultidownloaderStorage) GetBlockHeaderByNumber(tx dbtypes.Querier,
-	blockNumber uint64) (*aggkittypes.BlockHeader, bool, error) {
-	if tx == nil {
-		tx = a.db
+
+// GetRangeBlockHeader retrieves the lowest and highest block headers stored in the
+// database for the given finality; it returns (nil, nil, nil) when there are none
+func (a *MultidownloaderStorage) GetRangeBlockHeader(tx dbtypes.Querier,
+	isFinal mdtypes.FinalizedType) (*aggkittypes.BlockHeader, *aggkittypes.BlockHeader, error) {
+	highestBlock, err := a.getBlockHeadersNoMutex(tx, "SELECT * FROM blocks "+
+		"WHERE is_final=? ORDER BY block_number DESC LIMIT 1", isFinal)
+	if err != nil {
+		return nil, nil, fmt.Errorf("GetRangeBlockHeader:highest: %w", err)
+	}
+	if highestBlock.IsEmpty() {
+		return nil, nil, nil
+	}
+	if highestBlock.Len() > 1 {
+		return nil, nil, fmt.Errorf("GetRangeBlockHeader:highest: more than one block returned (%d)", highestBlock.Len())
+	}
+
+	lowestBlock, err := a.getBlockHeadersNoMutex(tx, "SELECT * FROM blocks WHERE is_final=? "+
+		"ORDER BY block_number ASC LIMIT 1", isFinal)
+	if err != nil {
+		return nil, nil, fmt.Errorf("GetRangeBlockHeader:lowest: %w", err)
+	}
+	if lowestBlock.IsEmpty() {
+		return nil, nil, nil
 	}
+	if lowestBlock.Len() > 1 {
+		return nil, nil, fmt.Errorf("GetRangeBlockHeader:lowest: more than one block returned (%d)", lowestBlock.Len())
+	}
+	return lowestBlock.ListHeaders()[0], highestBlock.ListHeaders()[0], nil
+}
+
+func (a *MultidownloaderStorage) GetBlockHeaderByNumber(tx dbtypes.Querier,
+	blockNumber uint64) (*aggkittypes.BlockHeader, mdtypes.FinalizedType, error) {
 	a.mutex.RLock()
 	defer a.mutex.RUnlock()
 	blocks, err := a.getBlockHeadersNoMutex(tx, "SELECT * FROM blocks WHERE block_number = ?", blockNumber)
@@ -128,3 +163,18 @@ func (a *MultidownloaderStorage) getBlockHeadersNoMutex(tx dbtypes.Querier,
 	}
 	return result, nil
 }
+
+// GetBlockHeadersNotFinalized retrieves all block headers that are not finalized, up to and including maxBlock
+func (a *MultidownloaderStorage) GetBlockHeadersNotFinalized(tx dbtypes.Querier,
+	maxBlock uint64) (aggkittypes.ListBlockHeaders, error) {
+	if tx == nil {
+		tx = a.db
+	}
+	a.mutex.RLock()
+	defer a.mutex.RUnlock()
+	blocks, err := a.getBlockHeadersNoMutex(tx, "SELECT * FROM blocks WHERE is_final = 0 AND block_number <= ?", maxBlock)
+	if err != nil {
+		return nil, err
+	}
+	return blocks.ListHeaders(), nil
+}
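// Illustrative sketch, not part of the patch: one way a reorg check can use
// GetRangeBlockHeader above to derive the window of not-yet-finalized blocks it has
// to verify. It assumes it lives in package storage with the aggkitcommon/mdtypes
// imports used elsewhere in this file; `storage` is built as in newStorageForTest.
func notFinalizedWindow(storage *MultidownloaderStorage) (*aggkitcommon.BlockRange, error) {
	lowest, highest, err := storage.GetRangeBlockHeader(nil, mdtypes.NotFinalized)
	if err != nil {
		return nil, err
	}
	if lowest == nil || highest == nil {
		return nil, nil // nothing pending finalization, so nothing to re-check
	}
	br := aggkitcommon.NewBlockRange(lowest.Number, highest.Number)
	return &br, nil
}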
blockHeader, "expected nil BlockHeader") + block := aggkittypes.NewBlockHeader(1234, exampleTestHash[0], 5678, &exampleTestHash[1]) + err = storage.saveAggkitBlock(nil, block, true) + require.NoError(t, err, "cannot insert BlockHeader") + // Get and verify block + readBlock, isFinal, err := storage.GetBlockHeaderByNumber(nil, 1234) + require.NoError(t, err, "cannot get BlockHeader") + require.NotNil(t, readBlock, "expected non-nil BlockHeader") + require.Equal(t, block, readBlock, "BlockHeader mismatch") + require.True(t, isFinal, "expected block to be final") + + blockNilParentHash := aggkittypes.NewBlockHeader(1235, exampleTestHash[0], 5678, nil) + err = storage.saveAggkitBlock(nil, blockNilParentHash, true) + require.NoError(t, err, "cannot get BlockHeader") + readBlock, _, err = storage.GetBlockHeaderByNumber(nil, blockNilParentHash.Number) + require.NoError(t, err, "cannot get BlockHeader") + require.Equal(t, blockNilParentHash, readBlock, "BlockHeader mismatch") +} + +func TestStorage_GetRangeBlockHeader(t *testing.T) { + storage := newStorageForTest(t, nil) + block := aggkittypes.NewBlockHeader(4000, exampleTestHash[5], 1630002000, nil) + err := storage.saveAggkitBlock(nil, block, mdtypes.NotFinalized) + require.NoError(t, err, "cannot insert BlockHeader") + + lowest, highest, err := storage.GetRangeBlockHeader(nil, mdtypes.NotFinalized) + require.NoError(t, err, "cannot get range BlockHeader") + require.Equal(t, block, lowest, "lowest BlockHeader mismatch") + require.Equal(t, block, highest, "highest BlockHeader mismatch") + + lowest, highest, err = storage.GetRangeBlockHeader(nil, mdtypes.Finalized) + require.NoError(t, err, "cannot get range BlockHeader") + require.True(t, lowest.Empty(), "lowest BlockHeader mismatch") + require.True(t, highest.Empty(), "highest BlockHeader mismatch") +} diff --git a/multidownloader/storage/storage_reorg.go b/multidownloader/storage/storage_reorg.go new file mode 100644 index 000000000..522803f78 --- /dev/null +++ b/multidownloader/storage/storage_reorg.go @@ -0,0 +1,96 @@ +package storage + +import ( + "fmt" + + aggkitcommon "github.com/agglayer/aggkit/common" + dbtypes "github.com/agglayer/aggkit/db/types" + mdrtypes "github.com/agglayer/aggkit/multidownloader/types" + "github.com/russross/meddler" +) + +type reorgRow struct { + ChainID uint64 `meddler:"chain_id"` + DetectedAtBlock uint64 `meddler:"detected_at_block"` + ReorgedFromBlock uint64 `meddler:"reorged_from_block"` + ReorgedToBlock uint64 `meddler:"reorged_to_block"` + DetectedTimestamp uint64 `meddler:"detected_timestamp"` + NetworkLatestBlock uint64 `meddler:"network_latest_block"` + NetworkFinalizedBlock uint64 `meddler:"network_finalized_block"` + NetworkFinalizedBlockName string `meddler:"network_finalized_block_name"` +} + +func newReorgRowFromReorgData(reorgData mdrtypes.ReorgData) *reorgRow { + return &reorgRow{ + ChainID: reorgData.ChainID, + DetectedAtBlock: reorgData.DetectedAtBlock, + ReorgedFromBlock: reorgData.BlockRangeAffected.FromBlock, + ReorgedToBlock: reorgData.BlockRangeAffected.ToBlock, + DetectedTimestamp: reorgData.DetectedTimestamp, + NetworkLatestBlock: reorgData.NetworkLatestBlock, + NetworkFinalizedBlock: reorgData.NetworkFinalizedBlock, + NetworkFinalizedBlockName: reorgData.NetworkFinalizedBlockName.String(), + } +} + +// returns ChainID of the inserted reorg +func (a *MultidownloaderStorage) InsertReorgAndMoveReorgedBlocksAndLogs(tx dbtypes.Querier, + reorgData mdrtypes.ReorgData) (uint64, error) { + if tx == nil { + return 0, fmt.Errorf("InsertNewReorg: 
diff --git a/multidownloader/storage/storage_reorg.go b/multidownloader/storage/storage_reorg.go
new file mode 100644
index 000000000..522803f78
--- /dev/null
+++ b/multidownloader/storage/storage_reorg.go
@@ -0,0 +1,96 @@
+package storage
+
+import (
+	"fmt"
+
+	aggkitcommon "github.com/agglayer/aggkit/common"
+	dbtypes "github.com/agglayer/aggkit/db/types"
+	mdrtypes "github.com/agglayer/aggkit/multidownloader/types"
+	"github.com/russross/meddler"
+)
+
+type reorgRow struct {
+	ChainID                   uint64 `meddler:"chain_id"`
+	DetectedAtBlock           uint64 `meddler:"detected_at_block"`
+	ReorgedFromBlock          uint64 `meddler:"reorged_from_block"`
+	ReorgedToBlock            uint64 `meddler:"reorged_to_block"`
+	DetectedTimestamp         uint64 `meddler:"detected_timestamp"`
+	NetworkLatestBlock        uint64 `meddler:"network_latest_block"`
+	NetworkFinalizedBlock     uint64 `meddler:"network_finalized_block"`
+	NetworkFinalizedBlockName string `meddler:"network_finalized_block_name"`
+}
+
+func newReorgRowFromReorgData(reorgData mdrtypes.ReorgData) *reorgRow {
+	return &reorgRow{
+		ChainID:                   reorgData.ChainID,
+		DetectedAtBlock:           reorgData.DetectedAtBlock,
+		ReorgedFromBlock:          reorgData.BlockRangeAffected.FromBlock,
+		ReorgedToBlock:            reorgData.BlockRangeAffected.ToBlock,
+		DetectedTimestamp:         reorgData.DetectedTimestamp,
+		NetworkLatestBlock:        reorgData.NetworkLatestBlock,
+		NetworkFinalizedBlock:     reorgData.NetworkFinalizedBlock,
+		NetworkFinalizedBlockName: reorgData.NetworkFinalizedBlockName.String(),
+	}
+}
+
+// InsertReorgAndMoveReorgedBlocksAndLogs returns the ChainID of the inserted reorg
+func (a *MultidownloaderStorage) InsertReorgAndMoveReorgedBlocksAndLogs(tx dbtypes.Querier,
+	reorgData mdrtypes.ReorgData) (uint64, error) {
+	if tx == nil {
+		return 0, fmt.Errorf("InsertNewReorg: requires a tx because it performs multiple operations")
+	}
+	reorgRow := newReorgRowFromReorgData(reorgData)
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	// Compute the next ChainID as MAX(chain_id)+1 over the reorgs table
+	lastChainID := struct {
+		ChainID *uint64 `meddler:"chain_id"`
+	}{}
+	err := meddler.QueryRow(tx, &lastChainID, "SELECT MAX(chain_id) as chain_id FROM reorgs")
+	if err != nil {
+		return 0, fmt.Errorf("InsertNewReorg: error getting last chain_id: %w", err)
+	}
+	if lastChainID.ChainID == nil {
+		reorgRow.ChainID = 1
+	} else {
+		reorgRow.ChainID = *lastChainID.ChainID + 1
+	}
+
+	if err := meddler.Insert(tx, "reorgs", reorgRow); err != nil {
+		return 0, fmt.Errorf("InsertNewReorg: error inserting reorgs (%s): %w", reorgData.String(), err)
+	}
+	if err := a.moveReorgedBlocksAndLogsNoMutex(tx, reorgRow.ChainID,
+		reorgData.BlockRangeAffected); err != nil {
+		return 0, fmt.Errorf("InsertNewReorg: error moving reorged blocks to blocks_reorged: %w", err)
+	}
+	return reorgRow.ChainID, nil
+}
+
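// Illustrative sketch, not part of the patch: the transactional contract enforced
// above. InsertReorgAndMoveReorgedBlocksAndLogs rejects a nil Querier, so callers
// are expected to wrap the insert and the move in a single transaction and undo it
// on failure. Commit is used by the tests that follow; Rollback on dbtypes.Txer is
// an assumption here.
func recordReorg(ctx context.Context, storage *MultidownloaderStorage, reorgData mdrtypes.ReorgData) error {
	tx, err := storage.NewTx(ctx)
	if err != nil {
		return err
	}
	if _, err := storage.InsertReorgAndMoveReorgedBlocksAndLogs(tx, reorgData); err != nil {
		_ = tx.Rollback() // assumption: Txer exposes Rollback
		return err
	}
	return tx.Commit()
}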
+func (a *MultidownloaderStorage) moveReorgedBlocksAndLogsNoMutex(tx dbtypes.Querier, chainID uint64,
+	blockRangeAffected aggkitcommon.BlockRange) error {
+	a.logger.Debugf("storage: moving blocks to blocks_reorged - chain_id: %d, range: %s",
+		chainID, blockRangeAffected.String())
+	query := `INSERT INTO blocks_reorged (chain_id, block_number, block_hash, block_parent_hash, block_timestamp)
+		SELECT ?, block_number, block_hash, block_parent_hash, block_timestamp
+		FROM blocks
+		WHERE block_number >= ? AND block_number <= ?;
+	INSERT INTO logs_reorged (chain_id, block_number, address, topics, data, tx_hash, tx_index, log_index)
+		SELECT ?, block_number, address, topics, data, tx_hash, tx_index, log_index
+		FROM logs
+		WHERE block_number >= ? AND block_number <= ?;
+	DELETE FROM logs
+		WHERE block_number >= ? AND block_number <= ?;
+	DELETE FROM blocks
+		WHERE block_number >= ? AND block_number <= ?;`
+	_, err := tx.Exec(query,
+		chainID,
+		blockRangeAffected.FromBlock, blockRangeAffected.ToBlock,
+		chainID,
+		blockRangeAffected.FromBlock, blockRangeAffected.ToBlock,
+		blockRangeAffected.FromBlock, blockRangeAffected.ToBlock,
+		blockRangeAffected.FromBlock, blockRangeAffected.ToBlock)
+	if err != nil {
+		return fmt.Errorf("moveReorgedBlocksAndLogsNoMutex: error moving reorged blocks and logs to the *_reorged tables: %w", err)
+	}
+	return nil
+}
diff --git a/multidownloader/storage/storage_reorg_test.go b/multidownloader/storage/storage_reorg_test.go
new file mode 100644
index 000000000..e2510855a
--- /dev/null
+++ b/multidownloader/storage/storage_reorg_test.go
@@ -0,0 +1,67 @@
+package storage
+
+import (
+	"testing"
+
+	aggkitcommon "github.com/agglayer/aggkit/common"
+	mdrtypes "github.com/agglayer/aggkit/multidownloader/types"
+	aggkittypes "github.com/agglayer/aggkit/types"
+	"github.com/stretchr/testify/require"
+)
+
+func TestStorage_InsertNewReorg(t *testing.T) {
+	storage := newStorageForTest(t, nil)
+	reorgData := mdrtypes.ReorgData{
+		ChainID:                   1,
+		BlockRangeAffected:        aggkitcommon.NewBlockRange(5000, 5010),
+		DetectedAtBlock:           5020,
+		DetectedTimestamp:         1630003000,
+		NetworkLatestBlock:        6000,
+		NetworkFinalizedBlock:     5990,
+		NetworkFinalizedBlockName: aggkittypes.FinalizedBlock,
+	}
+	tx, err := storage.NewTx(t.Context())
+	require.NoError(t, err, "cannot start new transaction")
+	chainID, err := storage.InsertReorgAndMoveReorgedBlocksAndLogs(tx, reorgData)
+	require.NoError(t, err, "cannot insert new reorg")
+	require.Equal(t, uint64(1), chainID, "first chain ID must be 1")
+	err = tx.Commit()
+	require.NoError(t, err, "cannot commit transaction")
+
+	tx, err = storage.NewTx(t.Context())
+	require.NoError(t, err, "cannot start new transaction")
+	chainID, err = storage.InsertReorgAndMoveReorgedBlocksAndLogs(tx, reorgData)
+	require.NoError(t, err, "cannot insert new reorg")
+	require.Equal(t, uint64(2), chainID, "second chain ID must be 2")
+	err = tx.Commit()
+	require.NoError(t, err, "cannot commit transaction")
+}
+
+func TestStorage_InsertNewReorgAndMoveBlocks(t *testing.T) {
+	storage := newStorageForTest(t, nil)
+	populateLogsAndBlocksForTest(t, storage,
+		5000, 20, 5)
+
+	reorgData := mdrtypes.ReorgData{
+		ChainID:                   0, // will be set by InsertReorgAndMoveReorgedBlocksAndLogs
+		BlockRangeAffected:        aggkitcommon.NewBlockRange(5005, 5015),
+		DetectedAtBlock:           5020,
+		DetectedTimestamp:         1630003000,
+		NetworkLatestBlock:        6000,
+		NetworkFinalizedBlock:     5990,
+		NetworkFinalizedBlockName: aggkittypes.FinalizedBlock,
+	}
+	tx, err := storage.NewTx(t.Context())
+	require.NoError(t, err, "cannot start new transaction")
+	chainID, err := storage.InsertReorgAndMoveReorgedBlocksAndLogs(tx, reorgData)
+	require.NoError(t, err, "cannot insert new reorg")
+	require.Equal(t, uint64(1), chainID, "first chain ID must be 1")
+	err = tx.Commit()
+	require.NoError(t, err, "cannot commit transaction")
+	// Blocks 5005..5015 must have been moved out of the blocks table into blocks_reorged
+	for i := uint64(5005); i <= 5015; i++ {
+		hdr, _, err := storage.GetBlockHeaderByNumber(nil, i)
+		require.NoError(t, err)
+		require.Nil(t, hdr, "block header should not be in blocks table anymore")
+	}
+}
diff --git a/multidownloader/storage/storage_sync.go b/multidownloader/storage/storage_sync.go
new file mode 100644
index 000000000..29fb45e1f
--- /dev/null
+++ b/multidownloader/storage/storage_sync.go
@@ -0,0 +1,122 @@
+package storage
+
+import (
+	"fmt"
+
+	aggkitcommon "github.com/agglayer/aggkit/common"
+	dbtypes "github.com/agglayer/aggkit/db/types"
+
mdrtypes "github.com/agglayer/aggkit/multidownloader/types" + aggkittypes "github.com/agglayer/aggkit/types" + "github.com/ethereum/go-ethereum/common" + "github.com/russross/meddler" +) + +type syncStatusRow struct { + Address common.Address `meddler:"contract_address,address"` + TargetFromBlock uint64 `meddler:"target_from_block"` + TargetToBlock string `meddler:"target_to_block"` + SyncedFromBlock uint64 `meddler:"synced_from_block"` + SyncedToBlock uint64 `meddler:"synced_to_block"` + SyncersIDs string `meddler:"syncers_id"` +} + +func (r *syncStatusRow) ToSyncSegment() (mdrtypes.SyncSegment, error) { + targetToBlock, err := aggkittypes.NewBlockNumberFinality(r.TargetToBlock) + if err != nil { + return mdrtypes.SyncSegment{}, fmt.Errorf("ToSyncSegment: error parsing target to block finality (%s): %w", + r.TargetToBlock, err) + } + return mdrtypes.SyncSegment{ + ContractAddr: r.Address, + TargetToBlock: *targetToBlock, + BlockRange: aggkitcommon.NewBlockRange(r.SyncedFromBlock, r.SyncedToBlock), + }, nil +} + +func (a *MultidownloaderStorage) GetSyncedBlockRangePerContract(tx dbtypes.Querier) (mdrtypes.SetSyncSegment, error) { + a.mutex.RLock() + defer a.mutex.RUnlock() + result := make([]*syncStatusRow, 0) + if tx == nil { + tx = a.db + } + err := meddler.QueryAll(tx, &result, "SELECT * FROM sync_status") + if err != nil { + return mdrtypes.SetSyncSegment{}, fmt.Errorf("error querying sync status: %w", err) + } + setSegments := mdrtypes.NewSetSyncSegment() + for _, row := range result { + segment, err := row.ToSyncSegment() + if err != nil { + return mdrtypes.SetSyncSegment{}, + fmt.Errorf("GetSyncedBlockRangePerContract: error converting row to sync segment: %w", err) + } + setSegments.Add(segment) + } + return setSegments, nil +} + +func (a *MultidownloaderStorage) UpdateSyncedStatus(tx dbtypes.Querier, + segments []mdrtypes.SyncSegment) error { + if tx == nil { + tx = a.db + } + query := ` + UPDATE sync_status SET + synced_from_block = ?, + synced_to_block = ? + WHERE contract_address = ?; + ` + a.mutex.Lock() + defer a.mutex.Unlock() + for _, segment := range segments { + result, err := tx.Exec(query, segment.BlockRange.FromBlock, + segment.BlockRange.ToBlock, segment.ContractAddr.Hex()) + if err != nil { + return fmt.Errorf("error updating %s sync status: %w", segment.String(), err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("error getting rows affected for contract %s: %w", + segment.ContractAddr.Hex(), err) + } + if rowsAffected == 0 { + return fmt.Errorf("no rows updated for contract %s", segment.ContractAddr.Hex()) + } + } + return nil +} + +func (a *MultidownloaderStorage) UpsertSyncerConfigs(tx dbtypes.Querier, configs []mdrtypes.ContractConfig) error { + if tx == nil { + tx = a.db + } + a.mutex.Lock() + defer a.mutex.Unlock() + for _, config := range configs { + row := syncStatusRow{ + Address: config.Address, + TargetFromBlock: config.FromBlock, + TargetToBlock: config.ToBlock.String(), + SyncedFromBlock: 0, + SyncedToBlock: 0, + SyncersIDs: fmt.Sprintf("%v", config.Syncers), + } + // Upsert logic + query := ` + INSERT INTO sync_status (contract_address, target_from_block, + target_to_block, synced_from_block, synced_to_block, syncers_id) + VALUES (?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(contract_address) DO UPDATE SET + target_from_block = excluded.target_from_block, + target_to_block = excluded.target_to_block, + syncers_id = excluded.syncers_id + ` + _, err := tx.Exec(query, row.Address.Hex(), row.TargetFromBlock, row.TargetToBlock, + row.SyncedFromBlock, row.SyncedToBlock, row.SyncersIDs) + if err != nil { + return fmt.Errorf("error updating sync status: %w", err) + } + } + return nil +} diff --git a/multidownloader/storage/storage_sync_test.go b/multidownloader/storage/storage_sync_test.go new file mode 100644 index 000000000..3d9334cac --- /dev/null +++ b/multidownloader/storage/storage_sync_test.go @@ -0,0 +1,113 @@ +package storage + +import ( + "testing" + + aggkitcommon "github.com/agglayer/aggkit/common" + mdrtypes "github.com/agglayer/aggkit/multidownloader/types" + aggkittypes "github.com/agglayer/aggkit/types" + "github.com/stretchr/testify/require" +) + +func TestStorage_GetSyncedBlockRangePerContract(t *testing.T) { + storage := newStorageForTest(t, nil) + data, err := storage.GetSyncedBlockRangePerContract(nil) + require.NoError(t, err) + require.Equal(t, "SetSyncSegment: ", data.String()) +} + +func TestStorage_UpsertSyncerConfigs(t *testing.T) { + storage := newStorageForTest(t, nil) + configs := []mdrtypes.ContractConfig{ + { + Address: exampleAddr1, + FromBlock: 1000, + ToBlock: aggkittypes.FinalizedBlock, + }, + { + Address: exampleAddr2, + FromBlock: 2000, + ToBlock: aggkittypes.LatestBlock, + }, + } + err := storage.UpsertSyncerConfigs(nil, configs) + require.NoError(t, err) + + // Upsert again with different start block + configsUpdated := []mdrtypes.ContractConfig{ + { + Address: exampleAddr1, + FromBlock: 1300, + ToBlock: aggkittypes.FinalizedBlock, + }, + { + Address: exampleAddr2, + FromBlock: 1600, + ToBlock: aggkittypes.FinalizedBlock, + }, + } + err = storage.UpsertSyncerConfigs(nil, configsUpdated) + require.NoError(t, err) + + syncSegments, err := storage.GetSyncedBlockRangePerContract(nil) + require.NoError(t, err) + require.Equal(t, 2, len(syncSegments.GetAddressesForBlockRange( + aggkitcommon.NewBlockRange(0, 10000), + ))) + seg1, exists := syncSegments.GetByContract(exampleAddr1) + require.True(t, exists) + require.Equal(t, aggkittypes.FinalizedBlock, seg1.TargetToBlock) + require.Equal(t, aggkitcommon.BlockRangeZero, seg1.BlockRange) + + seg2, exists := syncSegments.GetByContract(exampleAddr2) + require.True(t, exists) + require.Equal(t, aggkittypes.FinalizedBlock, seg2.TargetToBlock) +} + +func TestStorage_UpdateSyncedStatus(t *testing.T) { + storage := newStorageForTest(t, nil) + segments := []mdrtypes.SyncSegment{ + mdrtypes.NewSyncSegment( + exampleAddr1, + aggkitcommon.NewBlockRange(1000, 2000), + aggkittypes.FinalizedBlock, + true, + ), + mdrtypes.NewSyncSegment( + exampleAddr2, + aggkitcommon.NewBlockRange(1500, 2500), + aggkittypes.LatestBlock, + false, + ), + } + err := storage.UpsertSyncerConfigs(nil, []mdrtypes.ContractConfig{ + { + Address: exampleAddr1, + FromBlock: 1000, + ToBlock: aggkittypes.FinalizedBlock, + }, + { + Address: exampleAddr2, + FromBlock: 1500, + ToBlock: aggkittypes.LatestBlock, + }, + }) + require.NoError(t, err) + err = storage.UpdateSyncedStatus(nil, segments) + require.NoError(t, err) + + syncedSegments, err := storage.GetSyncedBlockRangePerContract(nil) + require.NoError(t, err) + require.Equal(t, 2, len(syncedSegments.GetAddressesForBlockRange( + aggkitcommon.NewBlockRange(0, 3000), + ))) + seg1, exists := syncedSegments.GetByContract(exampleAddr1) + require.True(t, exists) + 
require.Equal(t, aggkitcommon.NewBlockRange(1000, 2000), seg1.BlockRange) + require.Equal(t, aggkittypes.FinalizedBlock, seg1.TargetToBlock) + + seg2, exists := syncedSegments.GetByContract(exampleAddr2) + require.True(t, exists) + require.Equal(t, aggkitcommon.NewBlockRange(1500, 2500), seg2.BlockRange) + require.Equal(t, aggkittypes.LatestBlock, seg2.TargetToBlock) +} diff --git a/multidownloader/storage/storage_test.go b/multidownloader/storage/storage_test.go index 08256c9e9..7fae3f2ca 100644 --- a/multidownloader/storage/storage_test.go +++ b/multidownloader/storage/storage_test.go @@ -4,7 +4,6 @@ import ( "path" "testing" - aggkitcommon "github.com/agglayer/aggkit/common" "github.com/agglayer/aggkit/log" mdrtypes "github.com/agglayer/aggkit/multidownloader/types" aggkittypes "github.com/agglayer/aggkit/types" @@ -48,30 +47,6 @@ func TestStorage_Exploratory(t *testing.T) { log.Infof("Retrieved block: %+v", block) } -func TestStorage_GetBlock(t *testing.T) { - storage := newStorageForTest(t, nil) - // BlockBase not present - blockHeader, _, err := storage.GetBlockHeaderByNumber(nil, 1234) - require.NoError(t, err, "cannot get BlockHeader") - require.Nil(t, blockHeader, "expected nil BlockHeader") - block := aggkittypes.NewBlockHeader(1234, exampleTestHash[0], 5678, &exampleTestHash[1]) - err = storage.saveAggkitBlock(nil, block, true) - require.NoError(t, err, "cannot insert BlockHeader") - // Get and verify block - readBlock, isFinal, err := storage.GetBlockHeaderByNumber(nil, 1234) - require.NoError(t, err, "cannot get BlockHeader") - require.NotNil(t, readBlock, "expected non-nil BlockHeader") - require.Equal(t, block, readBlock, "BlockHeader mismatch") - require.True(t, isFinal, "expected block to be final") - - blockNilParentHash := aggkittypes.NewBlockHeader(1235, exampleTestHash[0], 5678, nil) - err = storage.saveAggkitBlock(nil, blockNilParentHash, true) - require.NoError(t, err, "cannot get BlockHeader") - readBlock, _, err = storage.GetBlockHeaderByNumber(nil, blockNilParentHash.Number) - require.NoError(t, err, "cannot get BlockHeader") - require.Equal(t, blockNilParentHash, readBlock, "BlockHeader mismatch") -} - func TestStorage_GetLogs(t *testing.T) { storage := newStorageForTest(t, nil) // Logs not present @@ -197,108 +172,6 @@ func TestStorage_SaveEthLogsWithHeaders(t *testing.T) { require.Equal(t, logs[1], readLogs[1]) } -func TestStorage_GetSyncedBlockRangePerContract(t *testing.T) { - storage := newStorageForTest(t, nil) - data, err := storage.GetSyncedBlockRangePerContract(nil) - require.NoError(t, err) - require.Equal(t, "SetSyncSegment: ", data.String()) -} - -func TestStorage_UpsertSyncerConfigs(t *testing.T) { - storage := newStorageForTest(t, nil) - configs := []mdrtypes.ContractConfig{ - { - Address: exampleAddr1, - FromBlock: 1000, - ToBlock: aggkittypes.FinalizedBlock, - }, - { - Address: exampleAddr2, - FromBlock: 2000, - ToBlock: aggkittypes.LatestBlock, - }, - } - err := storage.UpsertSyncerConfigs(nil, configs) - require.NoError(t, err) - - // Upsert again with different start block - configsUpdated := []mdrtypes.ContractConfig{ - { - Address: exampleAddr1, - FromBlock: 1300, - ToBlock: aggkittypes.FinalizedBlock, - }, - { - Address: exampleAddr2, - FromBlock: 1600, - ToBlock: aggkittypes.FinalizedBlock, - }, - } - err = storage.UpsertSyncerConfigs(nil, configsUpdated) - require.NoError(t, err) - - syncSegments, err := storage.GetSyncedBlockRangePerContract(nil) - require.NoError(t, err) - require.Equal(t, 2, len(syncSegments.GetAddressesForBlockRange( - 
aggkitcommon.NewBlockRange(0, 10000), - ))) - seg1, exists := syncSegments.GetByContract(exampleAddr1) - require.True(t, exists) - require.Equal(t, aggkittypes.FinalizedBlock, seg1.TargetToBlock) - - seg2, exists := syncSegments.GetByContract(exampleAddr2) - require.True(t, exists) - require.Equal(t, aggkittypes.FinalizedBlock, seg2.TargetToBlock) -} - -func TestStorage_UpdateSyncedStatus(t *testing.T) { - storage := newStorageForTest(t, nil) - segments := []mdrtypes.SyncSegment{ - mdrtypes.NewSyncSegment( - exampleAddr1, - aggkitcommon.NewBlockRange(1000, 2000), - aggkittypes.FinalizedBlock, - true, - ), - mdrtypes.NewSyncSegment( - exampleAddr2, - aggkitcommon.NewBlockRange(1500, 2500), - aggkittypes.LatestBlock, - false, - ), - } - err := storage.UpsertSyncerConfigs(nil, []mdrtypes.ContractConfig{ - { - Address: exampleAddr1, - FromBlock: 1000, - ToBlock: aggkittypes.FinalizedBlock, - }, - { - Address: exampleAddr2, - FromBlock: 1500, - ToBlock: aggkittypes.LatestBlock, - }, - }) - require.NoError(t, err) - err = storage.UpdateSyncedStatus(nil, segments) - require.NoError(t, err) - - syncedSegments, err := storage.GetSyncedBlockRangePerContract(nil) - require.NoError(t, err) - require.Equal(t, 2, len(syncedSegments.GetAddressesForBlockRange( - aggkitcommon.NewBlockRange(0, 3000), - ))) - seg1, exists := syncedSegments.GetByContract(exampleAddr1) - require.True(t, exists) - require.Equal(t, aggkitcommon.NewBlockRange(1000, 2000), seg1.BlockRange) - require.Equal(t, aggkittypes.FinalizedBlock, seg1.TargetToBlock) - - seg2, exists := syncedSegments.GetByContract(exampleAddr2) - require.True(t, exists) - require.Equal(t, aggkitcommon.NewBlockRange(1500, 2500), seg2.BlockRange) - require.Equal(t, aggkittypes.LatestBlock, seg2.TargetToBlock) -} - func TestStorage_UpdateIsFinal(t *testing.T) { storage := newStorageForTest(t, nil) block := aggkittypes.NewBlockHeader(4000, exampleTestHash[5], 1630002000, nil) @@ -311,7 +184,10 @@ func TestStorage_UpdateIsFinal(t *testing.T) { require.Equal(t, block, readBlock, "BlockHeader mismatch") require.False(t, isFinal, "expected block to not be final") - err = storage.updateIsFinal(nil, []uint64{block.Number}) + err = storage.UpdateBlockToFinalized(nil, []uint64{}) + require.NoError(t, err, "if no blocks provided, should be no-op") + + err = storage.UpdateBlockToFinalized(nil, []uint64{block.Number}) require.NoError(t, err, "cannot update IsFinal") readBlock, isFinal, err = storage.GetBlockHeaderByNumber(nil, block.Number) @@ -372,3 +248,40 @@ func newStorageForTest(t *testing.T, dbFileFullPath *string) *MultidownloaderSto require.NoError(t, err, "cannot create storage") return storage } + +func populateLogsAndBlocksForTest(t *testing.T, storage *MultidownloaderStorage, + startingBlock uint64, numBlocks int, logsPerBlock int) { + t.Helper() + var blocks []*aggkittypes.BlockHeader + var logs []types.Log + for i := 0; i < numBlocks; i++ { + blockNumber := startingBlock + uint64(i) + blockHash := exampleTestHash[i%len(exampleTestHash)] + var parentHash *common.Hash + if i > 0 { + parentHash = &exampleTestHash[(i-1)%len(exampleTestHash)] + } + block := aggkittypes.NewBlockHeader(blockNumber, blockHash, 1630000000+uint64(i*60), parentHash) + blocks = append(blocks, block) + + for j := 0; j < logsPerBlock; j++ { + logEntry := types.Log{ + Address: exampleAddr1, + BlockNumber: blockNumber, + BlockHash: blockHash, + BlockTimestamp: 1630000000 + uint64(i*60), + Topics: []common.Hash{ + exampleTestHash[j%len(exampleTestHash)], + }, + Data: []byte{0x01, 0x02, 
byte(j)}, + TxHash: exampleTestHash[(i+j)%len(exampleTestHash)], + TxIndex: uint(100 + j), + Index: uint(10 + j), + } + logs = append(logs, logEntry) + } + } + + err := storage.SaveEthLogsWithHeaders(nil, blocks, logs, true) + require.NoError(t, err, "cannot populate logs and blocks") +} diff --git a/multidownloader/types/log_query.go b/multidownloader/types/log_query.go index ea3e442eb..af1009e97 100644 --- a/multidownloader/types/log_query.go +++ b/multidownloader/types/log_query.go @@ -13,6 +13,8 @@ import ( type LogQuery struct { Addrs []common.Address BlockRange aggkitcommon.BlockRange + // If BlockHash is set BlockRange contains the corresponding blockNumber + BlockHash *common.Hash } // NewLogQuery creates a new LogQuery @@ -23,12 +25,28 @@ func NewLogQuery(fromBlock uint64, toBlock uint64, addrs []common.Address) LogQu } } +func NewLogQueryBlockHash(blockNumber uint64, blockHash common.Hash, addrs []common.Address) LogQuery { + blockRange := aggkitcommon.BlockRangeZero + if blockNumber != 0 { + blockRange = aggkitcommon.NewBlockRange(blockNumber, blockNumber) + } + return LogQuery{ + Addrs: addrs, + BlockRange: blockRange, + BlockHash: &blockHash, + } +} + // NewLogQueryFromEthereumFilter creates a new LogQuery from an Ethereum FilterQuery func NewLogQueryFromEthereumFilter(query ethereum.FilterQuery) LogQuery { - return LogQuery{ - Addrs: query.Addresses, - BlockRange: aggkitcommon.NewBlockRange(query.FromBlock.Uint64(), query.ToBlock.Uint64()), + if query.BlockHash != nil { + blockNumber := uint64(0) + if query.FromBlock != nil { + blockNumber = query.FromBlock.Uint64() + } + return NewLogQueryBlockHash(blockNumber, *query.BlockHash, query.Addresses) } + return NewLogQuery(query.FromBlock.Uint64(), query.ToBlock.Uint64(), query.Addresses) } // String returns a string representation of the LogQuery @@ -36,11 +54,24 @@ func (l *LogQuery) String() string { if l == nil { return "LogQuery: " } + if l.BlockHash != nil { + bn := " (?)" + if !l.BlockRange.IsEmpty() { + bn = fmt.Sprintf(" (%d)", l.BlockRange.FromBlock) + } + return fmt.Sprintf("LogQuery: addrs=%v, blockHash=%s%s", l.Addrs, l.BlockHash.String(), bn) + } return fmt.Sprintf("LogQuery: addrs=%v, blockRange=%s", l.Addrs, l.BlockRange.String()) } // ToRPCFilterQuery converts the LogQuery to an Ethereum FilterQuery func (l *LogQuery) ToRPCFilterQuery() ethereum.FilterQuery { + if l.BlockHash != nil { + return ethereum.FilterQuery{ + Addresses: l.Addrs, + BlockHash: l.BlockHash, + } + } return ethereum.FilterQuery{ Addresses: l.Addrs, FromBlock: new(big.Int).SetUint64(l.BlockRange.FromBlock), diff --git a/multidownloader/types/log_query_test.go b/multidownloader/types/log_query_test.go index 99a98923d..56bedf50e 100644 --- a/multidownloader/types/log_query_test.go +++ b/multidownloader/types/log_query_test.go @@ -64,3 +64,22 @@ func TestLogQuery_ToRPCFilterQuery(t *testing.T) { require.Equal(t, big.NewInt(1), filter.FromBlock) require.Equal(t, big.NewInt(10), filter.ToBlock) } + +func TestLogQuery_BlockHash(t *testing.T) { + lq := NewLogQueryBlockHash(1234, common.HexToHash("0xabc"), []common.Address{common.HexToAddress("0x123")}) + require.Equal(t, common.HexToHash("0xabc"), *lq.BlockHash) + require.Equal(t, []common.Address{common.HexToAddress("0x123")}, lq.Addrs) + blockHash := common.HexToHash("0xabc") + lq2 := NewLogQueryFromEthereumFilter(ethereum.FilterQuery{ + Addresses: []common.Address{common.HexToAddress("0x123")}, + BlockHash: &blockHash, + }) + require.Equal(t, "LogQuery: 
addrs=[0x0000000000000000000000000000000000000123], blockHash=0x0000000000000000000000000000000000000000000000000000000000000abc (?)", + lq2.String()) + + rpcFilter := lq.ToRPCFilterQuery() + require.Equal(t, common.HexToHash("0xabc"), *rpcFilter.BlockHash) + require.Equal(t, []common.Address{common.HexToAddress("0x123")}, rpcFilter.Addresses) + require.Equal(t, "LogQuery: addrs=[0x0000000000000000000000000000000000000123], blockHash=0x0000000000000000000000000000000000000000000000000000000000000abc (1234)", + lq.String()) +} diff --git a/multidownloader/types/mocks/mock_reorg_processor.go b/multidownloader/types/mocks/mock_reorg_processor.go new file mode 100644 index 000000000..e9b869d3e --- /dev/null +++ b/multidownloader/types/mocks/mock_reorg_processor.go @@ -0,0 +1,83 @@ +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// ReorgProcessor is an autogenerated mock type for the ReorgProcessor type +type ReorgProcessor struct { + mock.Mock +} + +type ReorgProcessor_Expecter struct { + mock *mock.Mock +} + +func (_m *ReorgProcessor) EXPECT() *ReorgProcessor_Expecter { + return &ReorgProcessor_Expecter{mock: &_m.Mock} +} + +// ProcessReorg provides a mock function with given fields: ctx, offendingBlockNumber +func (_m *ReorgProcessor) ProcessReorg(ctx context.Context, offendingBlockNumber uint64) error { + ret := _m.Called(ctx, offendingBlockNumber) + + if len(ret) == 0 { + panic("no return value specified for ProcessReorg") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { + r0 = rf(ctx, offendingBlockNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ReorgProcessor_ProcessReorg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessReorg' +type ReorgProcessor_ProcessReorg_Call struct { + *mock.Call +} + +// ProcessReorg is a helper method to define mock.On call +// - ctx context.Context +// - offendingBlockNumber uint64 +func (_e *ReorgProcessor_Expecter) ProcessReorg(ctx interface{}, offendingBlockNumber interface{}) *ReorgProcessor_ProcessReorg_Call { + return &ReorgProcessor_ProcessReorg_Call{Call: _e.mock.On("ProcessReorg", ctx, offendingBlockNumber)} +} + +func (_c *ReorgProcessor_ProcessReorg_Call) Run(run func(ctx context.Context, offendingBlockNumber uint64)) *ReorgProcessor_ProcessReorg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64)) + }) + return _c +} + +func (_c *ReorgProcessor_ProcessReorg_Call) Return(_a0 error) *ReorgProcessor_ProcessReorg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ReorgProcessor_ProcessReorg_Call) RunAndReturn(run func(context.Context, uint64) error) *ReorgProcessor_ProcessReorg_Call { + _c.Call.Return(run) + return _c +} + +// NewReorgProcessor creates a new instance of ReorgProcessor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewReorgProcessor(t interface { + mock.TestingT + Cleanup(func()) +}) *ReorgProcessor { + mock := &ReorgProcessor{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/multidownloader/types/mocks/mock_storager.go b/multidownloader/types/mocks/mock_storager.go index 2319d32c0..4ed827dfa 100644 --- a/multidownloader/types/mocks/mock_storager.go +++ b/multidownloader/types/mocks/mock_storager.go @@ -95,6 +95,65 @@ func (_c *Storager_GetBlockHeaderByNumber_Call) RunAndReturn(run func(types.Quer return _c } +// GetBlockHeadersNotFinalized provides a mock function with given fields: tx, maxBlock +func (_m *Storager) GetBlockHeadersNotFinalized(tx types.Querier, maxBlock uint64) (aggkittypes.ListBlockHeaders, error) { + ret := _m.Called(tx, maxBlock) + + if len(ret) == 0 { + panic("no return value specified for GetBlockHeadersNotFinalized") + } + + var r0 aggkittypes.ListBlockHeaders + var r1 error + if rf, ok := ret.Get(0).(func(types.Querier, uint64) (aggkittypes.ListBlockHeaders, error)); ok { + return rf(tx, maxBlock) + } + if rf, ok := ret.Get(0).(func(types.Querier, uint64) aggkittypes.ListBlockHeaders); ok { + r0 = rf(tx, maxBlock) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(aggkittypes.ListBlockHeaders) + } + } + + if rf, ok := ret.Get(1).(func(types.Querier, uint64) error); ok { + r1 = rf(tx, maxBlock) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Storager_GetBlockHeadersNotFinalized_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBlockHeadersNotFinalized' +type Storager_GetBlockHeadersNotFinalized_Call struct { + *mock.Call +} + +// GetBlockHeadersNotFinalized is a helper method to define mock.On call +// - tx types.Querier +// - maxBlock uint64 +func (_e *Storager_Expecter) GetBlockHeadersNotFinalized(tx interface{}, maxBlock interface{}) *Storager_GetBlockHeadersNotFinalized_Call { + return &Storager_GetBlockHeadersNotFinalized_Call{Call: _e.mock.On("GetBlockHeadersNotFinalized", tx, maxBlock)} +} + +func (_c *Storager_GetBlockHeadersNotFinalized_Call) Run(run func(tx types.Querier, maxBlock uint64)) *Storager_GetBlockHeadersNotFinalized_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Querier), args[1].(uint64)) + }) + return _c +} + +func (_c *Storager_GetBlockHeadersNotFinalized_Call) Return(_a0 aggkittypes.ListBlockHeaders, _a1 error) *Storager_GetBlockHeadersNotFinalized_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Storager_GetBlockHeadersNotFinalized_Call) RunAndReturn(run func(types.Querier, uint64) (aggkittypes.ListBlockHeaders, error)) *Storager_GetBlockHeadersNotFinalized_Call { + _c.Call.Return(run) + return _c +} + // GetEthLogs provides a mock function with given fields: tx, query func (_m *Storager) GetEthLogs(tx types.Querier, query multidownloadertypes.LogQuery) ([]coretypes.Log, error) { ret := _m.Called(tx, query) @@ -154,6 +213,74 @@ func (_c *Storager_GetEthLogs_Call) RunAndReturn(run func(types.Querier, multido return _c } +// GetRangeBlockHeader provides a mock function with given fields: tx, isFinal +func (_m *Storager) GetRangeBlockHeader(tx types.Querier, isFinal multidownloadertypes.FinalizedType) (*aggkittypes.BlockHeader, *aggkittypes.BlockHeader, error) { + ret := _m.Called(tx, isFinal) + + if len(ret) == 0 { + panic("no return value specified for GetRangeBlockHeader") + } + + var r0 *aggkittypes.BlockHeader + var r1 *aggkittypes.BlockHeader + var r2 error + if rf, ok := 
ret.Get(0).(func(types.Querier, multidownloadertypes.FinalizedType) (*aggkittypes.BlockHeader, *aggkittypes.BlockHeader, error)); ok { + return rf(tx, isFinal) + } + if rf, ok := ret.Get(0).(func(types.Querier, multidownloadertypes.FinalizedType) *aggkittypes.BlockHeader); ok { + r0 = rf(tx, isFinal) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*aggkittypes.BlockHeader) + } + } + + if rf, ok := ret.Get(1).(func(types.Querier, multidownloadertypes.FinalizedType) *aggkittypes.BlockHeader); ok { + r1 = rf(tx, isFinal) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*aggkittypes.BlockHeader) + } + } + + if rf, ok := ret.Get(2).(func(types.Querier, multidownloadertypes.FinalizedType) error); ok { + r2 = rf(tx, isFinal) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// Storager_GetRangeBlockHeader_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetRangeBlockHeader' +type Storager_GetRangeBlockHeader_Call struct { + *mock.Call +} + +// GetRangeBlockHeader is a helper method to define mock.On call +// - tx types.Querier +// - isFinal multidownloadertypes.FinalizedType +func (_e *Storager_Expecter) GetRangeBlockHeader(tx interface{}, isFinal interface{}) *Storager_GetRangeBlockHeader_Call { + return &Storager_GetRangeBlockHeader_Call{Call: _e.mock.On("GetRangeBlockHeader", tx, isFinal)} +} + +func (_c *Storager_GetRangeBlockHeader_Call) Run(run func(tx types.Querier, isFinal multidownloadertypes.FinalizedType)) *Storager_GetRangeBlockHeader_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Querier), args[1].(multidownloadertypes.FinalizedType)) + }) + return _c +} + +func (_c *Storager_GetRangeBlockHeader_Call) Return(lowest *aggkittypes.BlockHeader, highest *aggkittypes.BlockHeader, err error) *Storager_GetRangeBlockHeader_Call { + _c.Call.Return(lowest, highest, err) + return _c +} + +func (_c *Storager_GetRangeBlockHeader_Call) RunAndReturn(run func(types.Querier, multidownloadertypes.FinalizedType) (*aggkittypes.BlockHeader, *aggkittypes.BlockHeader, error)) *Storager_GetRangeBlockHeader_Call { + _c.Call.Return(run) + return _c +} + // GetSyncedBlockRangePerContract provides a mock function with given fields: tx func (_m *Storager) GetSyncedBlockRangePerContract(tx types.Querier) (multidownloadertypes.SetSyncSegment, error) { ret := _m.Called(tx) @@ -268,6 +395,63 @@ func (_c *Storager_GetValue_Call) RunAndReturn(run func(types.Querier, string, s return _c } +// InsertReorgAndMoveReorgedBlocksAndLogs provides a mock function with given fields: tx, reorgData +func (_m *Storager) InsertReorgAndMoveReorgedBlocksAndLogs(tx types.Querier, reorgData multidownloadertypes.ReorgData) (uint64, error) { + ret := _m.Called(tx, reorgData) + + if len(ret) == 0 { + panic("no return value specified for InsertReorgAndMoveReorgedBlocksAndLogs") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(types.Querier, multidownloadertypes.ReorgData) (uint64, error)); ok { + return rf(tx, reorgData) + } + if rf, ok := ret.Get(0).(func(types.Querier, multidownloadertypes.ReorgData) uint64); ok { + r0 = rf(tx, reorgData) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(types.Querier, multidownloadertypes.ReorgData) error); ok { + r1 = rf(tx, reorgData) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 
'InsertReorgAndMoveReorgedBlocksAndLogs' +type Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call struct { + *mock.Call +} + +// InsertReorgAndMoveReorgedBlocksAndLogs is a helper method to define mock.On call +// - tx types.Querier +// - reorgData multidownloadertypes.ReorgData +func (_e *Storager_Expecter) InsertReorgAndMoveReorgedBlocksAndLogs(tx interface{}, reorgData interface{}) *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + return &Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call{Call: _e.mock.On("InsertReorgAndMoveReorgedBlocksAndLogs", tx, reorgData)} +} + +func (_c *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call) Run(run func(tx types.Querier, reorgData multidownloadertypes.ReorgData)) *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Querier), args[1].(multidownloadertypes.ReorgData)) + }) + return _c +} + +func (_c *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call) Return(_a0 uint64, _a1 error) *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call) RunAndReturn(run func(types.Querier, multidownloadertypes.ReorgData) (uint64, error)) *Storager_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + _c.Call.Return(run) + return _c +} + // InsertValue provides a mock function with given fields: tx, owner, key, value func (_m *Storager) InsertValue(tx types.Querier, owner string, key string, value string) error { ret := _m.Called(tx, owner, key, value) @@ -376,7 +560,7 @@ func (_c *Storager_NewTx_Call) RunAndReturn(run func(context.Context) (types.Txe } // SaveEthLogsWithHeaders provides a mock function with given fields: tx, blockHeaders, logs, isFinal -func (_m *Storager) SaveEthLogsWithHeaders(tx types.Querier, blockHeaders []*aggkittypes.BlockHeader, logs []coretypes.Log, isFinal bool) error { +func (_m *Storager) SaveEthLogsWithHeaders(tx types.Querier, blockHeaders aggkittypes.ListBlockHeaders, logs []coretypes.Log, isFinal bool) error { ret := _m.Called(tx, blockHeaders, logs, isFinal) if len(ret) == 0 { @@ -384,7 +568,7 @@ func (_m *Storager) SaveEthLogsWithHeaders(tx types.Querier, blockHeaders []*agg } var r0 error - if rf, ok := ret.Get(0).(func(types.Querier, []*aggkittypes.BlockHeader, []coretypes.Log, bool) error); ok { + if rf, ok := ret.Get(0).(func(types.Querier, aggkittypes.ListBlockHeaders, []coretypes.Log, bool) error); ok { r0 = rf(tx, blockHeaders, logs, isFinal) } else { r0 = ret.Error(0) @@ -400,16 +584,16 @@ type Storager_SaveEthLogsWithHeaders_Call struct { // SaveEthLogsWithHeaders is a helper method to define mock.On call // - tx types.Querier -// - blockHeaders []*aggkittypes.BlockHeader +// - blockHeaders aggkittypes.ListBlockHeaders // - logs []coretypes.Log // - isFinal bool func (_e *Storager_Expecter) SaveEthLogsWithHeaders(tx interface{}, blockHeaders interface{}, logs interface{}, isFinal interface{}) *Storager_SaveEthLogsWithHeaders_Call { return &Storager_SaveEthLogsWithHeaders_Call{Call: _e.mock.On("SaveEthLogsWithHeaders", tx, blockHeaders, logs, isFinal)} } -func (_c *Storager_SaveEthLogsWithHeaders_Call) Run(run func(tx types.Querier, blockHeaders []*aggkittypes.BlockHeader, logs []coretypes.Log, isFinal bool)) *Storager_SaveEthLogsWithHeaders_Call { +func (_c *Storager_SaveEthLogsWithHeaders_Call) Run(run func(tx types.Querier, blockHeaders aggkittypes.ListBlockHeaders, logs []coretypes.Log, isFinal bool)) *Storager_SaveEthLogsWithHeaders_Call { 
_c.Call.Run(func(args mock.Arguments) { - run(args[0].(types.Querier), args[1].([]*aggkittypes.BlockHeader), args[2].([]coretypes.Log), args[3].(bool)) + run(args[0].(types.Querier), args[1].(aggkittypes.ListBlockHeaders), args[2].([]coretypes.Log), args[3].(bool)) }) return _c } @@ -419,7 +603,54 @@ func (_c *Storager_SaveEthLogsWithHeaders_Call) Return(_a0 error) *Storager_Save return _c } -func (_c *Storager_SaveEthLogsWithHeaders_Call) RunAndReturn(run func(types.Querier, []*aggkittypes.BlockHeader, []coretypes.Log, bool) error) *Storager_SaveEthLogsWithHeaders_Call { +func (_c *Storager_SaveEthLogsWithHeaders_Call) RunAndReturn(run func(types.Querier, aggkittypes.ListBlockHeaders, []coretypes.Log, bool) error) *Storager_SaveEthLogsWithHeaders_Call { + _c.Call.Return(run) + return _c +} + +// UpdateBlockToFinalized provides a mock function with given fields: tx, blockNumbers +func (_m *Storager) UpdateBlockToFinalized(tx types.Querier, blockNumbers []uint64) error { + ret := _m.Called(tx, blockNumbers) + + if len(ret) == 0 { + panic("no return value specified for UpdateBlockToFinalized") + } + + var r0 error + if rf, ok := ret.Get(0).(func(types.Querier, []uint64) error); ok { + r0 = rf(tx, blockNumbers) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Storager_UpdateBlockToFinalized_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateBlockToFinalized' +type Storager_UpdateBlockToFinalized_Call struct { + *mock.Call +} + +// UpdateBlockToFinalized is a helper method to define mock.On call +// - tx types.Querier +// - blockNumbers []uint64 +func (_e *Storager_Expecter) UpdateBlockToFinalized(tx interface{}, blockNumbers interface{}) *Storager_UpdateBlockToFinalized_Call { + return &Storager_UpdateBlockToFinalized_Call{Call: _e.mock.On("UpdateBlockToFinalized", tx, blockNumbers)} +} + +func (_c *Storager_UpdateBlockToFinalized_Call) Run(run func(tx types.Querier, blockNumbers []uint64)) *Storager_UpdateBlockToFinalized_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Querier), args[1].([]uint64)) + }) + return _c +} + +func (_c *Storager_UpdateBlockToFinalized_Call) Return(_a0 error) *Storager_UpdateBlockToFinalized_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Storager_UpdateBlockToFinalized_Call) RunAndReturn(run func(types.Querier, []uint64) error) *Storager_UpdateBlockToFinalized_Call { _c.Call.Return(run) return _c } diff --git a/multidownloader/types/mocks/mock_storager_for_reorg.go b/multidownloader/types/mocks/mock_storager_for_reorg.go new file mode 100644 index 000000000..74bf29868 --- /dev/null +++ b/multidownloader/types/mocks/mock_storager_for_reorg.go @@ -0,0 +1,162 @@ +// Code generated by mockery. DO NOT EDIT. 
+ +package mocks + +import ( + aggkittypes "github.com/agglayer/aggkit/types" + mock "github.com/stretchr/testify/mock" + + multidownloadertypes "github.com/agglayer/aggkit/multidownloader/types" + + types "github.com/agglayer/aggkit/db/types" +) + +// StoragerForReorg is an autogenerated mock type for the StoragerForReorg type +type StoragerForReorg struct { + mock.Mock +} + +type StoragerForReorg_Expecter struct { + mock *mock.Mock +} + +func (_m *StoragerForReorg) EXPECT() *StoragerForReorg_Expecter { + return &StoragerForReorg_Expecter{mock: &_m.Mock} +} + +// GetBlockHeaderByNumber provides a mock function with given fields: tx, blockNumber +func (_m *StoragerForReorg) GetBlockHeaderByNumber(tx types.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, multidownloadertypes.FinalizedType, error) { + ret := _m.Called(tx, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for GetBlockHeaderByNumber") + } + + var r0 *aggkittypes.BlockHeader + var r1 multidownloadertypes.FinalizedType + var r2 error + if rf, ok := ret.Get(0).(func(types.Querier, uint64) (*aggkittypes.BlockHeader, multidownloadertypes.FinalizedType, error)); ok { + return rf(tx, blockNumber) + } + if rf, ok := ret.Get(0).(func(types.Querier, uint64) *aggkittypes.BlockHeader); ok { + r0 = rf(tx, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*aggkittypes.BlockHeader) + } + } + + if rf, ok := ret.Get(1).(func(types.Querier, uint64) multidownloadertypes.FinalizedType); ok { + r1 = rf(tx, blockNumber) + } else { + r1 = ret.Get(1).(multidownloadertypes.FinalizedType) + } + + if rf, ok := ret.Get(2).(func(types.Querier, uint64) error); ok { + r2 = rf(tx, blockNumber) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// StoragerForReorg_GetBlockHeaderByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBlockHeaderByNumber' +type StoragerForReorg_GetBlockHeaderByNumber_Call struct { + *mock.Call +} + +// GetBlockHeaderByNumber is a helper method to define mock.On call +// - tx types.Querier +// - blockNumber uint64 +func (_e *StoragerForReorg_Expecter) GetBlockHeaderByNumber(tx interface{}, blockNumber interface{}) *StoragerForReorg_GetBlockHeaderByNumber_Call { + return &StoragerForReorg_GetBlockHeaderByNumber_Call{Call: _e.mock.On("GetBlockHeaderByNumber", tx, blockNumber)} +} + +func (_c *StoragerForReorg_GetBlockHeaderByNumber_Call) Run(run func(tx types.Querier, blockNumber uint64)) *StoragerForReorg_GetBlockHeaderByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Querier), args[1].(uint64)) + }) + return _c +} + +func (_c *StoragerForReorg_GetBlockHeaderByNumber_Call) Return(_a0 *aggkittypes.BlockHeader, _a1 multidownloadertypes.FinalizedType, _a2 error) *StoragerForReorg_GetBlockHeaderByNumber_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *StoragerForReorg_GetBlockHeaderByNumber_Call) RunAndReturn(run func(types.Querier, uint64) (*aggkittypes.BlockHeader, multidownloadertypes.FinalizedType, error)) *StoragerForReorg_GetBlockHeaderByNumber_Call { + _c.Call.Return(run) + return _c +} + +// InsertReorgAndMoveReorgedBlocksAndLogs provides a mock function with given fields: tx, reorgData +func (_m *StoragerForReorg) InsertReorgAndMoveReorgedBlocksAndLogs(tx types.Querier, reorgData multidownloadertypes.ReorgData) (uint64, error) { + ret := _m.Called(tx, reorgData) + + if len(ret) == 0 { + panic("no return value specified for InsertReorgAndMoveReorgedBlocksAndLogs") + } + + var 
r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(types.Querier, multidownloadertypes.ReorgData) (uint64, error)); ok { + return rf(tx, reorgData) + } + if rf, ok := ret.Get(0).(func(types.Querier, multidownloadertypes.ReorgData) uint64); ok { + r0 = rf(tx, reorgData) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(types.Querier, multidownloadertypes.ReorgData) error); ok { + r1 = rf(tx, reorgData) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertReorgAndMoveReorgedBlocksAndLogs' +type StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call struct { + *mock.Call +} + +// InsertReorgAndMoveReorgedBlocksAndLogs is a helper method to define mock.On call +// - tx types.Querier +// - reorgData multidownloadertypes.ReorgData +func (_e *StoragerForReorg_Expecter) InsertReorgAndMoveReorgedBlocksAndLogs(tx interface{}, reorgData interface{}) *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + return &StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call{Call: _e.mock.On("InsertReorgAndMoveReorgedBlocksAndLogs", tx, reorgData)} +} + +func (_c *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call) Run(run func(tx types.Querier, reorgData multidownloadertypes.ReorgData)) *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(types.Querier), args[1].(multidownloadertypes.ReorgData)) + }) + return _c +} + +func (_c *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call) Return(_a0 uint64, _a1 error) *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call) RunAndReturn(run func(types.Querier, multidownloadertypes.ReorgData) (uint64, error)) *StoragerForReorg_InsertReorgAndMoveReorgedBlocksAndLogs_Call { + _c.Call.Return(run) + return _c +} + +// NewStoragerForReorg creates a new instance of StoragerForReorg. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewStoragerForReorg(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *StoragerForReorg {
+	mock := &StoragerForReorg{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/multidownloader/types/reorg_data.go b/multidownloader/types/reorg_data.go
new file mode 100644
index 000000000..dbfea44b7
--- /dev/null
+++ b/multidownloader/types/reorg_data.go
@@ -0,0 +1,31 @@
+package types
+
+import (
+	"fmt"
+
+	aggkitcommon "github.com/agglayer/aggkit/common"
+	aggkittypes "github.com/agglayer/aggkit/types"
+)
+
+// ReorgData describes a detected chain reorganization and the network state at detection time
+type ReorgData struct {
+	ChainID                   uint64
+	BlockRangeAffected        aggkitcommon.BlockRange
+	DetectedAtBlock           uint64
+	DetectedTimestamp         uint64
+	NetworkLatestBlock        uint64
+	NetworkFinalizedBlock     uint64
+	NetworkFinalizedBlockName aggkittypes.BlockNumberFinality
+}
+
+func (r *ReorgData) String() string {
+	return fmt.Sprintf("ReorgData{ChainID: %d, BlockRangeAffected: %s, DetectedAtBlock: %d, DetectedTimestamp: %d, "+
+		"NetworkLatestBlock: %d, NetworkFinalizedBlock: %d (%s)}",
+		r.ChainID,
+		r.BlockRangeAffected.String(),
+		r.DetectedAtBlock,
+		r.DetectedTimestamp,
+		r.NetworkLatestBlock,
+		r.NetworkFinalizedBlock,
+		r.NetworkFinalizedBlockName.String())
+}
diff --git a/multidownloader/types/reorg_error.go b/multidownloader/types/reorg_error.go
new file mode 100644
index 000000000..01ad209d1
--- /dev/null
+++ b/multidownloader/types/reorg_error.go
@@ -0,0 +1,48 @@
+package types
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/ethereum/go-ethereum/common"
+)
+
+// ReorgError is an error that is raised when a reorg is detected
+type ReorgError struct {
+	OffendingBlockNumber uint64
+	OldHash              common.Hash
+	NewHash              common.Hash
+	Message              string
+}
+
+// IsReorgError checks if an error is a ReorgError
+func IsReorgError(err error) bool {
+	c := CastReorgError(err)
+	return c != nil
+}
+
+// NewReorgError creates a new ReorgError
+func NewReorgError(offendingBlockNumber uint64,
+	oldHash, newHash common.Hash, msg string) *ReorgError {
+	return &ReorgError{
+		OffendingBlockNumber: offendingBlockNumber,
+		OldHash:              oldHash,
+		NewHash:              newHash,
+		Message:              msg,
+	}
+}
+
+// Error implements the error interface
+func (e *ReorgError) Error() string {
+	return fmt.Sprintf("reorgError: block number %d: old hash %s != new hash %s: %s",
+		e.OffendingBlockNumber, e.OldHash.String(), e.NewHash.String(), e.Message)
+}
+
+// CastReorgError returns the wrapped *ReorgError if err contains one, or nil otherwise
+func CastReorgError(err error) *ReorgError {
+	var reorgErr *ReorgError
+	if errors.As(err, &reorgErr) {
+		return reorgErr
+	}
+	return nil
+}
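Usage sketch (illustrative, not part of this changeset): because CastReorgError relies on errors.As, a ReorgError is still detected after being wrapped with fmt.Errorf and %w. The package alias, hash values, and surrounding main function below are assumptions for the example, not code from this diff:

	package main

	import (
		"fmt"

		mdtypes "github.com/agglayer/aggkit/multidownloader/types"
		"github.com/ethereum/go-ethereum/common"
	)

	func main() {
		// a downloader would raise this when a refetched header hash mismatches
		err := fmt.Errorf("sync step failed: %w", mdtypes.NewReorgError(
			105, common.HexToHash("0x01"), common.HexToHash("0x02"), "hash mismatch on refetch"))
		if mdtypes.IsReorgError(err) {
			// the offending block number survives the wrapping
			fmt.Println(mdtypes.CastReorgError(err).OffendingBlockNumber) // 105
		}
	}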
diff --git a/multidownloader/types/reorg_processor.go b/multidownloader/types/reorg_processor.go
new file mode 100644
index 000000000..537815a72
--- /dev/null
+++ b/multidownloader/types/reorg_processor.go
@@ -0,0 +1,11 @@
+package types
+
+import "context"
+
+// ReorgProcessor handles chain reorganizations detected by the downloader
+type ReorgProcessor interface {
+	// ProcessReorg processes a detected reorg starting from the offending block number.
+	// It identifies the range of blocks affected by the reorg and takes the necessary
+	// actions to handle the reorganization.
+	ProcessReorg(ctx context.Context, offendingBlockNumber uint64) error
+}
diff --git a/multidownloader/types/set_sync_segment.go b/multidownloader/types/set_sync_segment.go
index 5ce083580..6d0416827 100644
--- a/multidownloader/types/set_sync_segment.go
+++ b/multidownloader/types/set_sync_segment.go
@@ -48,15 +48,6 @@ func NewSetSyncSegmentFromLogQuery(logQuery *LogQuery) SetSyncSegment {
 	return set
 }
 
-// Segments returns all SyncSegments in the SetSyncSegment
-func (s *SetSyncSegment) Segments() []SyncSegment {
-	result := make([]SyncSegment, 0, len(s.segments))
-	for _, segment := range s.segments {
-		result = append(result, *segment)
-	}
-	return result
-}
-
 // Add adds a new SyncSegment to the SetSyncSegment, merging block ranges
 // if the contract address already exists
 func (s *SetSyncSegment) Add(segment SyncSegment) {
@@ -91,13 +82,13 @@ func (f *SetSyncSegment) SubtractSegments(segments *SetSyncSegment) error {
 		return nil
 	}
 	newSegments := f.Clone()
-	for _, segment := range segments.Segments() {
+	for _, segment := range segments.segments {
 		previousSegment, exists := newSegments.GetByContract(segment.ContractAddr)
 		if exists {
 			brs := previousSegment.BlockRange.Subtract(segment.BlockRange)
 			switch len(brs) {
 			case 0:
-				newSegments.Remove(&previousSegment)
+				newSegments.Empty(&previousSegment)
 			case 1:
 				newSegments.UpdateBlockRange(&previousSegment, brs[0])
 			default:
@@ -215,6 +206,38 @@
 		BlockRange: br,
 	}, nil
 }
+
+// GetHighestBlockNumber returns the highest ToBlock across all segments, or 0 if the set is empty
+func (f *SetSyncSegment) GetHighestBlockNumber() uint64 {
+	if f == nil || len(f.segments) == 0 {
+		return 0
+	}
+	highest := uint64(0)
+	for _, segment := range f.segments {
+		if segment.BlockRange.ToBlock > highest {
+			highest = segment.BlockRange.ToBlock
+		}
+	}
+	return highest
+}
+
+// GetTotalPendingBlockRange returns the smallest block range covering all segments, or nil if the set is empty
+func (f *SetSyncSegment) GetTotalPendingBlockRange() *aggkitcommon.BlockRange {
+	if f == nil || len(f.segments) == 0 {
+		return nil
+	}
+	var totalRange *aggkitcommon.BlockRange
+	for _, segment := range f.segments {
+		if totalRange == nil {
+			br := segment.BlockRange
+			totalRange = &br
+		} else {
+			extended := totalRange.Extend(segment.BlockRange)
+			totalRange = &extended
+		}
+	}
+	return totalRange
+}
 
 func (f *SetSyncSegment) GetLowestFromBlockSegment() *SyncSegment {
 	if f == nil || len(f.segments) == 0 {
@@ -239,8 +262,22 @@ func (f *SetSyncSegment) GetAddressesForBlockRange(blockRange aggkitcommon.Block
 	return addresses
 }
 
+// GetAddressesForBlock returns the contract addresses whose segments cover the given block
+func (f *SetSyncSegment) GetAddressesForBlock(blockNumber uint64) []common.Address {
+	blockRange := aggkitcommon.NewBlockRange(blockNumber, blockNumber)
+	return f.GetAddressesForBlockRange(blockRange)
+}
+
 func (f *SetSyncSegment) Finished() bool {
-	return f == nil || len(f.segments) == 0
+	if f == nil || len(f.segments) == 0 {
+		return true
+	}
+	for _, segment := range f.segments {
+		if !segment.IsEmpty() {
+			return false
+		}
+	}
+	return true
 }
 
 func (f *SetSyncSegment) Clone() *SetSyncSegment {
@@ -254,6 +291,16 @@
 	return &newSet
 }
 
+// Empty marks the set's segment equal to the given one as empty, keeping it in the set
+func (f *SetSyncSegment) Empty(segment *SyncSegment) {
+	for _, s := range f.segments {
+		if s.Equal(*segment) {
+			s.Empty()
+			return
+		}
+	}
+}
+
 func (f *SetSyncSegment) Remove(segmentToRemove *SyncSegment) {
 	if f == nil || segmentToRemove == nil {
 		return
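Worked example (illustrative, not from this changeset) of the new set-level helpers. It assumes aggkitcommon.BlockRange.Extend returns the smallest range covering both operands, and addrA/addrB are hypothetical contract addresses:

	// same-package (test-style) sketch of the hull semantics
	set := NewSetSyncSegment()
	set.Add(SyncSegment{ContractAddr: addrA, BlockRange: aggkitcommon.NewBlockRange(10, 20)})
	set.Add(SyncSegment{ContractAddr: addrB, BlockRange: aggkitcommon.NewBlockRange(40, 50)})

	fmt.Println(set.GetHighestBlockNumber()) // 50
	total := set.GetTotalPendingBlockRange()
	// one covering hull, including the unsynced gap 21..39
	fmt.Println(total.FromBlock, total.ToBlock) // 10 50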
diff --git a/multidownloader/types/set_sync_segment_test.go b/multidownloader/types/set_sync_segment_test.go
index 27a5fff6f..bf29775c4 100644
--- a/multidownloader/types/set_sync_segment_test.go
+++ b/multidownloader/types/set_sync_segment_test.go
@@ -30,19 +30,6 @@ func TestSetSyncSegment_String(t *testing.T) {
 	require.Contains(t, result, "SyncSegment[0]=")
 }
 
-func TestSetSyncSegment_Segments(t *testing.T) {
-	set := NewSetSyncSegment()
-	segment := SyncSegment{
-		ContractAddr: common.HexToAddress("0x123"),
-		BlockRange:   aggkitcommon.NewBlockRange(1, 10),
-	}
-	set.segments = []*SyncSegment{&segment}
-
-	result := set.Segments()
-	require.Len(t, result, 1)
-	require.Equal(t, segment, result[0])
-}
-
 func TestSetSyncSegment_Add(t *testing.T) {
 	t.Run("add new segment", func(t *testing.T) {
 		set := NewSetSyncSegment()
@@ -270,6 +257,16 @@ func TestSetSyncSegment_Finished(t *testing.T) {
 		set.segments = []*SyncSegment{segment}
 		require.False(t, set.Finished())
 	})
+	t.Run("empty segment", func(t *testing.T) {
+		set := NewSetSyncSegment()
+		segment := &SyncSegment{
+			ContractAddr: common.HexToAddress("0x123"),
+			BlockRange:   aggkitcommon.NewBlockRange(1, 10),
+		}
+		segment.Empty()
+		set.segments = []*SyncSegment{segment}
+		require.True(t, set.Finished())
+	})
 }
 
 func TestSetSyncSegment_Clone(t *testing.T) {
@@ -367,7 +364,7 @@
 		require.Equal(t, uint64(100), res.BlockRange.ToBlock)
 	})
 
-	t.Run("remove totally a segment", func(t *testing.T) {
+	t.Run("fully consume a segment, marking it empty", func(t *testing.T) {
 		set := NewSetSyncSegment()
 		addr := common.HexToAddress("0x123")
 		segment := SyncSegment{
@@ -383,8 +380,9 @@
 
 		err := set.SubtractLogQuery(logQuery)
 		require.NoError(t, err)
-		_, exists := set.GetByContract(addr)
-		require.False(t, exists)
+		segment, exists := set.GetByContract(addr)
+		require.True(t, segment.IsEmpty(), "segment should be empty")
+		require.True(t, exists, "empty segment should still exist")
 	})
 
 	t.Run("bad removed segment (middle segment)", func(t *testing.T) {
@@ -405,3 +403,37 @@
 		require.Error(t, err)
 	})
 }
+
+func TestSetSyncSegment_AfterFullySync(t *testing.T) {
+	set := NewSetSyncSegment()
+	addr := common.HexToAddress("0x123124543423")
+	segment := SyncSegment{
+		ContractAddr:  addr,
+		BlockRange:    aggkitcommon.NewBlockRange(1, 100),
+		TargetToBlock: aggkittypes.LatestBlock,
+	}
+	set.Add(segment)
+
+	logQuery := &LogQuery{
+		Addrs:      []common.Address{addr},
+		BlockRange: aggkitcommon.NewBlockRange(1, 100),
+	}
+
+	err := set.SubtractLogQuery(logQuery)
+	require.NoError(t, err)
+	// The segment is empty, but GetByContract still returns it
+	segment, exists := set.GetByContract(addr)
+	require.True(t, exists)
+	require.True(t, segment.IsEmpty())
+	require.True(t, set.Finished())
+	require.Equal(t, uint64(0), set.TotalBlocks())
+
+	mockBlockManager := mocks.NewBlockNotifierManager(t)
+	mockBlockManager.EXPECT().GetCurrentBlockNumber(mock.Anything, aggkittypes.LatestBlock).Return(uint64(150), nil).Once()
+	err = set.UpdateTargetBlockToNumber(t.Context(), mockBlockManager)
+	require.NoError(t, err)
+	require.Equal(t, uint64(50), set.TotalBlocks())
+	segment, exists = set.GetByContract(addr)
+	require.True(t, exists)
+	require.Equal(t, "From: 101, To: 150 (50)", segment.BlockRange.String())
+}
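For reference, a sketch (not part of this changeset) of how the generated StoragerForReorg mock could drive a unit test. Only the mock API and interface signatures come from this diff; the reorg-processor implementation exercised here is hypothetical:

	func TestProcessReorg_Sketch(t *testing.T) {
		storage := mocks.NewStoragerForReorg(t)
		// the stored header for block 105 that no longer matches the chain
		storage.EXPECT().
			GetBlockHeaderByNumber(mock.Anything, uint64(105)).
			Return(&aggkittypes.BlockHeader{Number: 105}, multidownloadertypes.NotFinalized, nil)
		// the processor is expected to persist the reorg and move affected rows
		storage.EXPECT().
			InsertReorgAndMoveReorgedBlocksAndLogs(mock.Anything, mock.Anything).
			Return(uint64(1), nil)
		// a concrete multidownloadertypes.ReorgProcessor backed by storage
		// would be constructed and ProcessReorg(ctx, 105) asserted here
	}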
index 6e2db6c9c..4f0d8d330 100644
--- a/multidownloader/types/storager.go
+++ b/multidownloader/types/storager.go
@@ -8,15 +8,35 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 )
 
+// FinalizedType indicates whether a block header is finalized
+type FinalizedType = bool
+
+const (
+	NotFinalized FinalizedType = false
+	Finalized    FinalizedType = true
+)
+
 type Storager interface {
+	StoragerForReorg
 	dbtypes.KeyValueStorager
 	// GetSyncedBlockRangePerContract It returns the synced block range stored in DB
 	GetSyncedBlockRangePerContract(tx dbtypes.Querier) (SetSyncSegment, error)
-	SaveEthLogsWithHeaders(tx dbtypes.Querier, blockHeaders []*aggkittypes.BlockHeader,
+	SaveEthLogsWithHeaders(tx dbtypes.Querier, blockHeaders aggkittypes.ListBlockHeaders,
 		logs []types.Log, isFinal bool) error
 	GetEthLogs(tx dbtypes.Querier, query LogQuery) ([]types.Log, error)
 	UpdateSyncedStatus(tx dbtypes.Querier, segments []SyncSegment) error
 	UpsertSyncerConfigs(tx dbtypes.Querier, configs []ContractConfig) error
 	GetBlockHeaderByNumber(tx dbtypes.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, bool, error)
 	NewTx(ctx context.Context) (dbtypes.Txer, error)
+
+	GetBlockHeadersNotFinalized(tx dbtypes.Querier, maxBlock uint64) (aggkittypes.ListBlockHeaders, error)
+	UpdateBlockToFinalized(tx dbtypes.Querier, blockNumbers []uint64) error
+	GetRangeBlockHeader(tx dbtypes.Querier, isFinal FinalizedType) (lowest *aggkittypes.BlockHeader,
+		highest *aggkittypes.BlockHeader, err error)
+}
+
+// StoragerForReorg is the subset of storage operations required to process a reorg
+type StoragerForReorg interface {
+	GetBlockHeaderByNumber(tx dbtypes.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, FinalizedType, error)
+	InsertReorgAndMoveReorgedBlocksAndLogs(tx dbtypes.Querier, reorgData ReorgData) (uint64, error)
 }
diff --git a/multidownloader/types/sync_segment.go b/multidownloader/types/sync_segment.go
index a1d4594d5..af99ff1a8 100644
--- a/multidownloader/types/sync_segment.go
+++ b/multidownloader/types/sync_segment.go
@@ -11,7 +11,8 @@ import (
 // SyncSegment represents a segment of blocks, it is used for synced segments but also
 // for representing segments to be synced
 type SyncSegment struct {
-	ContractAddr common.Address
+	ContractAddr common.Address
+	// A BlockRange with FromBlock > ToBlock marks the segment as empty
 	BlockRange    aggkitcommon.BlockRange
 	TargetToBlock aggkittypes.BlockNumberFinality
 }
@@ -60,6 +61,26 @@ func (s *SyncSegment) UpdateToBlock(newToBlock uint64) {
 	s.BlockRange.ToBlock = newToBlock
 }
 
+// Empty marks the segment as empty; the range is set so the next sync resumes at ToBlock+1
+func (s *SyncSegment) Empty() {
+	if s == nil {
+		return
+	}
+	// Set FromBlock greater than ToBlock to indicate an empty segment
+	s.BlockRange = aggkitcommon.NewBlockRange(
+		s.BlockRange.ToBlock+1,
+		0,
+	)
+}
+
+// IsEmpty reports whether the segment has no pending blocks
+func (s *SyncSegment) IsEmpty() bool {
+	if s == nil {
+		return true
+	}
+	return s.BlockRange.FromBlock > s.BlockRange.ToBlock
+}
+
 // Equal checks if two SyncSegments are equal
 func (s SyncSegment) Equal(other SyncSegment) bool {
 	return s.ContractAddr == other.ContractAddr &&
diff --git a/test/helpers/e2e.go b/test/helpers/e2e.go
index 3fa6c9c03..8d2f0f662 100644
--- a/test/helpers/e2e.go
+++ b/test/helpers/e2e.go
@@ -167,8 +167,9 @@ func L1Setup(t *testing.T, cfg *EnvironmentConfig) *L1Environment {
 			"testMD",
 			l1EthClient,
 			nil, // RPC client is not simulated
-			nil,
-			nil,
+			nil, // Storage will be created internally
+			nil, // blockNotifierManager will be created internally
+			nil, // reorgProcessor will be created internally
 		)
 		require.NoError(t, err)
 	} else {
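A small lifecycle sketch (illustrative, not from this changeset) of the empty-segment convention: Empty sets FromBlock to ToBlock+1 precisely so that a later re-extension resumes at the first unsynced block. The addr value is hypothetical:

	seg := SyncSegment{
		ContractAddr:  addr,
		BlockRange:    aggkitcommon.NewBlockRange(1, 100),
		TargetToBlock: aggkittypes.LatestBlock,
	}
	seg.Empty()                // BlockRange becomes (101, 0): FromBlock > ToBlock
	fmt.Println(seg.IsEmpty()) // true
	seg.UpdateToBlock(150)     // re-extended to (101, 150): 50 blocks pending again
	fmt.Println(seg.IsEmpty()) // false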
diff --git a/types/block_header.go b/types/block_header.go
index 184107dcc..3dc08a53f 100644
--- a/types/block_header.go
+++ b/types/block_header.go
@@ -34,6 +34,11 @@ func NewBlockHeaderFromEthHeader(ethHeader *types.Header) *BlockHeader {
 		ethHeader.Time, &ethHeader.ParentHash)
 }
 
+// Empty reports whether the header is nil or has no block number set
+func (gb *BlockHeader) Empty() bool {
+	return gb == nil || gb.Number == 0
+}
+
 func (gb *BlockHeader) String() string {
 	if gb == nil {
diff --git a/types/list_block_header.go b/types/list_block_header.go
new file mode 100644
index 000000000..081d3ec16
--- /dev/null
+++ b/types/list_block_header.go
@@ -0,0 +1,37 @@
+package types
+
+import "sort"
+
+// ListBlockHeaders is an ordered collection of block headers
+type ListBlockHeaders []*BlockHeader
+
+// NewListBlockHeadersEmpty creates an empty list with the given pre-allocated capacity
+func NewListBlockHeadersEmpty(preAllocatedSize int) ListBlockHeaders {
+	return ListBlockHeaders(make([]*BlockHeader, 0, preAllocatedSize))
+}
+
+// Len returns the number of headers in the list
+func (lbs ListBlockHeaders) Len() int {
+	return len(lbs)
+}
+
+// ToMap indexes the headers by block number
+func (lbs ListBlockHeaders) ToMap() MapBlockHeaders {
+	result := NewMapBlockHeadersEmpty(lbs.Len())
+	for _, header := range lbs {
+		result[header.Number] = header
+	}
+	return result
+}
+
+// BlockNumbers returns the block numbers of the headers, sorted in ascending order
+func (lbs ListBlockHeaders) BlockNumbers() []uint64 {
+	result := make([]uint64, 0, len(lbs))
+	for _, header := range lbs {
+		result = append(result, header.Number)
+	}
+	sort.Slice(result, func(i, j int) bool {
+		return result[i] < result[j]
+	})
+	return result
+}
diff --git a/types/map_block_header.go b/types/map_block_header.go
new file mode 100644
index 000000000..76d9529eb
--- /dev/null
+++ b/types/map_block_header.go
@@ -0,0 +1,9 @@
+package types
+
+// MapBlockHeaders indexes block headers by block number
+type MapBlockHeaders map[uint64]*BlockHeader
+
+// NewMapBlockHeadersEmpty creates an empty map with the given pre-allocated capacity
+func NewMapBlockHeadersEmpty(preAllocatedSize int) MapBlockHeaders {
+	return MapBlockHeaders(make(map[uint64]*BlockHeader, preAllocatedSize))
+}
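Finally, a usage sketch (illustrative, not from this changeset) for the new header collection types; the aggkittypes import alias is an assumption:

	list := aggkittypes.NewListBlockHeadersEmpty(3)
	list = append(list,
		&aggkittypes.BlockHeader{Number: 7},
		&aggkittypes.BlockHeader{Number: 5},
		&aggkittypes.BlockHeader{Number: 6},
	)
	fmt.Println(list.BlockNumbers()) // [5 6 7]: sorted regardless of insertion order
	byNumber := list.ToMap()
	fmt.Println(byNumber[6].Empty())  // false
	fmt.Println(byNumber[99].Empty()) // true: a missing entry yields a nil header, which reports Empty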