Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func runL1MultiDownloaderIfNeeded(
l1Client, // rpcClient
nil, // storage
nil, // blockNotifierManager
nil, // reorgProcessor
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create L1 MultiDownloader: %w", err)
Expand Down
11 changes: 11 additions & 0 deletions common/block_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,14 @@ func ChunkedRangeQuery[T any](

return all, nil
}

func (b BlockRange) ListBlockNumbers() []uint64 {
if b.IsEmpty() {
return []uint64{}
}
blockNumbers := make([]uint64, 0, b.CountBlocks())
for i := b.FromBlock; i <= b.ToBlock; i++ {
blockNumbers = append(blockNumbers, i)
}
return blockNumbers
}
9 changes: 9 additions & 0 deletions common/block_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,12 @@ func TestChunkedRangeQuery_EmptyRange(t *testing.T) {
require.NoError(t, err)
require.Equal(t, empty, result)
}

func TestBlockRange_ListBlockNumbers(t *testing.T) {
bn1 := NewBlockRange(1, 1)
require.Equal(t, []uint64{1}, bn1.ListBlockNumbers())
bn2 := NewBlockRange(3, 5)
require.Equal(t, []uint64{3, 4, 5}, bn2.ListBlockNumbers())
bn3 := NewBlockRange(0, 0)
require.Equal(t, []uint64{}, bn3.ListBlockNumbers())
}
2 changes: 1 addition & 1 deletion common/time_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type TimeTracker struct {

func (t *TimeTracker) String() string {
return "TimeTracker{times=" + strconv.Itoa(int(t.times)) +
"lastDuration=" + t.lastDuration.String() +
", lastDuration=" + t.lastDuration.String() +
", accumulated=" + t.accumulated.String() +
"}"
}
Expand Down
2 changes: 2 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ BlockFinalityForL1InfoTree = "{{AggSender.BlockFinalityForL1InfoTree}}"
MaxParallelBlockHeaderRetrieval = 30
BlockFinality = "FinalizedBlock"
WaitPeriodToCheckCatchUp = "10s"
PeriodToCheckReorgs = "5s"

[L2Multidownloader]
Enabled = false
Expand All @@ -343,4 +344,5 @@ BlockFinalityForL1InfoTree = "{{AggSender.BlockFinalityForL1InfoTree}}"
MaxParallelBlockHeaderRetrieval = 30
BlockFinality = "LatestBlock"
WaitPeriodToCheckCatchUp = "10s"
PeriodToCheckReorgs = "5s"
`
18 changes: 9 additions & 9 deletions etherman/batch_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func RetrieveBlockHeaders(ctx context.Context,
ethClient aggkittypes.BaseEthereumClienter,
rpcClient aggkittypes.RPCClienter,
blockNumbers []uint64,
maxConcurrency int) ([]*aggkittypes.BlockHeader, error) {
maxConcurrency int) (aggkittypes.ListBlockHeaders, error) {
if rpcClient != nil {
return RetrieveBlockHeadersBatch(ctx, log, rpcClient, blockNumbers, maxConcurrency)
}
Expand All @@ -69,11 +69,11 @@ func RetrieveBlockHeadersBatch(ctx context.Context,
log aggkitcommon.Logger,
rpcClient aggkittypes.RPCClienter,
blockNumbers []uint64,
maxConcurrency int) ([]*aggkittypes.BlockHeader, error) {
maxConcurrency int) (aggkittypes.ListBlockHeaders, error) {
return retrieveBlockHeadersInBatchParallel(
ctx,
log,
func(ctx context.Context, blocks []uint64) ([]*aggkittypes.BlockHeader, error) {
func(ctx context.Context, blocks []uint64) (aggkittypes.ListBlockHeaders, error) {
return retrieveBlockHeadersInBatch(ctx, log, rpcClient, blocks)
}, blockNumbers, batchRequestLimitHTTP, maxConcurrency)
}
Expand All @@ -88,8 +88,8 @@ func RetrieveBlockHeadersLegacy(ctx context.Context,
return retrieveBlockHeadersInBatchParallel(
ctx,
log,
func(ctx context.Context, blocks []uint64) ([]*aggkittypes.BlockHeader, error) {
result := make([]*aggkittypes.BlockHeader, len(blocks))
func(ctx context.Context, blocks []uint64) (aggkittypes.ListBlockHeaders, error) {
result := aggkittypes.NewListBlockHeadersEmpty(len(blocks))
for i, blockNumber := range blocks {
header, err := ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNumber)))
if err != nil {
Expand All @@ -107,9 +107,9 @@ func retrieveBlockHeadersInBatch(ctx context.Context,
log aggkitcommon.Logger,
rpcClient aggkittypes.RPCClienter,
blockNumbers []uint64,
) ([]*aggkittypes.BlockHeader, error) {
) (aggkittypes.ListBlockHeaders, error) {
if len(blockNumbers) == 0 {
return make([]*aggkittypes.BlockHeader, 0), nil
return aggkittypes.NewListBlockHeadersEmpty(0), nil
}
headers := make([]*blockRawEth, len(blockNumbers))
timeTracker := aggkitcommon.NewTimeTracker()
Expand Down Expand Up @@ -146,9 +146,9 @@ func retrieveBlockHeadersInBatch(ctx context.Context,
func retrieveBlockHeadersInBatchParallel(
ctx context.Context,
logger aggkitcommon.Logger,
funcRetrieval func(context.Context, []uint64) ([]*aggkittypes.BlockHeader, error),
funcRetrieval func(context.Context, []uint64) (aggkittypes.ListBlockHeaders, error),
blockNumbers []uint64,
chunckSize, maxConcurrency int) ([]*aggkittypes.BlockHeader, error) {
chunckSize, maxConcurrency int) (aggkittypes.ListBlockHeaders, error) {
var mu sync.Mutex
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrency)
Expand Down
2 changes: 1 addition & 1 deletion etherman/batch_requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestRetrieveBlockHeadersInBatchParallel(t *testing.T) {
result, err := retrieveBlockHeadersInBatchParallel(
ctx,
logger,
func(ctx context.Context, blocks []uint64) ([]*aggkittypes.BlockHeader, error) {
func(ctx context.Context, blocks []uint64) (aggkittypes.ListBlockHeaders, error) {
t.Logf("Retrieving blocks in batch: %v", blocks)
headers := make([]*aggkittypes.BlockHeader, len(blocks))
for i, bn := range blocks {
Expand Down
4 changes: 2 additions & 2 deletions etherman/block_notifier/block_notifier_polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (b *BlockNotifierPolling) step(ctx context.Context,
BlockFinalityType: b.config.BlockFinalityType,
}
if previousState.lastBlockSeen > currentBlock {
b.logger.Warnf("Block number decreased [finality:%s]: %d -> %d",
b.logger.Infof("Block number decreased [finality:%s]: %d -> %d",
b.config.BlockFinalityType.String(), previousState.lastBlockSeen, currentBlock)
// It start from scratch because something fails in calculation of block period
newState := previousState.initialBlock(currentBlock)
Expand All @@ -170,7 +170,7 @@ func (b *BlockNotifierPolling) step(ctx context.Context,

if currentBlock-previousState.lastBlockSeen != 1 {
if !b.config.BlockFinalityType.IsSafe() && !b.config.BlockFinalityType.IsFinalized() {
b.logger.Warnf("Missed block(s) [finality:%s]: %d -> %d",
b.logger.Infof("Missed block(s) [finality:%s]: %d -> %d",
b.config.BlockFinalityType.String(), previousState.lastBlockSeen, currentBlock)
}

Expand Down
5 changes: 3 additions & 2 deletions etherman/block_notifier/block_notifier_polling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func TestExploratoryBlockNotifierPolling(t *testing.T) {
t.Skip()
t.Skip("is an exploratory test that requires an external RPC")
urlRPCL1 := os.Getenv("L1URL")
fmt.Println("URL=", urlRPCL1)
cfg := &ethermanconfig.RPCClientConfig{
Expand Down Expand Up @@ -117,7 +117,8 @@ func TestBlockNotifierPollingStep(t *testing.T) {
},
mockLoggerFn: func() aggkitcommon.Logger {
mockLogger := commonmocks.NewLogger(t)
mockLogger.EXPECT().Warnf("Missed block(s) [finality:%s]: %d -> %d", aggkittypes.LatestBlock.String(), uint64(100), uint64(105)).Once()
mockLogger.EXPECT().Infof("Missed block(s) [finality:%s]: %d -> %d", aggkittypes.LatestBlock.String(), uint64(100), uint64(105)).Once()
mockLogger.EXPECT().Infof(mock.Anything, mock.Anything).Maybe()
return mockLogger
},
headerByNumberError: false,
Expand Down
11 changes: 7 additions & 4 deletions l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestE2E(t *testing.T) {
nil, // rpcClient
nil,
nil,
nil, // reorgProcessor will be created internally
)
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -175,8 +176,9 @@ func TestWithReorgs(t *testing.T) {
"testMD",
etherman.NewDefaultEthClient(client.Client(), nil, nil),
nil, // rpcClient
nil,
nil,
nil, // Storage will be created internally
nil, // blockNotifierManager will be created internally
nil, // reorgProcessor will be created internally
)
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -328,8 +330,9 @@ func TestStressAndReorgs(t *testing.T) {
"testMD",
etherman.NewDefaultEthClient(client.Client(), nil, nil),
nil, // rpcClient
nil,
nil,
nil, // Storage will be created internally
nil, // blockNotifierManager will be created internally
nil, // reorgProcessor will be created internally
)
require.NoError(t, err)
} else {
Expand Down
11 changes: 9 additions & 2 deletions multidownloader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ type Config struct {
BlockFinality aggkittypes.BlockNumberFinality
// WaitPeriodToCheckCatchUp is the duration to wait before checking again if logs are not yet available
WaitPeriodToCheckCatchUp types.Duration
// PeriodToCheckReorgs is the duration to wait before checking for reorgs
// If is 0 reorgs are checked only when a new block appears
PeriodToCheckReorgs types.Duration
}

const (
defaultBlockChunkSize = 10000
defaultMaxParallelBlockHeaderRetrieval = 30
defaultWaitPeriodToCheckCatchUp = time.Second * 10
defaultPeriodToCheckReorgs = time.Second * 5
)

func NewConfigDefault(name string, basePathDB string) Config {
Expand All @@ -54,6 +58,7 @@ func NewConfigDefault(name string, basePathDB string) Config {
MaxParallelBlockHeaderRetrieval: defaultMaxParallelBlockHeaderRetrieval,
BlockFinality: aggkittypes.FinalizedBlock,
WaitPeriodToCheckCatchUp: types.NewDuration(defaultWaitPeriodToCheckCatchUp),
PeriodToCheckReorgs: types.NewDuration(defaultPeriodToCheckReorgs),
}
}

Expand All @@ -75,10 +80,12 @@ func (cfg *Config) Validate() error {

func (cfg *Config) String() string {
return fmt.Sprintf("MultidownloaderConfig{Enabled:%t, BlockChunkSize:%d, "+
"MaxParallelBlockHeaderRetrieval:%d, BlockFinality:%s, WaitPeriodToCheckCatchUp:%s}",
"MaxParallelBlockHeaderRetrieval:%d, BlockFinality:%s, WaitPeriodToCheckCatchUp:%s, "+
"PeriodToCheckReorgs:%s}",
cfg.Enabled,
cfg.BlockChunkSize,
cfg.MaxParallelBlockHeaderRetrieval,
cfg.BlockFinality.String(),
cfg.WaitPeriodToCheckCatchUp.String())
cfg.WaitPeriodToCheckCatchUp.String(),
cfg.PeriodToCheckReorgs.String())
}
10 changes: 6 additions & 4 deletions multidownloader/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package multidownloader

import (
"testing"
"time"

"github.com/agglayer/aggkit/config/types"
aggkittypes "github.com/agglayer/aggkit/types"
Expand All @@ -13,10 +12,12 @@ func TestNewConfigDefault(t *testing.T) {
cfg := NewConfigDefault("l1", "/tmp/aggkit/")
require.Equal(t, false, cfg.Enabled)
require.Equal(t, "/tmp/aggkit/l1_multidownloader.sqlite", cfg.StoragePath)
require.Equal(t, uint32(10000), cfg.BlockChunkSize, "BlockChunkSize should be 10000")
require.Equal(t, 30, cfg.MaxParallelBlockHeaderRetrieval, "MaxParallelBlockHeaderRetrieval should be 30")
require.Equal(t, uint32(defaultBlockChunkSize), cfg.BlockChunkSize, "BlockChunkSize should be 10000")
require.Equal(t, defaultMaxParallelBlockHeaderRetrieval, cfg.MaxParallelBlockHeaderRetrieval, "MaxParallelBlockHeaderRetrieval should be 30")
require.Equal(t, aggkittypes.FinalizedBlock, cfg.BlockFinality, "BlockFinality should be FinalizedBlock")
require.Equal(t, types.NewDuration(time.Second*10), cfg.WaitPeriodToCheckCatchUp, "WaitPeriodToCheckCatchUp should be 10 seconds")
require.Equal(t, types.NewDuration(defaultWaitPeriodToCheckCatchUp), cfg.WaitPeriodToCheckCatchUp, "WaitPeriodToCheckCatchUp should be 10 seconds")
require.Equal(t, types.NewDuration(defaultPeriodToCheckReorgs), cfg.PeriodToCheckReorgs, "PeriodToCheckReorgs should be 5 seconds")

require.False(t, cfg.Enabled, "Enabled should be false by default")
}

Expand Down Expand Up @@ -102,5 +103,6 @@ func TestConfig_String(t *testing.T) {
require.Contains(t, str, "MaxParallelBlockHeaderRetrieval", "String() should contain MaxParallelBlockHeaderRetrieval")
require.Contains(t, str, "BlockFinality", "String() should contain BlockFinality")
require.Contains(t, str, "WaitPeriodToCheckCatchUp", "String() should contain WaitPeriodToCheckCatchUp")
require.Contains(t, str, "PeriodToCheckReorgs", "String() should contain PeriodToCheckReorgs")
require.Contains(t, str, "Enabled", "String() should contain Enabled")
}
Loading
Loading