From 66a7abf5295731e698186582c59984f652a7dc18 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Mon, 9 Mar 2026 14:16:54 +0400 Subject: [PATCH 1/8] Refactor solana head processing --- internal/config/defaults.go | 12 +- internal/config/upstream_config.go | 14 ++ internal/protocol/blocks.go | 35 +++- internal/protocol/blocks_test.go | 21 ++- internal/protocol/data_test.go | 5 +- .../upstreams/blocks/block_processor_test.go | 9 +- internal/upstreams/blocks/head.go | 69 ++++--- internal/upstreams/blocks/head_test.go | 47 ++++- .../chains_specific/aztec_chain_specific.go | 8 +- .../chains_specific/evm_chain_specific.go | 19 +- .../evm_chain_specific_test.go | 35 ++-- .../chains_specific/solana_chain_specific.go | 175 +++++++++--------- .../solana_chain_specific_test.go | 140 +++++++++----- internal/upstreams/upstream.go | 38 +++- internal/upstreams/upstream_test.go | 36 ++-- pkg/blockchain/hash_id.go | 58 ++++++ pkg/blockchain/hash_id_test.go | 25 +++ pkg/test_utils/test_helpers.go | 2 +- 18 files changed, 504 insertions(+), 244 deletions(-) create mode 100644 pkg/blockchain/hash_id.go create mode 100644 pkg/blockchain/hash_id_test.go diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 8da240a..2051fee 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -276,16 +276,8 @@ func (u *Upstream) setDefaults(defaults *ChainDefaults) { } } if u.HeadConnector == "" && len(u.Connectors) > 0 { - filteredConnectors := lo.Filter(u.Connectors, func(item *ApiConnectorConfig, index int) bool { - _, ok := connectorTypesRating[item.Type] - return ok - }) - - if len(filteredConnectors) > 0 { - defaultHeadConnectorType := lo.MinBy(filteredConnectors, func(a *ApiConnectorConfig, b *ApiConnectorConfig) bool { - return connectorTypesRating[a.Type] < connectorTypesRating[b.Type] - }).Type - u.HeadConnector = defaultHeadConnectorType + if headConnector := u.GetBestConnector(); headConnector != "" { + u.HeadConnector = headConnector } } if u.RateLimitAutoTune != nil { diff --git a/internal/config/upstream_config.go b/internal/config/upstream_config.go index 3c4e9d4..9d28276 100644 --- a/internal/config/upstream_config.go +++ b/internal/config/upstream_config.go @@ -39,6 +39,20 @@ type Upstream struct { RateLimitAutoTune *RateLimitAutoTuneConfig `yaml:"rate-limit-auto-tune"` } +func (u *Upstream) GetBestConnector() ApiConnectorType { + filteredConnectors := lo.Filter(u.Connectors, func(item *ApiConnectorConfig, index int) bool { + _, ok := connectorTypesRating[item.Type] + return ok + }) + + if len(filteredConnectors) > 0 { + return lo.MinBy(filteredConnectors, func(a *ApiConnectorConfig, b *ApiConnectorConfig) bool { + return connectorTypesRating[a.Type] < connectorTypesRating[b.Type] + }).Type + } + return "" +} + type UpstreamOptions struct { InternalTimeout time.Duration `yaml:"internal-timeout"` ValidationInterval time.Duration `yaml:"validation-interval"` diff --git a/internal/protocol/blocks.go b/internal/protocol/blocks.go index a824d4a..51b93a2 100644 --- a/internal/protocol/blocks.go +++ b/internal/protocol/blocks.go @@ -1,33 +1,48 @@ package protocol +import "github.com/drpcorg/nodecore/pkg/blockchain" + type Block struct { BlockData *BlockData } type BlockData struct { - Height uint64 - Slot uint64 - Hash string + Height uint64 + Slot uint64 + Hash blockchain.HashId + ParentHash blockchain.HashId } func (b *BlockData) IsEmpty() bool { - return b.Height == 0 && b.Slot == 0 && b.Hash == "" + return b.Height == 0 && b.Slot == 0 && len(b.Hash) == 0 && len(b.ParentHash) == 0 } func NewBlockDataWithHeight(height uint64) *BlockData { return &BlockData{Height: height} } -func NewBlockData(height, slot uint64, hash string) *BlockData { - return &BlockData{Height: height, Slot: slot, Hash: hash} +func NewBlockData(height, slot uint64, hash, parentHash blockchain.HashId) *BlockData { + return &BlockData{ + Height: height, + Slot: slot, + Hash: hash, + ParentHash: parentHash, + } } -func NewBlock(height, slot uint64, hash string) *Block { +func NewBlock(height, slot uint64, hash, parentHash blockchain.HashId) *Block { return &Block{ BlockData: &BlockData{ - Height: height, - Slot: slot, - Hash: hash, + Height: height, + Slot: slot, + Hash: hash, + ParentHash: parentHash, }, } } + +func NewBlockWithHeights(height, slot uint64) *Block { + return &Block{ + BlockData: &BlockData{Height: height, Slot: slot}, + } +} diff --git a/internal/protocol/blocks_test.go b/internal/protocol/blocks_test.go index e0eee16..92c4172 100644 --- a/internal/protocol/blocks_test.go +++ b/internal/protocol/blocks_test.go @@ -4,13 +4,24 @@ import ( "testing" "github.com/drpcorg/nodecore/internal/protocol" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/stretchr/testify/assert" ) func TestBlockCreation(t *testing.T) { - block := protocol.NewBlock(uint64(15), uint64(55), "hash") + block := protocol.NewBlock( + uint64(15), + uint64(55), + blockchain.NewHashIdFromString("0x2345"), + blockchain.NewHashIdFromString("0x1111"), + ) expectedBlock := protocol.Block{ - BlockData: &protocol.BlockData{Height: uint64(15), Slot: uint64(55), Hash: "hash"}, + BlockData: &protocol.BlockData{ + Height: uint64(15), + Slot: uint64(55), + Hash: blockchain.NewHashIdFromString("0x2345"), + ParentHash: blockchain.NewHashIdFromString("0x1111"), + }, } assert.Equal(t, &expectedBlock, block) @@ -47,7 +58,7 @@ func TestBlockData(t *testing.T) { }, { name: "data with hash", - blockData: protocol.BlockData{Hash: "hash"}, + blockData: protocol.BlockData{Hash: blockchain.NewHashIdFromString("0x2345")}, expected: false, }, { @@ -57,12 +68,12 @@ func TestBlockData(t *testing.T) { }, { name: "data with height and hash", - blockData: protocol.BlockData{Height: uint64(12), Hash: "hash"}, + blockData: protocol.BlockData{Height: uint64(12), Hash: blockchain.NewHashIdFromString("0x2345")}, expected: false, }, { name: "data with slot and hash", - blockData: protocol.BlockData{Slot: uint64(12), Hash: "hash"}, + blockData: protocol.BlockData{Slot: uint64(12), Hash: blockchain.NewHashIdFromString("0x2345")}, expected: false, }, } diff --git a/internal/protocol/data_test.go b/internal/protocol/data_test.go index b2f808c..c823d70 100644 --- a/internal/protocol/data_test.go +++ b/internal/protocol/data_test.go @@ -6,6 +6,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/drpcorg/nodecore/internal/protocol" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" ) @@ -114,8 +115,8 @@ func TestDefaultUpstreamState(t *testing.T) { func TestBlockInfo(t *testing.T) { blockInfo := protocol.NewBlockInfo() - blockData := &protocol.BlockData{Height: uint64(75), Hash: "hash"} - blockData2 := &protocol.BlockData{Height: uint64(86), Hash: "hash1", Slot: uint64(25)} + blockData := &protocol.BlockData{Height: uint64(75), Hash: blockchain.NewHashIdFromString("0x2345")} + blockData2 := &protocol.BlockData{Height: uint64(86), Hash: blockchain.NewHashIdFromString("0x2345"), Slot: uint64(25)} assert.NotNil(t, blockInfo) blockInfo.AddBlock(blockData, protocol.FinalizedBlock) diff --git a/internal/upstreams/blocks/block_processor_test.go b/internal/upstreams/blocks/block_processor_test.go index 7b1d60c..54da3b0 100644 --- a/internal/upstreams/blocks/block_processor_test.go +++ b/internal/upstreams/blocks/block_processor_test.go @@ -9,6 +9,7 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/blocks" specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -22,7 +23,8 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) @@ -37,8 +39,9 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) { expected := blocks.BlockEvent{ BlockData: &protocol.BlockData{ - Height: uint64(69195275), - Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), }, BlockType: protocol.FinalizedBlock, } diff --git a/internal/upstreams/blocks/head.go b/internal/upstreams/blocks/head.go index b972745..b93a86e 100644 --- a/internal/upstreams/blocks/head.go +++ b/internal/upstreams/blocks/head.go @@ -34,11 +34,12 @@ var _ utils.Lifecycle = (*HeadProcessor)(nil) func NewHeadProcessor( ctx context.Context, upConfig *config.Upstream, - apiConnector connectors.ApiConnector, + requestConnector, + headConnector connectors.ApiConnector, specific specific.ChainSpecific, ) *HeadProcessor { configuredChain := chains.GetChain(upConfig.ChainName) - head := createHead(ctx, upConfig.Id, upConfig.PollInterval, apiConnector, specific, upConfig.Options) + head := createHead(ctx, upConfig.Id, upConfig.PollInterval, requestConnector, headConnector, specific, upConfig.Options) headNoUpdatesTimeout := 1 * time.Minute switch head.(type) { @@ -115,23 +116,24 @@ func (h *HeadProcessor) Stop() { } func (h *HeadProcessor) UpdateHead(height, slot uint64) { - h.manualHeadChan <- protocol.NewBlock(height, slot, "") + h.manualHeadChan <- protocol.NewBlockWithHeights(height, slot) } func createHead( ctx context.Context, id string, pollInterval time.Duration, - apiConnector connectors.ApiConnector, + requestConnector, + headConnector connectors.ApiConnector, specific specific.ChainSpecific, options *config.UpstreamOptions, ) Head { - switch apiConnector.GetType() { + switch headConnector.GetType() { case protocol.JsonRpcConnector, protocol.RestConnector: log.Info().Msgf("starting an rpc head of upstream %s with poll interval %s", id, pollInterval) - return newRpcHead(ctx, id, apiConnector, specific, pollInterval, options) + return newRpcHead(ctx, id, headConnector, specific, pollInterval, options) case protocol.WsConnector: log.Info().Msgf("starting a subscription head of upstream %s", id) - return newWsHead(ctx, id, apiConnector, specific) + return newWsHead(ctx, id, requestConnector, headConnector, specific) default: return nil } @@ -228,7 +230,7 @@ func (r *RpcHead) poll() { ctx, cancel := context.WithTimeout(r.lifecycle.GetParentContext(), r.internalTimeout) defer cancel() - block, err := r.chainSpecific.GetLatestBlock(ctx, r.connector) + block, err := r.chainSpecific.GetLatestBlock(ctx, r.connector, r.upstreamId) if err != nil { log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", r.upstreamId) } else { @@ -239,12 +241,13 @@ func (r *RpcHead) poll() { } type SubscriptionHead struct { - lifecycle *utils.BaseLifecycle - block *utils.Atomic[protocol.Block] - chainSpecific specific.ChainSpecific - connector connectors.ApiConnector - upstreamId string - headsChan chan *protocol.Block + lifecycle *utils.BaseLifecycle + block *utils.Atomic[protocol.Block] + chainSpecific specific.ChainSpecific + requestConnector connectors.ApiConnector + headConnector connectors.ApiConnector + upstreamId string + headsChan chan *protocol.Block } func (w *SubscriptionHead) Running() bool { @@ -269,13 +272,16 @@ func (w *SubscriptionHead) GetCurrentBlock() *protocol.Block { func (w *SubscriptionHead) Start() { w.lifecycle.Start(func(ctx context.Context) error { + // get the latest block in order not to wait for the sub event + w.getLatestBlock() + subReq, err := w.chainSpecific.SubscribeHeadRequest() if err != nil { log.Warn().Err(err).Msgf("couldn't create a subscription request to upstream %s", w.upstreamId) return err } - subResponse, err := w.connector.Subscribe(ctx, subReq) + subResponse, err := w.headConnector.Subscribe(ctx, subReq) if err != nil { log.Warn().Err(err).Msgf("couldn't subscribe to upstream %s heads", w.upstreamId) return err @@ -291,7 +297,7 @@ func (w *SubscriptionHead) Start() { return nil } if message.Type == protocol.Ws { - block, err := w.chainSpecific.ParseSubscriptionBlock(message.Message) + block, err := w.chainSpecific.ParseSubscriptionBlock(message.Message, w.requestConnector, w.upstreamId) if err != nil { log.Warn().Err(err).Msgf("couldn't parse a message from heads subscription of upstream %s", w.upstreamId) return nil @@ -316,14 +322,31 @@ func (w *SubscriptionHead) OnNoHeadUpdates() { go w.Start() } -func newWsHead(ctx context.Context, upstreamId string, connector connectors.ApiConnector, chainSpecific specific.ChainSpecific) *SubscriptionHead { +func (w *SubscriptionHead) getLatestBlock() { + block, err := w.chainSpecific.GetLatestBlock(w.lifecycle.GetParentContext(), w.requestConnector, w.upstreamId) + if err != nil { + log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", w.upstreamId) + return + } + w.block.Store(*block) + w.headsChan <- block +} + +func newWsHead( + ctx context.Context, + upstreamId string, + requestConnector, + headConnector connectors.ApiConnector, + chainSpecific specific.ChainSpecific, +) *SubscriptionHead { head := SubscriptionHead{ - lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), - upstreamId: upstreamId, - chainSpecific: chainSpecific, - connector: connector, - block: utils.NewAtomic[protocol.Block](), - headsChan: make(chan *protocol.Block), + lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), + upstreamId: upstreamId, + chainSpecific: chainSpecific, + requestConnector: requestConnector, + headConnector: headConnector, + block: utils.NewAtomic[protocol.Block](), + headsChan: make(chan *protocol.Block), } return &head diff --git a/internal/upstreams/blocks/head_test.go b/internal/upstreams/blocks/head_test.go index 02eb621..3852aa5 100644 --- a/internal/upstreams/blocks/head_test.go +++ b/internal/upstreams/blocks/head_test.go @@ -9,6 +9,7 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/blocks" specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -23,7 +24,8 @@ func TestRpcHead(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) @@ -35,15 +37,16 @@ func TestRpcHead(t *testing.T) { PollInterval: 10 * time.Millisecond, Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } - headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, specific.EvmChainSpecific) + headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, connector, specific.EvmChainSpecific) go headProcessor.Start() sub := headProcessor.Subscribe("test") event, ok := <-sub.Events expected := &protocol.BlockData{ - Height: uint64(69195275), - Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), } connector.AssertExpectations(t) @@ -77,6 +80,18 @@ func TestWsHead(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + reqConnector := mocks.NewConnectorMock() + bodyLastBlock := []byte(`{ + "jsonrpc": "2.0", + "result": { + "number": "0x41FD60A", + "hash": "0x2eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d12", + "parentHash": "0x3eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d13" + } + }`) + responseLastBlock := protocol.NewHttpUpstreamResponse("1", bodyLastBlock, 200, protocol.JsonRpc) + reqConnector.On("SendRequest", mock.Anything, mock.Anything).Return(responseLastBlock) + connector := mocks.NewWsConnectorMock() body := []byte(`{ "jsonrpc": "2.0", @@ -84,7 +99,8 @@ func TestWsHead(t *testing.T) { "params": { "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" }, "subscription": "0x89d9f8cd1e113f4b65c1e22f3847d3672cf5761f" } @@ -99,18 +115,28 @@ func TestWsHead(t *testing.T) { Id: "id", PollInterval: 10 * time.Millisecond, } - headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, specific.EvmChainSpecific) + headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, reqConnector, connector, specific.EvmChainSpecific) go headProcessor.Start() sub := headProcessor.Subscribe("test") event, ok := <-sub.Events expected := &protocol.BlockData{ - Height: uint64(69195275), - Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + Height: uint64(69195274), + Hash: blockchain.NewHashIdFromString("0x2eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d12"), + ParentHash: blockchain.NewHashIdFromString("0x3eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d13"), + } + assert.True(t, ok) + assert.Equal(t, expected, event.HeadData) + assert.Equal(t, expected, headProcessor.GetCurrentBlock().BlockData) + + event, ok = <-sub.Events + expected = &protocol.BlockData{ + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), } - connector.AssertExpectations(t) assert.True(t, ok) assert.Equal(t, expected, event.HeadData) assert.Equal(t, expected, headProcessor.GetCurrentBlock().BlockData) @@ -135,4 +161,7 @@ func TestWsHead(t *testing.T) { _, ok = <-sub.Events assert.False(t, ok) + + connector.AssertExpectations(t) + reqConnector.AssertExpectations(t) } diff --git a/internal/upstreams/chains_specific/aztec_chain_specific.go b/internal/upstreams/chains_specific/aztec_chain_specific.go index c50a917..0d9c59d 100644 --- a/internal/upstreams/chains_specific/aztec_chain_specific.go +++ b/internal/upstreams/chains_specific/aztec_chain_specific.go @@ -32,7 +32,7 @@ func (a *AztecChainSpecificObject) SettingsValidators( return nil } -func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector, _ string) (*protocol.Block, error) { request, err := protocol.NewInternalUpstreamJsonRpcRequest("node_getBlock", []interface{}{"latest"}) if err != nil { return nil, err @@ -46,7 +46,7 @@ func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector return a.ParseBlock(response.ResponseResult()) } -func (a *AztecChainSpecificObject) GetFinalizedBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) GetFinalizedBlock(_ context.Context, _ connectors.ApiConnector) (*protocol.Block, error) { return nil, nil } @@ -62,10 +62,10 @@ func (a *AztecChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Bloc return nil, fmt.Errorf("couldn't parse the aztec block, got '%s'", string(blockBytes)) } - return protocol.NewBlock(height, 0, block.BlockHash), nil + return nil, nil } -func (a *AztecChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) ParseSubscriptionBlock(_ []byte, _ connectors.ApiConnector, _ string) (*protocol.Block, error) { return nil, fmt.Errorf("aztec does not support websocket subscriptions") } diff --git a/internal/upstreams/chains_specific/evm_chain_specific.go b/internal/upstreams/chains_specific/evm_chain_specific.go index 95d7160..5222b34 100644 --- a/internal/upstreams/chains_specific/evm_chain_specific.go +++ b/internal/upstreams/chains_specific/evm_chain_specific.go @@ -9,16 +9,19 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" "github.com/ethereum/go-ethereum/rpc" ) type ChainSpecific interface { - GetLatestBlock(context.Context, connectors.ApiConnector) (*protocol.Block, error) + GetLatestBlock(ctx context.Context, connector connectors.ApiConnector, upstreamId string) (*protocol.Block, error) GetFinalizedBlock(context.Context, connectors.ApiConnector) (*protocol.Block, error) + ParseBlock([]byte) (*protocol.Block, error) + ParseSubscriptionBlock(data []byte, connector connectors.ApiConnector, upstreamId string) (*protocol.Block, error) + SubscribeHeadRequest() (protocol.RequestHolder, error) - ParseSubscriptionBlock([]byte) (*protocol.Block, error) SettingsValidators(upstreamId string, connector connectors.ApiConnector, chain *chains.ConfiguredChain, options *config.UpstreamOptions) []validations.SettingsValidator } @@ -48,7 +51,7 @@ func (e *EvmChainSpecificObject) SettingsValidators( var _ ChainSpecific = (*EvmChainSpecificObject)(nil) -func (e *EvmChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (e *EvmChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector, _ string) (*protocol.Block, error) { return e.getBlockByTag(ctx, connector, rpc.LatestBlockNumber) } @@ -56,7 +59,7 @@ func (e *EvmChainSpecificObject) GetFinalizedBlock(ctx context.Context, connecto return e.getBlockByTag(ctx, connector, rpc.FinalizedBlockNumber) } -func (e *EvmChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { +func (e *EvmChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte, _ connectors.ApiConnector, _ string) (*protocol.Block, error) { return e.ParseBlock(blockBytes) } @@ -70,7 +73,12 @@ func (e *EvmChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Block, return nil, fmt.Errorf("couldn't parse the evm block, got '%s'", string(blockBytes)) } - return protocol.NewBlock(uint64(evmBlock.Height.Int64()), 0, evmBlock.Hash), nil + return protocol.NewBlock( + uint64(evmBlock.Height.Int64()), + 0, + blockchain.NewHashIdFromString(evmBlock.Hash), + blockchain.NewHashIdFromString(evmBlock.Parent), + ), nil } func (e *EvmChainSpecificObject) SubscribeHeadRequest() (protocol.RequestHolder, error) { @@ -97,5 +105,6 @@ func (e *EvmChainSpecificObject) getBlockByTag(ctx context.Context, connector co type EvmBlock struct { Hash string `json:"hash"` + Parent string `json:"parentHash"` Height *rpc.BlockNumber `json:"number"` } diff --git a/internal/upstreams/chains_specific/evm_chain_specific_test.go b/internal/upstreams/chains_specific/evm_chain_specific_test.go index cc5d4d5..2f529c4 100644 --- a/internal/upstreams/chains_specific/evm_chain_specific_test.go +++ b/internal/upstreams/chains_specific/evm_chain_specific_test.go @@ -9,6 +9,7 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/samber/lo" @@ -58,14 +59,20 @@ func TestEvmSubscribeHeadRequest(t *testing.T) { func TestEvmParseSubBLock(t *testing.T) { body := []byte(`{ "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" }`) - block, err := specific.EvmChainSpecific.ParseSubscriptionBlock(body) - + block, err := specific.EvmChainSpecific.ParseSubscriptionBlock(body, nil, "") assert.Nil(t, err) - assert.Equal(t, uint64(69195275), block.BlockData.Height) - assert.Equal(t, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", block.BlockData.Hash) + + expected := &protocol.BlockData{ + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + } + + assert.Equal(t, expected, block.BlockData) } func TestEvmGetLatestBlock(t *testing.T) { @@ -75,19 +82,25 @@ func TestEvmGetLatestBlock(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector) + block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector, "") + assert.Nil(t, err) connector.AssertExpectations(t) - assert.Nil(t, err) - assert.Equal(t, uint64(69195275), block.BlockData.Height) - assert.Equal(t, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", block.BlockData.Hash) + + expected := &protocol.BlockData{ + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + } + assert.Equal(t, expected, block.BlockData) } func TestEvmGetLatestBlockWithError(t *testing.T) { @@ -97,7 +110,7 @@ func TestEvmGetLatestBlockWithError(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector) + block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector, "") connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/chains_specific/solana_chain_specific.go b/internal/upstreams/chains_specific/solana_chain_specific.go index 3e82752..b7f5f26 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific.go +++ b/internal/upstreams/chains_specific/solana_chain_specific.go @@ -2,6 +2,7 @@ package specific import ( "context" + "encoding/binary" "fmt" "github.com/bytedance/sonic" @@ -9,17 +10,26 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" + "github.com/drpcorg/nodecore/pkg/utils" "github.com/samber/lo" ) +const checkInterval = 5 + var SolanaChainSpecific *SolanaChainSpecificObject func init() { - SolanaChainSpecific = &SolanaChainSpecificObject{} + SolanaChainSpecific = &SolanaChainSpecificObject{ + lastKnownHeights: utils.NewCMap[string, uint64](), + lastCheckedSlots: utils.NewCMap[string, uint64](), + } } type SolanaChainSpecificObject struct { + lastKnownHeights *utils.CMap[string, uint64] + lastCheckedSlots *utils.CMap[string, uint64] } func (s *SolanaChainSpecificObject) SettingsValidators( @@ -33,133 +43,114 @@ func (s *SolanaChainSpecificObject) SettingsValidators( var _ ChainSpecific = (*SolanaChainSpecificObject)(nil) -func (s *SolanaChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { - slot, err := getLatestSlot(ctx, connector) - if err != nil { - return nil, err - } - - maxBlock, err := getMaxBlock(ctx, connector, slot) - if err != nil { - return nil, err - } - - block, err := getBlock(ctx, connector, maxBlock) - if err != nil { - return nil, err - } - - parsedBlock, err := s.ParseBlock(block) - if err != nil { - return nil, err - } - parsedBlock.BlockData.Slot = maxBlock - return parsedBlock, nil +func (s *SolanaChainSpecificObject) GetLatestBlock( + ctx context.Context, + connector connectors.ApiConnector, + upstreamId string, +) (*protocol.Block, error) { + return s.getEpochInfo(ctx, connector, upstreamId) } -func (s *SolanaChainSpecificObject) GetFinalizedBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (s *SolanaChainSpecificObject) GetFinalizedBlock(_ context.Context, _ connectors.ApiConnector) (*protocol.Block, error) { // TODO: implement get block/slot with finalized commitment return nil, nil } func (s *SolanaChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Block, error) { - solanaBlock := SolanaBlock{} - err := sonic.Unmarshal(blockBytes, &solanaBlock) + epochInfo := SolanaEpochInfo{} + err := sonic.Unmarshal(blockBytes, &epochInfo) if err != nil { return nil, fmt.Errorf("couldn't parse the solana block, reason - %s", err.Error()) } - return protocol.NewBlock(solanaBlock.Height, 0, solanaBlock.Hash), nil + return createNewSolanaBlock(epochInfo.BlockHeight, epochInfo.AbsoluteSlot), nil } -func (s *SolanaChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { - solanaSubBlock := SolanaSubscriptionBlock{} - err := sonic.Unmarshal(blockBytes, &solanaSubBlock) +func (s *SolanaChainSpecificObject) ParseSubscriptionBlock( + blockBytes []byte, + connector connectors.ApiConnector, + upstreamId string, +) (*protocol.Block, error) { + slotEvent := SolanaSlotEvent{} + err := sonic.Unmarshal(blockBytes, &slotEvent) if err != nil { return nil, err } - - return protocol.NewBlock( - solanaSubBlock.Value.Block.Height, - solanaSubBlock.Context.Slot, - solanaSubBlock.Value.Block.Hash, - ), nil + lastSlot, _ := s.lastCheckedSlots.Load(upstreamId) + lastHeight, _ := s.lastKnownHeights.Load(upstreamId) + shouldCheck := slotEvent.Slot-lastSlot >= checkInterval + estimatedHeight := lo.Ternary(lastHeight != 0 && lastSlot != 0, lastHeight+(slotEvent.Slot-lastSlot), 0) + + if shouldCheck || estimatedHeight == 0 { + block, err := s.getEpochInfo(context.Background(), connector, upstreamId) + if err != nil { + var height uint64 + if estimatedHeight != 0 { + height = estimatedHeight + } else { + if lastHeight != 0 { + height = lastHeight + } else { + height = slotEvent.Slot + } + } + return createNewSolanaBlock(height, slotEvent.Slot), nil + } + return createNewSolanaBlock(block.BlockData.Height, block.BlockData.Slot), nil + } + return createNewSolanaBlock(estimatedHeight, slotEvent.Slot), nil } func (s *SolanaChainSpecificObject) SubscribeHeadRequest() (protocol.RequestHolder, error) { - params := map[string]interface{}{ - "showRewards": false, - "transactionDetails": "none", - } - return protocol.NewInternalSubUpstreamJsonRpcRequest("blockSubscribe", []interface{}{"all", params}) + return protocol.NewInternalSubUpstreamJsonRpcRequest("slotSubscribe", nil) } -func getLatestSlot(ctx context.Context, connector connectors.ApiConnector) (uint64, error) { - slotReq, err := protocol.NewInternalUpstreamJsonRpcRequest("getSlot", nil) +func (s *SolanaChainSpecificObject) getEpochInfo( + ctx context.Context, + connector connectors.ApiConnector, + upstreamId string, +) (*protocol.Block, error) { + request, err := protocol.NewInternalUpstreamJsonRpcRequest("getEpochInfo", nil) if err != nil { - return 0, err + return nil, err } - slotResponse := connector.SendRequest(ctx, slotReq) - if slotResponse.HasError() { - return 0, slotResponse.GetError() + response := connector.SendRequest(ctx, request) + if response.HasError() { + return nil, response.GetError() } - - slot := protocol.ResultAsNumber(slotResponse.ResponseResult()) - - return slot, nil -} - -func getMaxBlock(ctx context.Context, connector connectors.ApiConnector, slot uint64) (uint64, error) { - blocksReq, err := protocol.NewInternalUpstreamJsonRpcRequest("getBlocks", []interface{}{slot - 10, slot}) + block, err := s.ParseBlock(response.ResponseResult()) if err != nil { - return 0, err - } - blocksResponse := connector.SendRequest(ctx, blocksReq) - if blocksResponse.HasError() { - return 0, blocksResponse.GetError() + return nil, err } - var blocks []uint64 - err = sonic.Unmarshal(blocksResponse.ResponseResult(), &blocks) - if err != nil { - return 0, err - } + s.lastKnownHeights.Store(upstreamId, block.BlockData.Height) + s.lastCheckedSlots.Store(upstreamId, block.BlockData.Slot) - return lo.Max(blocks), nil + return block, nil } -func getBlock(ctx context.Context, connector connectors.ApiConnector, block uint64) ([]byte, error) { - params := map[string]interface{}{ - "showRewards": false, - "transactionDetails": "none", - "maxSupportedTransactionVersion": 0, - } - blockReq, err := protocol.NewInternalUpstreamJsonRpcRequest("getBlock", []interface{}{block, params}) - if err != nil { - return nil, err - } - blockResponse := connector.SendRequest(ctx, blockReq) - if blockResponse.HasError() { - return nil, blockResponse.GetError() - } +func SyntheticHashes(slot uint64, parentSlot uint64) (blockchain.HashId, blockchain.HashId) { + b1 := make([]byte, 32) + binary.BigEndian.PutUint64(b1, slot) + syntheticHash := blockchain.NewHashIdFromBytes(b1) - return blockResponse.ResponseResult(), nil -} + b2 := make([]byte, 32) + binary.BigEndian.PutUint64(b2, parentSlot) + syntheticParentHash := blockchain.NewHashIdFromBytes(b2) -type SolanaBlock struct { - Height uint64 `json:"blockHeight"` - Hash string `json:"blockhash"` + return syntheticHash, syntheticParentHash } -type SolanaSubscriptionBlock struct { - Context Context `json:"context"` - Value Value `json:"value"` +func createNewSolanaBlock(height uint64, slot uint64) *protocol.Block { + hash, parentHash := SyntheticHashes(slot, slot-1) + return protocol.NewBlock(height, slot, hash, parentHash) } -type Context struct { - Slot uint64 `json:"slot"` +type SolanaEpochInfo struct { + AbsoluteSlot uint64 `json:"absoluteSlot"` + BlockHeight uint64 `json:"blockHeight"` } -type Value struct { - Block SolanaBlock `json:"block"` +type SolanaSlotEvent struct { + Slot uint64 `json:"slot"` } diff --git a/internal/upstreams/chains_specific/solana_chain_specific_test.go b/internal/upstreams/chains_specific/solana_chain_specific_test.go index e4181ca..d2d3d6f 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific_test.go +++ b/internal/upstreams/chains_specific/solana_chain_specific_test.go @@ -21,73 +21,115 @@ func TestSolanaSubscribeHeadRequest(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "1", req.Id()) - assert.Equal(t, "blockSubscribe", req.Method()) + assert.Equal(t, "slotSubscribe", req.Method()) assert.False(t, req.IsStream()) - require.JSONEq(t, `{"id":"1","jsonrpc":"2.0","method":"blockSubscribe","params":["all",{"showRewards":false,"transactionDetails":"none"}]}`, string(body)) + require.JSONEq(t, `{"id":"1","jsonrpc":"2.0","method":"slotSubscribe","params":null}`, string(body)) } -func TestSolanaParseSubBLock(t *testing.T) { +func TestSolanaParseSubBlockErrEpochInfo(t *testing.T) { + connector := mocks.NewConnectorMock() body := []byte(`{ - "context": { - "slot": 327557189 - }, - "value": { - "slot": 327557189, - "block": { - "previousBlockhash": "5SFHqjdrjZdydRF8Cey9Zgp4CCX9ELifUSvjJV7kacnn", - "blockhash": "2XB8V5eP7HaNeRd2u98YLYS7QqzX61MNskmzeyXy4oiG", - "parentSlot": 327557188, - "blockTime": 1742296365, - "blockHeight": 305813576 - }, - "err": null - } - }`) - - block, err := specific.SolanaChainSpecific.ParseSubscriptionBlock(body) + "jsonrpc": "2.0", + "error": { + "code": -32000, + "message": "Server error: EpochInfo" + }, + "id": 1 + }`) + slot := []byte(`{ + "slot": 405220706, + "parent": 405220705, + "root": 405220674 + }`) + epochResponse := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) + + connector.On("SendRequest", context.Background(), mock.Anything).Return(epochResponse) + block, err := specific.SolanaChainSpecific.ParseSubscriptionBlock(slot, connector, "up1") assert.Nil(t, err) - assert.Equal(t, uint64(305813576), block.BlockData.Height) - assert.Equal(t, uint64(327557189), block.BlockData.Slot) - assert.Equal(t, "2XB8V5eP7HaNeRd2u98YLYS7QqzX61MNskmzeyXy4oiG", block.BlockData.Hash) + + connector.AssertExpectations(t) + + hash, parentHash := specific.SyntheticHashes(405220706, 405220705) + blockData := protocol.NewBlockData(405220706, 405220706, hash, parentHash) + assert.Equal(t, blockData, block.BlockData) } -func TestSolanaGetLatestBlock(t *testing.T) { - ctx := context.Background() +func TestSolanaParseSubBLock(t *testing.T) { connector := mocks.NewConnectorMock() - slotBody := []byte(`{ - "id": 1, - "jsonrpc": "2.0", - "result": 327557752 + body := []byte(`{ + "slot": 405220706, + "parent": 405220705, + "root": 405220674 }`) - slotResponse := protocol.NewHttpUpstreamResponse("1", slotBody, 200, protocol.JsonRpc) - maxBlocksBody := []byte(`{ - "id": 1, - "jsonrpc": "2.0", - "result": [327557750, 327557751, 327557752] + body1 := []byte(`{ + "slot": 405219989, + "parent": 405220705, + "root": 405220674 }`) - maxBlocksResponse := protocol.NewHttpUpstreamResponse("1", maxBlocksBody, 200, protocol.JsonRpc) - blockBody := []byte(`{ - "id": 1, + epochBody := []byte(`{ "jsonrpc": "2.0", "result": { - "blockHeight": 305814139, - "blockhash": "7QbMXETjcbRHTLxqAEH62nGE2o8mNh7JsspkzughEoGv" - } + "absoluteSlot": 405219988, + "blockHeight": 383325939, + "epoch": 938, + "slotIndex": 3988, + "slotsInEpoch": 432000, + "transactionCount": 494578437235 + }, + "id": 1 }`) - blockResponse := protocol.NewHttpUpstreamResponse("1", blockBody, 200, protocol.JsonRpc) + epochResponse := protocol.NewHttpUpstreamResponse("1", epochBody, 200, protocol.JsonRpc) - connector.On("SendRequest", ctx, mock.Anything).Return(slotResponse).Once() - connector.On("SendRequest", ctx, mock.Anything).Return(maxBlocksResponse).Once() - connector.On("SendRequest", ctx, mock.Anything).Return(blockResponse).Once() + connector.On("SendRequest", context.Background(), mock.Anything).Return(epochResponse) - block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector) + block, err := specific.SolanaChainSpecific.ParseSubscriptionBlock(body, connector, "up1") + assert.Nil(t, err) connector.AssertExpectations(t) + + hash, parentHash := specific.SyntheticHashes(405219988, 405219987) + blockData := protocol.NewBlockData(383325939, 405219988, hash, parentHash) + assert.Equal(t, blockData, block.BlockData) + + block, err = specific.SolanaChainSpecific.ParseSubscriptionBlock(body1, connector, "up1") assert.Nil(t, err) - assert.Equal(t, uint64(305814139), block.BlockData.Height) - assert.Equal(t, uint64(327557752), block.BlockData.Slot) - assert.Equal(t, "7QbMXETjcbRHTLxqAEH62nGE2o8mNh7JsspkzughEoGv", block.BlockData.Hash) + + hash, parentHash = specific.SyntheticHashes(405219989, 405219988) + blockData = protocol.NewBlockData(383325940, 405219989, hash, parentHash) + assert.Equal(t, blockData, block.BlockData) + + connector.AssertNumberOfCalls(t, "SendRequest", 1) +} + +func TestSolanaGetLatestBlock(t *testing.T) { + ctx := context.Background() + connector := mocks.NewConnectorMock() + epochBody := []byte(`{ + "jsonrpc": "2.0", + "result": { + "absoluteSlot": 405219988, + "blockHeight": 383325939, + "epoch": 938, + "slotIndex": 3988, + "slotsInEpoch": 432000, + "transactionCount": 494578437235 + }, + "id": 1 + }`) + epochResponse := protocol.NewHttpUpstreamResponse("1", epochBody, 200, protocol.JsonRpc) + + connector.On("SendRequest", ctx, mock.Anything).Return(epochResponse) + + block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector, "") + assert.Nil(t, err) + + connector.AssertExpectations(t) + + hash, parentHash := specific.SyntheticHashes(405219988, 405219987) + blockData := protocol.NewBlockData(383325939, 405219988, hash, parentHash) + + assert.Equal(t, blockData, block.BlockData) } func TestSolanaGetLatestBlockWithError(t *testing.T) { @@ -97,7 +139,7 @@ func TestSolanaGetLatestBlockWithError(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector) + block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector, "") connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/upstream.go b/internal/upstreams/upstream.go index 15b46c5..24e3bf4 100644 --- a/internal/upstreams/upstream.go +++ b/internal/upstreams/upstream.go @@ -98,7 +98,7 @@ func NewBaseUpstream( ctx, cancel := context.WithCancel(ctx) configuredChain := chains.GetChain(conf.ChainName) - headConnector, apiConnectors, caps, err := createUpstreamConnectors(ctx, conf, configuredChain, tracker, statsService, executor, torProxyUrl) + upstreamConnectorsInfo, caps, err := createUpstreamConnectors(ctx, conf, configuredChain, tracker, statsService, executor, torProxyUrl) if err != nil { cancel() return nil, err @@ -138,7 +138,7 @@ func NewBaseUpstream( id: conf.Id, chain: configuredChain.Chain, vendorType: getUpstreamVendor(conf.Connectors), - apiConnectors: apiConnectors, + apiConnectors: upstreamConnectorsInfo.allConnectors, upstreamCtx: newUpstreamCtx(cancel, mainLifecycle, processorsLifecycle), upstreamState: upState, subManager: utils.NewSubscriptionManager[protocol.UpstreamEvent](fmt.Sprintf("%s_upstream", conf.Id)), @@ -147,9 +147,9 @@ func NewBaseUpstream( upstreamIndexHex: upstreamIndexHex, upConfig: conf, - headProcessor: blocks.NewHeadProcessor(ctx, conf, headConnector, chainSpecific), - blockProcessor: createBlockProcessor(ctx, conf, headConnector, chainSpecific, configuredChain.Type), - settingsValidationProcessor: createSettingValidationProcessor(conf.Id, headConnector, configuredChain, chainSpecific, conf.Options), + headProcessor: blocks.NewHeadProcessor(ctx, conf, upstreamConnectorsInfo.internalRequestConnector, upstreamConnectorsInfo.headConnector, chainSpecific), + blockProcessor: createBlockProcessor(ctx, conf, upstreamConnectorsInfo.internalRequestConnector, chainSpecific, configuredChain.Type), + settingsValidationProcessor: createSettingValidationProcessor(conf.Id, upstreamConnectorsInfo.internalRequestConnector, configuredChain, chainSpecific, conf.Options), }, nil } @@ -445,15 +445,16 @@ func createUpstreamConnectors( statsService stats.StatsService, executor failsafe.Executor[protocol.ResponseHolder], torProxyUrl string, -) (connectors.ApiConnector, []connectors.ApiConnector, mapset.Set[protocol.Cap], error) { +) (*connectorsInfo, mapset.Set[protocol.Cap], error) { caps := mapset.NewThreadUnsafeSet[protocol.Cap]() apiConnectors := make([]connectors.ApiConnector, 0) var headConnector connectors.ApiConnector + var internalRequestConnector connectors.ApiConnector for _, connectorConfig := range conf.Connectors { apiConnector, err := createConnector(ctx, conf.Id, configuredChain, connectorConfig, torProxyUrl) if err != nil { - return nil, nil, nil, fmt.Errorf("cound't create api connector of %s: %v", conf.Id, err) + return nil, nil, fmt.Errorf("couldn't create api connector of %s: %v", conf.Id, err) } hooks := []protocol.ResponseReceivedHook{ dimensions.NewDimensionHook(tracker), @@ -463,11 +464,32 @@ func createUpstreamConnectors( if connectorConfig.Type == conf.HeadConnector { headConnector = apiConnector } + if connectorConfig.Type == conf.GetBestConnector() { + internalRequestConnector = apiConnector + } if apiConnector.GetType() == protocol.WsConnector { caps.Add(protocol.WsCap) } apiConnectors = append(apiConnectors, apiConnector) } - return headConnector, apiConnectors, caps, nil + return newConnectorInfo(headConnector, internalRequestConnector, apiConnectors), caps, nil +} + +type connectorsInfo struct { + headConnector connectors.ApiConnector + internalRequestConnector connectors.ApiConnector + allConnectors []connectors.ApiConnector +} + +func newConnectorInfo( + headConnector, + internalRequestConnector connectors.ApiConnector, + allConnectors []connectors.ApiConnector, +) *connectorsInfo { + return &connectorsInfo{ + headConnector: headConnector, + internalRequestConnector: internalRequestConnector, + allConnectors: allConnectors, + } } diff --git a/internal/upstreams/upstream_test.go b/internal/upstreams/upstream_test.go index fa1de0c..b93d1b9 100644 --- a/internal/upstreams/upstream_test.go +++ b/internal/upstreams/upstream_test.go @@ -14,6 +14,7 @@ import ( specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/internal/upstreams/methods" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" specs "github.com/drpcorg/nodecore/pkg/methods" "github.com/drpcorg/nodecore/pkg/test_utils" @@ -33,7 +34,8 @@ func TestUpstreamHeadEvent(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) requestLatest, _ := protocol.NewInternalUpstreamJsonRpcRequest(specs.EthGetBlockByNumber, []any{"latest", false}) @@ -53,14 +55,11 @@ func TestUpstreamHeadEvent(t *testing.T) { sub := upstream.Subscribe("name") - checkFunc := func(height uint64, hash string) { + checkFunc := func(blockData *protocol.BlockData) { event, ok := <-sub.Events state := protocol.UpstreamState{ - Status: protocol.Available, - HeadData: &protocol.BlockData{ - Height: height, - Hash: hash, - }, + Status: protocol.Available, + HeadData: blockData, BlockInfo: protocol.NewBlockInfo(), UpstreamMethods: mocks.NewMethodsMock(), Caps: mapset.NewThreadUnsafeSet[protocol.Cap](), @@ -79,9 +78,15 @@ func TestUpstreamHeadEvent(t *testing.T) { assert.Equal(t, state, upstream.GetUpstreamState()) } - checkFunc(69195275, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18") + blockData := protocol.NewBlockData( + 69195275, + 0, + blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + ) + checkFunc(blockData) upstream.UpdateHead(79195275, 0) - checkFunc(79195275, "") + checkFunc(protocol.NewBlockData(79195275, 0, blockchain.EmptyHash, blockchain.EmptyHash)) connector.AssertExpectations(t) } @@ -95,7 +100,8 @@ func TestUpstreamBlockEvent(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x345", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) requestLatest, _ := protocol.NewInternalUpstreamJsonRpcRequest(specs.EthGetBlockByNumber, []any{"latest", false}) @@ -156,9 +162,15 @@ func TestUpstreamBlockEvent(t *testing.T) { assert.Equal(t, state.BlockInfo.GetBlocks(), upstream.GetUpstreamState().BlockInfo.GetBlocks()) } - checkFunc(protocol.NewBlockData(837, 0, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18")) + blockData := protocol.NewBlockData( + 837, + 0, + blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + ) + checkFunc(blockData) upstream.UpdateBlock(protocol.NewBlockDataWithHeight(1000), protocol.FinalizedBlock) - checkFunc(protocol.NewBlockData(1000, 0, "")) + checkFunc(protocol.NewBlockData(1000, 0, blockchain.EmptyHash, blockchain.EmptyHash)) time.Sleep(15 * time.Millisecond) diff --git a/pkg/blockchain/hash_id.go b/pkg/blockchain/hash_id.go new file mode 100644 index 0000000..e0a121b --- /dev/null +++ b/pkg/blockchain/hash_id.go @@ -0,0 +1,58 @@ +package blockchain + +import ( + "encoding/hex" + "strings" +) + +var hexDigits = []byte("0123456789abcdef") + +var EmptyHash HashId = nil + +type HashId []byte + +func NewHashIdFromBytes(value []byte) HashId { + if len(value) >= 2 && value[0] == '0' && (value[1] == 'x' || value[1] == 'X') { + value = value[2:] + } + return value +} + +func NewHashIdFromString(value string) HashId { + clean := value + if strings.HasPrefix(value, "0x") || strings.HasPrefix(value, "0X") { + clean = value[2:] + } + + if len(clean)%2 != 0 { + clean = "0" + clean + } + + bytes, err := hex.DecodeString(clean) + if err != nil { + return bytes + } + + return bytes +} + +func (h HashId) ToHex() string { + hexHash := make([]byte, len(h)*2) + + i := 0 + j := 0 + for i < len(h) { + b := h[i] + hexHash[j] = hexDigits[(b&0xF0)>>4] + j++ + hexHash[j] = hexDigits[b&0x0F] + j++ + i++ + } + + return string(hexHash) +} + +func (h HashId) ToHexWithPrefix() string { + return "0x" + h.ToHex() +} diff --git a/pkg/blockchain/hash_id_test.go b/pkg/blockchain/hash_id_test.go new file mode 100644 index 0000000..0fbbff8 --- /dev/null +++ b/pkg/blockchain/hash_id_test.go @@ -0,0 +1,25 @@ +package blockchain_test + +import ( + "encoding/binary" + "testing" + + "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/stretchr/testify/assert" +) + +func TestNewHashIdFromString(t *testing.T) { + hashStr := "0xbb6e6f14b1656de46de5bdd9f13196e5d556ad071a2ce802bb3e5e704047b40e" + hashId := blockchain.NewHashIdFromString(hashStr) + + assert.Equal(t, hashStr[2:], hashId.ToHex()) + assert.Equal(t, hashStr, hashId.ToHexWithPrefix()) +} + +func TestNewHashIdFromBytes(t *testing.T) { + b1 := make([]byte, 32) + binary.BigEndian.PutUint64(b1, 405223378) + hashId := blockchain.NewHashIdFromBytes(b1) + + assert.Equal(t, "00000000182737d2000000000000000000000000000000000000000000000000", hashId.ToHex()) +} diff --git a/pkg/test_utils/test_helpers.go b/pkg/test_utils/test_helpers.go index 98997d9..38522c9 100644 --- a/pkg/test_utils/test_helpers.go +++ b/pkg/test_utils/test_helpers.go @@ -169,7 +169,7 @@ func TestEvmUpstream( "id", chains.ETHEREUM, []connectors.ApiConnector{connector}, - blocks.NewHeadProcessor(ctx, upConfig, connector, specific.EvmChainSpecific), + blocks.NewHeadProcessor(ctx, upConfig, connector, connector, specific.EvmChainSpecific), blockProcessor, settingValidationProcessor, upState, From 432748eea8b8d1ec0d59577165084d6a208b28c7 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Mon, 9 Mar 2026 17:11:27 +0400 Subject: [PATCH 2/8] Fix aztec --- internal/upstreams/chains_specific/aztec_chain_specific.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/upstreams/chains_specific/aztec_chain_specific.go b/internal/upstreams/chains_specific/aztec_chain_specific.go index 0d9c59d..fcfd43a 100644 --- a/internal/upstreams/chains_specific/aztec_chain_specific.go +++ b/internal/upstreams/chains_specific/aztec_chain_specific.go @@ -9,6 +9,7 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" ) @@ -62,7 +63,7 @@ func (a *AztecChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Bloc return nil, fmt.Errorf("couldn't parse the aztec block, got '%s'", string(blockBytes)) } - return nil, nil + return protocol.NewBlock(height, 0, blockchain.NewHashIdFromString(block.BlockHash), blockchain.EmptyHash), nil } func (a *AztecChainSpecificObject) ParseSubscriptionBlock(_ []byte, _ connectors.ApiConnector, _ string) (*protocol.Block, error) { From 635136f009d1c3ab7ecd3d20398ce714763ec5f6 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Mon, 9 Mar 2026 17:30:05 +0400 Subject: [PATCH 3/8] Solana log --- internal/upstreams/chains_specific/solana_chain_specific.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/upstreams/chains_specific/solana_chain_specific.go b/internal/upstreams/chains_specific/solana_chain_specific.go index b7f5f26..807761f 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific.go +++ b/internal/upstreams/chains_specific/solana_chain_specific.go @@ -13,6 +13,7 @@ import ( "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" "github.com/drpcorg/nodecore/pkg/utils" + "github.com/rs/zerolog/log" "github.com/samber/lo" ) @@ -94,6 +95,7 @@ func (s *SolanaChainSpecificObject) ParseSubscriptionBlock( height = slotEvent.Slot } } + log.Err(err).Msgf("couldn't get the epoch info for upstream %s, using the estimated height %d, slot %d", upstreamId, height, slotEvent.Slot) return createNewSolanaBlock(height, slotEvent.Slot), nil } return createNewSolanaBlock(block.BlockData.Height, block.BlockData.Slot), nil From 3dc1bc2e788bb8fbff9a5b0fe292dde14a5b8a8e Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Mon, 9 Mar 2026 17:56:39 +0400 Subject: [PATCH 4/8] Review fixes --- internal/upstreams/chains_specific/solana_chain_specific.go | 2 +- pkg/blockchain/hash_id.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/upstreams/chains_specific/solana_chain_specific.go b/internal/upstreams/chains_specific/solana_chain_specific.go index 807761f..631f782 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific.go +++ b/internal/upstreams/chains_specific/solana_chain_specific.go @@ -79,7 +79,7 @@ func (s *SolanaChainSpecificObject) ParseSubscriptionBlock( } lastSlot, _ := s.lastCheckedSlots.Load(upstreamId) lastHeight, _ := s.lastKnownHeights.Load(upstreamId) - shouldCheck := slotEvent.Slot-lastSlot >= checkInterval + shouldCheck := slotEvent.Slot >= lastSlot && slotEvent.Slot-lastSlot >= checkInterval estimatedHeight := lo.Ternary(lastHeight != 0 && lastSlot != 0, lastHeight+(slotEvent.Slot-lastSlot), 0) if shouldCheck || estimatedHeight == 0 { diff --git a/pkg/blockchain/hash_id.go b/pkg/blockchain/hash_id.go index e0a121b..459f14f 100644 --- a/pkg/blockchain/hash_id.go +++ b/pkg/blockchain/hash_id.go @@ -12,6 +12,7 @@ var EmptyHash HashId = nil type HashId []byte func NewHashIdFromBytes(value []byte) HashId { + // just pass bytes as is if len(value) >= 2 && value[0] == '0' && (value[1] == 'x' || value[1] == 'X') { value = value[2:] } From bdab0862bacadf96c524d544bfdff45295c3e622 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Tue, 10 Mar 2026 13:28:35 +0400 Subject: [PATCH 5/8] Refactor chain-specific --- internal/upstreams/blocks/block_processor.go | 2 +- .../upstreams/blocks/block_processor_test.go | 6 +- internal/upstreams/blocks/head.go | 46 +++++------- internal/upstreams/blocks/head_test.go | 6 +- .../chains_specific/aztec_chain_specific.go | 33 ++++----- .../chains_specific/evm_chain_specific.go | 45 +++++++----- .../evm_chain_specific_test.go | 13 ++-- .../chains_specific/solana_chain_specific.go | 73 ++++++++----------- .../solana_chain_specific_test.go | 17 +++-- internal/upstreams/upstream.go | 32 ++++---- internal/upstreams/upstream_test.go | 3 +- pkg/test_utils/test_helpers.go | 6 +- 12 files changed, 136 insertions(+), 146 deletions(-) diff --git a/internal/upstreams/blocks/block_processor.go b/internal/upstreams/blocks/block_processor.go index b6d3e8a..006600c 100644 --- a/internal/upstreams/blocks/block_processor.go +++ b/internal/upstreams/blocks/block_processor.go @@ -116,7 +116,7 @@ func (b *EthLikeBlockProcessor) poll(blockType protocol.BlockType) { ctx, cancel := context.WithTimeout(b.lifecycle.GetParentContext(), b.internalTimeout) defer cancel() - block, err := b.chainSpecific.GetFinalizedBlock(ctx, b.connector) + block, err := b.chainSpecific.GetFinalizedBlock(ctx) if err != nil { var respErr *protocol.ResponseError if errors.As(err, &respErr) { diff --git a/internal/upstreams/blocks/block_processor_test.go b/internal/upstreams/blocks/block_processor_test.go index 54da3b0..bada4e8 100644 --- a/internal/upstreams/blocks/block_processor_test.go +++ b/internal/upstreams/blocks/block_processor_test.go @@ -8,8 +8,8 @@ import ( "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/blocks" - specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/drpcorg/nodecore/pkg/test_utils" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -31,7 +31,7 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) { connector.On("SendRequest", mock.Anything, mock.Anything).Return(response) - processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, specific.EvmChainSpecific) + processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector)) go processor.Start() sub := processor.Subscribe("sub") @@ -82,7 +82,7 @@ func TestEthLikeBlockProcessorDisableFinalizedBlock(t *testing.T) { connector.On("SendRequest", mock.Anything, mock.Anything).Return(response) - processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, specific.EvmChainSpecific) + processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector)) go processor.Start() sub := processor.Subscribe("sub") diff --git a/internal/upstreams/blocks/head.go b/internal/upstreams/blocks/head.go index b93a86e..faa0885 100644 --- a/internal/upstreams/blocks/head.go +++ b/internal/upstreams/blocks/head.go @@ -34,12 +34,11 @@ var _ utils.Lifecycle = (*HeadProcessor)(nil) func NewHeadProcessor( ctx context.Context, upConfig *config.Upstream, - requestConnector, headConnector connectors.ApiConnector, specific specific.ChainSpecific, ) *HeadProcessor { configuredChain := chains.GetChain(upConfig.ChainName) - head := createHead(ctx, upConfig.Id, upConfig.PollInterval, requestConnector, headConnector, specific, upConfig.Options) + head := createHead(ctx, upConfig.Id, upConfig.PollInterval, headConnector, specific, upConfig.Options) headNoUpdatesTimeout := 1 * time.Minute switch head.(type) { @@ -93,7 +92,7 @@ func (h *HeadProcessor) Start() { return nil case block, ok := <-h.head.HeadsChan(): if ok { - log.Debug().Msgf("got a new head of upstream %s - %d", h.upstreamId, block.BlockData.Height) + log.Info().Msgf("got a new head of upstream %s - %d", h.upstreamId, block.BlockData.Height) h.lastUpdate.Store(time.Now()) h.subManager.Publish(HeadEvent{HeadData: block.BlockData}) } @@ -122,7 +121,6 @@ func (h *HeadProcessor) UpdateHead(height, slot uint64) { func createHead( ctx context.Context, id string, pollInterval time.Duration, - requestConnector, headConnector connectors.ApiConnector, specific specific.ChainSpecific, options *config.UpstreamOptions, @@ -130,10 +128,10 @@ func createHead( switch headConnector.GetType() { case protocol.JsonRpcConnector, protocol.RestConnector: log.Info().Msgf("starting an rpc head of upstream %s with poll interval %s", id, pollInterval) - return newRpcHead(ctx, id, headConnector, specific, pollInterval, options) + return newRpcHead(ctx, id, specific, pollInterval, options) case protocol.WsConnector: log.Info().Msgf("starting a subscription head of upstream %s", id) - return newWsHead(ctx, id, requestConnector, headConnector, specific) + return newWsHead(ctx, id, headConnector, specific) default: return nil } @@ -153,7 +151,6 @@ type RpcHead struct { chainSpecific specific.ChainSpecific pollInterval time.Duration internalTimeout time.Duration - connector connectors.ApiConnector upstreamId string pollInProgress atomic.Bool headsChan chan *protocol.Block @@ -177,7 +174,6 @@ var _ Head = (*RpcHead)(nil) func newRpcHead( ctx context.Context, upstreamId string, - connector connectors.ApiConnector, chainSpecific specific.ChainSpecific, pollInterval time.Duration, options *config.UpstreamOptions, @@ -187,7 +183,6 @@ func newRpcHead( block: utils.NewAtomic[protocol.Block](), chainSpecific: chainSpecific, pollInterval: pollInterval, - connector: connector, upstreamId: upstreamId, pollInProgress: atomic.Bool{}, headsChan: make(chan *protocol.Block), @@ -230,7 +225,7 @@ func (r *RpcHead) poll() { ctx, cancel := context.WithTimeout(r.lifecycle.GetParentContext(), r.internalTimeout) defer cancel() - block, err := r.chainSpecific.GetLatestBlock(ctx, r.connector, r.upstreamId) + block, err := r.chainSpecific.GetLatestBlock(ctx) if err != nil { log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", r.upstreamId) } else { @@ -241,13 +236,12 @@ func (r *RpcHead) poll() { } type SubscriptionHead struct { - lifecycle *utils.BaseLifecycle - block *utils.Atomic[protocol.Block] - chainSpecific specific.ChainSpecific - requestConnector connectors.ApiConnector - headConnector connectors.ApiConnector - upstreamId string - headsChan chan *protocol.Block + lifecycle *utils.BaseLifecycle + block *utils.Atomic[protocol.Block] + chainSpecific specific.ChainSpecific + headConnector connectors.ApiConnector + upstreamId string + headsChan chan *protocol.Block } func (w *SubscriptionHead) Running() bool { @@ -297,7 +291,7 @@ func (w *SubscriptionHead) Start() { return nil } if message.Type == protocol.Ws { - block, err := w.chainSpecific.ParseSubscriptionBlock(message.Message, w.requestConnector, w.upstreamId) + block, err := w.chainSpecific.ParseSubscriptionBlock(message.Message) if err != nil { log.Warn().Err(err).Msgf("couldn't parse a message from heads subscription of upstream %s", w.upstreamId) return nil @@ -323,7 +317,7 @@ func (w *SubscriptionHead) OnNoHeadUpdates() { } func (w *SubscriptionHead) getLatestBlock() { - block, err := w.chainSpecific.GetLatestBlock(w.lifecycle.GetParentContext(), w.requestConnector, w.upstreamId) + block, err := w.chainSpecific.GetLatestBlock(w.lifecycle.GetParentContext()) if err != nil { log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", w.upstreamId) return @@ -335,18 +329,16 @@ func (w *SubscriptionHead) getLatestBlock() { func newWsHead( ctx context.Context, upstreamId string, - requestConnector, headConnector connectors.ApiConnector, chainSpecific specific.ChainSpecific, ) *SubscriptionHead { head := SubscriptionHead{ - lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), - upstreamId: upstreamId, - chainSpecific: chainSpecific, - requestConnector: requestConnector, - headConnector: headConnector, - block: utils.NewAtomic[protocol.Block](), - headsChan: make(chan *protocol.Block), + lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), + upstreamId: upstreamId, + chainSpecific: chainSpecific, + headConnector: headConnector, + block: utils.NewAtomic[protocol.Block](), + headsChan: make(chan *protocol.Block), } return &head diff --git a/internal/upstreams/blocks/head_test.go b/internal/upstreams/blocks/head_test.go index 3852aa5..4d2fd16 100644 --- a/internal/upstreams/blocks/head_test.go +++ b/internal/upstreams/blocks/head_test.go @@ -8,8 +8,8 @@ import ( "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/blocks" - specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/drpcorg/nodecore/pkg/test_utils" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -37,7 +37,7 @@ func TestRpcHead(t *testing.T) { PollInterval: 10 * time.Millisecond, Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } - headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, connector, specific.EvmChainSpecific) + headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, test_utils.NewEvmChainSpecific(connector)) go headProcessor.Start() sub := headProcessor.Subscribe("test") @@ -115,7 +115,7 @@ func TestWsHead(t *testing.T) { Id: "id", PollInterval: 10 * time.Millisecond, } - headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, reqConnector, connector, specific.EvmChainSpecific) + headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, test_utils.NewEvmChainSpecific(reqConnector)) go headProcessor.Start() sub := headProcessor.Subscribe("test") diff --git a/internal/upstreams/chains_specific/aztec_chain_specific.go b/internal/upstreams/chains_specific/aztec_chain_specific.go index fcfd43a..b78e495 100644 --- a/internal/upstreams/chains_specific/aztec_chain_specific.go +++ b/internal/upstreams/chains_specific/aztec_chain_specific.go @@ -5,41 +5,40 @@ import ( "fmt" "github.com/bytedance/sonic" - "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" "github.com/drpcorg/nodecore/pkg/blockchain" - "github.com/drpcorg/nodecore/pkg/chains" ) -var AztecChainSpecific *AztecChainSpecificObject - -func init() { - AztecChainSpecific = &AztecChainSpecificObject{} +type AztecChainSpecificObject struct { + upstreamId string + connector connectors.ApiConnector } -type AztecChainSpecificObject struct { +func NewAztecChainSpecificObject( + upstreamId string, + connector connectors.ApiConnector, +) *AztecChainSpecificObject { + return &AztecChainSpecificObject{ + upstreamId: upstreamId, + connector: connector, + } } var _ ChainSpecific = (*AztecChainSpecificObject)(nil) -func (a *AztecChainSpecificObject) SettingsValidators( - _ string, - _ connectors.ApiConnector, - _ *chains.ConfiguredChain, - _ *config.UpstreamOptions, -) []validations.SettingsValidator { +func (a *AztecChainSpecificObject) SettingsValidators() []validations.SettingsValidator { return nil } -func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector, _ string) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context) (*protocol.Block, error) { request, err := protocol.NewInternalUpstreamJsonRpcRequest("node_getBlock", []interface{}{"latest"}) if err != nil { return nil, err } - response := connector.SendRequest(ctx, request) + response := a.connector.SendRequest(ctx, request) if response.HasError() { return nil, response.GetError() } @@ -47,7 +46,7 @@ func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector return a.ParseBlock(response.ResponseResult()) } -func (a *AztecChainSpecificObject) GetFinalizedBlock(_ context.Context, _ connectors.ApiConnector) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) GetFinalizedBlock(_ context.Context) (*protocol.Block, error) { return nil, nil } @@ -66,7 +65,7 @@ func (a *AztecChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Bloc return protocol.NewBlock(height, 0, blockchain.NewHashIdFromString(block.BlockHash), blockchain.EmptyHash), nil } -func (a *AztecChainSpecificObject) ParseSubscriptionBlock(_ []byte, _ connectors.ApiConnector, _ string) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) ParseSubscriptionBlock(_ []byte) (*protocol.Block, error) { return nil, fmt.Errorf("aztec does not support websocket subscriptions") } diff --git a/internal/upstreams/chains_specific/evm_chain_specific.go b/internal/upstreams/chains_specific/evm_chain_specific.go index 5222b34..9ee941d 100644 --- a/internal/upstreams/chains_specific/evm_chain_specific.go +++ b/internal/upstreams/chains_specific/evm_chain_specific.go @@ -15,35 +15,42 @@ import ( ) type ChainSpecific interface { - GetLatestBlock(ctx context.Context, connector connectors.ApiConnector, upstreamId string) (*protocol.Block, error) - GetFinalizedBlock(context.Context, connectors.ApiConnector) (*protocol.Block, error) + GetLatestBlock(ctx context.Context) (*protocol.Block, error) + GetFinalizedBlock(context.Context) (*protocol.Block, error) ParseBlock([]byte) (*protocol.Block, error) - ParseSubscriptionBlock(data []byte, connector connectors.ApiConnector, upstreamId string) (*protocol.Block, error) + ParseSubscriptionBlock(data []byte) (*protocol.Block, error) SubscribeHeadRequest() (protocol.RequestHolder, error) - SettingsValidators(upstreamId string, connector connectors.ApiConnector, chain *chains.ConfiguredChain, options *config.UpstreamOptions) []validations.SettingsValidator -} - -var EvmChainSpecific *EvmChainSpecificObject - -func init() { - EvmChainSpecific = &EvmChainSpecificObject{} + SettingsValidators() []validations.SettingsValidator } type EvmChainSpecificObject struct { + upstreamId string + connector connectors.ApiConnector + chain *chains.ConfiguredChain + options *config.UpstreamOptions } -func (e *EvmChainSpecificObject) SettingsValidators( +func NewEvmChainSpecific( upstreamId string, connector connectors.ApiConnector, chain *chains.ConfiguredChain, options *config.UpstreamOptions, -) []validations.SettingsValidator { +) *EvmChainSpecificObject { + return &EvmChainSpecificObject{ + upstreamId: upstreamId, + connector: connector, + chain: chain, + options: options, + } +} + +func (e *EvmChainSpecificObject) SettingsValidators() []validations.SettingsValidator { settingsValidators := make([]validations.SettingsValidator, 0) - if !*options.DisableChainValidation { - settingsValidators = append(settingsValidators, validations.NewChainValidator(upstreamId, connector, chain, options)) + if !*e.options.DisableChainValidation { + settingsValidators = append(settingsValidators, validations.NewChainValidator(e.upstreamId, e.connector, e.chain, e.options)) } return settingsValidators @@ -51,15 +58,15 @@ func (e *EvmChainSpecificObject) SettingsValidators( var _ ChainSpecific = (*EvmChainSpecificObject)(nil) -func (e *EvmChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector, _ string) (*protocol.Block, error) { - return e.getBlockByTag(ctx, connector, rpc.LatestBlockNumber) +func (e *EvmChainSpecificObject) GetLatestBlock(ctx context.Context) (*protocol.Block, error) { + return e.getBlockByTag(ctx, e.connector, rpc.LatestBlockNumber) } -func (e *EvmChainSpecificObject) GetFinalizedBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { - return e.getBlockByTag(ctx, connector, rpc.FinalizedBlockNumber) +func (e *EvmChainSpecificObject) GetFinalizedBlock(ctx context.Context) (*protocol.Block, error) { + return e.getBlockByTag(ctx, e.connector, rpc.FinalizedBlockNumber) } -func (e *EvmChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte, _ connectors.ApiConnector, _ string) (*protocol.Block, error) { +func (e *EvmChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { return e.ParseBlock(blockBytes) } diff --git a/internal/upstreams/chains_specific/evm_chain_specific_test.go b/internal/upstreams/chains_specific/evm_chain_specific_test.go index 2f529c4..4a7dc2e 100644 --- a/internal/upstreams/chains_specific/evm_chain_specific_test.go +++ b/internal/upstreams/chains_specific/evm_chain_specific_test.go @@ -11,6 +11,7 @@ import ( "github.com/drpcorg/nodecore/internal/upstreams/validations" "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" + "github.com/drpcorg/nodecore/pkg/test_utils" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/samber/lo" "github.com/stretchr/testify/assert" @@ -24,7 +25,7 @@ func TestChainValidator(t *testing.T) { } connector := mocks.NewConnectorMock() - validators := specific.EvmChainSpecific.SettingsValidators("id", connector, chains.UnknownChain, options) + validators := specific.NewEvmChainSpecific("id", connector, chains.UnknownChain, options).SettingsValidators() _, ok := lo.Find(validators, func(item validations.SettingsValidator) bool { _, ok := item.(*validations.ChainValidator) @@ -33,7 +34,7 @@ func TestChainValidator(t *testing.T) { assert.True(t, ok) options.DisableChainValidation = lo.ToPtr(true) - validators = specific.EvmChainSpecific.SettingsValidators("id", connector, chains.UnknownChain, options) + validators = specific.NewEvmChainSpecific("id", connector, chains.UnknownChain, options).SettingsValidators() _, ok = lo.Find(validators, func(item validations.SettingsValidator) bool { _, ok := item.(*validations.ChainValidator) @@ -44,7 +45,7 @@ func TestChainValidator(t *testing.T) { } func TestEvmSubscribeHeadRequest(t *testing.T) { - req, err := specific.EvmChainSpecific.SubscribeHeadRequest() + req, err := test_utils.NewEvmChainSpecific(nil).SubscribeHeadRequest() assert.Nil(t, err) body, reqErr := req.Body() @@ -63,7 +64,7 @@ func TestEvmParseSubBLock(t *testing.T) { "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" }`) - block, err := specific.EvmChainSpecific.ParseSubscriptionBlock(body, nil, "") + block, err := test_utils.NewEvmChainSpecific(nil).ParseSubscriptionBlock(body) assert.Nil(t, err) expected := &protocol.BlockData{ @@ -90,7 +91,7 @@ func TestEvmGetLatestBlock(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector, "") + block, err := test_utils.NewEvmChainSpecific(connector).GetLatestBlock(ctx) assert.Nil(t, err) connector.AssertExpectations(t) @@ -110,7 +111,7 @@ func TestEvmGetLatestBlockWithError(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector, "") + block, err := test_utils.NewEvmChainSpecific(connector).GetLatestBlock(ctx) connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/chains_specific/solana_chain_specific.go b/internal/upstreams/chains_specific/solana_chain_specific.go index 631f782..0d7b6f9 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific.go +++ b/internal/upstreams/chains_specific/solana_chain_specific.go @@ -4,14 +4,13 @@ import ( "context" "encoding/binary" "fmt" + "time" "github.com/bytedance/sonic" - "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" "github.com/drpcorg/nodecore/pkg/blockchain" - "github.com/drpcorg/nodecore/pkg/chains" "github.com/drpcorg/nodecore/pkg/utils" "github.com/rs/zerolog/log" "github.com/samber/lo" @@ -19,40 +18,34 @@ import ( const checkInterval = 5 -var SolanaChainSpecific *SolanaChainSpecificObject +type SolanaChainSpecificObject struct { + upstreamId string + connector connectors.ApiConnector + lastKnownHeights *utils.CMap[string, uint64] + lastCheckedSlots *utils.CMap[string, uint64] +} -func init() { - SolanaChainSpecific = &SolanaChainSpecificObject{ +func NewSolanaChainSpecificObject( + upstreamId string, + connector connectors.ApiConnector, +) *SolanaChainSpecificObject { + return &SolanaChainSpecificObject{ + upstreamId: upstreamId, + connector: connector, lastKnownHeights: utils.NewCMap[string, uint64](), lastCheckedSlots: utils.NewCMap[string, uint64](), } } -type SolanaChainSpecificObject struct { - lastKnownHeights *utils.CMap[string, uint64] - lastCheckedSlots *utils.CMap[string, uint64] -} - -func (s *SolanaChainSpecificObject) SettingsValidators( - _ string, - _ connectors.ApiConnector, - _ *chains.ConfiguredChain, - _ *config.UpstreamOptions, -) []validations.SettingsValidator { +func (s *SolanaChainSpecificObject) SettingsValidators() []validations.SettingsValidator { return nil } -var _ ChainSpecific = (*SolanaChainSpecificObject)(nil) - -func (s *SolanaChainSpecificObject) GetLatestBlock( - ctx context.Context, - connector connectors.ApiConnector, - upstreamId string, -) (*protocol.Block, error) { - return s.getEpochInfo(ctx, connector, upstreamId) +func (s *SolanaChainSpecificObject) GetLatestBlock(ctx context.Context) (*protocol.Block, error) { + return s.getEpochInfo(ctx) } -func (s *SolanaChainSpecificObject) GetFinalizedBlock(_ context.Context, _ connectors.ApiConnector) (*protocol.Block, error) { +func (s *SolanaChainSpecificObject) GetFinalizedBlock(_ context.Context) (*protocol.Block, error) { // TODO: implement get block/slot with finalized commitment return nil, nil } @@ -67,23 +60,21 @@ func (s *SolanaChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Blo return createNewSolanaBlock(epochInfo.BlockHeight, epochInfo.AbsoluteSlot), nil } -func (s *SolanaChainSpecificObject) ParseSubscriptionBlock( - blockBytes []byte, - connector connectors.ApiConnector, - upstreamId string, -) (*protocol.Block, error) { +func (s *SolanaChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { slotEvent := SolanaSlotEvent{} err := sonic.Unmarshal(blockBytes, &slotEvent) if err != nil { return nil, err } - lastSlot, _ := s.lastCheckedSlots.Load(upstreamId) - lastHeight, _ := s.lastKnownHeights.Load(upstreamId) + lastSlot, _ := s.lastCheckedSlots.Load(s.upstreamId) + lastHeight, _ := s.lastKnownHeights.Load(s.upstreamId) shouldCheck := slotEvent.Slot >= lastSlot && slotEvent.Slot-lastSlot >= checkInterval estimatedHeight := lo.Ternary(lastHeight != 0 && lastSlot != 0, lastHeight+(slotEvent.Slot-lastSlot), 0) if shouldCheck || estimatedHeight == 0 { - block, err := s.getEpochInfo(context.Background(), connector, upstreamId) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + block, err := s.getEpochInfo(ctx) if err != nil { var height uint64 if estimatedHeight != 0 { @@ -95,7 +86,7 @@ func (s *SolanaChainSpecificObject) ParseSubscriptionBlock( height = slotEvent.Slot } } - log.Err(err).Msgf("couldn't get the epoch info for upstream %s, using the estimated height %d, slot %d", upstreamId, height, slotEvent.Slot) + log.Err(err).Msgf("couldn't get the epoch info for upstream %s, using the estimated height %d, slot %d", s.upstreamId, height, slotEvent.Slot) return createNewSolanaBlock(height, slotEvent.Slot), nil } return createNewSolanaBlock(block.BlockData.Height, block.BlockData.Slot), nil @@ -107,16 +98,12 @@ func (s *SolanaChainSpecificObject) SubscribeHeadRequest() (protocol.RequestHold return protocol.NewInternalSubUpstreamJsonRpcRequest("slotSubscribe", nil) } -func (s *SolanaChainSpecificObject) getEpochInfo( - ctx context.Context, - connector connectors.ApiConnector, - upstreamId string, -) (*protocol.Block, error) { +func (s *SolanaChainSpecificObject) getEpochInfo(ctx context.Context) (*protocol.Block, error) { request, err := protocol.NewInternalUpstreamJsonRpcRequest("getEpochInfo", nil) if err != nil { return nil, err } - response := connector.SendRequest(ctx, request) + response := s.connector.SendRequest(ctx, request) if response.HasError() { return nil, response.GetError() } @@ -125,8 +112,8 @@ func (s *SolanaChainSpecificObject) getEpochInfo( return nil, err } - s.lastKnownHeights.Store(upstreamId, block.BlockData.Height) - s.lastCheckedSlots.Store(upstreamId, block.BlockData.Slot) + s.lastKnownHeights.Store(s.upstreamId, block.BlockData.Height) + s.lastCheckedSlots.Store(s.upstreamId, block.BlockData.Slot) return block, nil } @@ -156,3 +143,5 @@ type SolanaEpochInfo struct { type SolanaSlotEvent struct { Slot uint64 `json:"slot"` } + +var _ ChainSpecific = (*SolanaChainSpecificObject)(nil) diff --git a/internal/upstreams/chains_specific/solana_chain_specific_test.go b/internal/upstreams/chains_specific/solana_chain_specific_test.go index d2d3d6f..5fcd0bf 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific_test.go +++ b/internal/upstreams/chains_specific/solana_chain_specific_test.go @@ -14,7 +14,7 @@ import ( ) func TestSolanaSubscribeHeadRequest(t *testing.T) { - req, err := specific.SolanaChainSpecific.SubscribeHeadRequest() + req, err := specific.NewSolanaChainSpecificObject("id", nil).SubscribeHeadRequest() assert.Nil(t, err) body, err := req.Body() @@ -43,9 +43,9 @@ func TestSolanaParseSubBlockErrEpochInfo(t *testing.T) { }`) epochResponse := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) - connector.On("SendRequest", context.Background(), mock.Anything).Return(epochResponse) + connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) - block, err := specific.SolanaChainSpecific.ParseSubscriptionBlock(slot, connector, "up1") + block, err := specific.NewSolanaChainSpecificObject("id", connector).ParseSubscriptionBlock(slot) assert.Nil(t, err) connector.AssertExpectations(t) @@ -57,6 +57,7 @@ func TestSolanaParseSubBlockErrEpochInfo(t *testing.T) { func TestSolanaParseSubBLock(t *testing.T) { connector := mocks.NewConnectorMock() + solanaSpecific := specific.NewSolanaChainSpecificObject("id", connector) body := []byte(`{ "slot": 405220706, "parent": 405220705, @@ -81,9 +82,9 @@ func TestSolanaParseSubBLock(t *testing.T) { }`) epochResponse := protocol.NewHttpUpstreamResponse("1", epochBody, 200, protocol.JsonRpc) - connector.On("SendRequest", context.Background(), mock.Anything).Return(epochResponse) + connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) - block, err := specific.SolanaChainSpecific.ParseSubscriptionBlock(body, connector, "up1") + block, err := solanaSpecific.ParseSubscriptionBlock(body) assert.Nil(t, err) connector.AssertExpectations(t) @@ -92,7 +93,7 @@ func TestSolanaParseSubBLock(t *testing.T) { blockData := protocol.NewBlockData(383325939, 405219988, hash, parentHash) assert.Equal(t, blockData, block.BlockData) - block, err = specific.SolanaChainSpecific.ParseSubscriptionBlock(body1, connector, "up1") + block, err = solanaSpecific.ParseSubscriptionBlock(body1) assert.Nil(t, err) hash, parentHash = specific.SyntheticHashes(405219989, 405219988) @@ -121,7 +122,7 @@ func TestSolanaGetLatestBlock(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(epochResponse) - block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector, "") + block, err := specific.NewSolanaChainSpecificObject("id", connector).GetLatestBlock(ctx) assert.Nil(t, err) connector.AssertExpectations(t) @@ -139,7 +140,7 @@ func TestSolanaGetLatestBlockWithError(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector, "") + block, err := specific.NewSolanaChainSpecificObject("id", connector).GetLatestBlock(ctx) connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/upstream.go b/internal/upstreams/upstream.go index 24e3bf4..1ed05e1 100644 --- a/internal/upstreams/upstream.go +++ b/internal/upstreams/upstream.go @@ -103,7 +103,7 @@ func NewBaseUpstream( cancel() return nil, err } - chainSpecific := getChainSpecific(configuredChain.Type) + chainSpecific := getChainSpecific(conf, upstreamConnectorsInfo, configuredChain) upstreamMethods, err := methods.NewUpstreamMethods(configuredChain.MethodSpec, conf.Methods) if err != nil { @@ -147,9 +147,9 @@ func NewBaseUpstream( upstreamIndexHex: upstreamIndexHex, upConfig: conf, - headProcessor: blocks.NewHeadProcessor(ctx, conf, upstreamConnectorsInfo.internalRequestConnector, upstreamConnectorsInfo.headConnector, chainSpecific), + headProcessor: blocks.NewHeadProcessor(ctx, conf, upstreamConnectorsInfo.headConnector, chainSpecific), blockProcessor: createBlockProcessor(ctx, conf, upstreamConnectorsInfo.internalRequestConnector, chainSpecific, configuredChain.Type), - settingsValidationProcessor: createSettingValidationProcessor(conf.Id, upstreamConnectorsInfo.internalRequestConnector, configuredChain, chainSpecific, conf.Options), + settingsValidationProcessor: createSettingValidationProcessor(chainSpecific), }, nil } @@ -387,14 +387,8 @@ func createConnector( } } -func createSettingValidationProcessor( - upstreamId string, - connector connectors.ApiConnector, - configuredChain *chains.ConfiguredChain, - chainSpecific specific.ChainSpecific, - options *config.UpstreamOptions, -) *validations.SettingsValidationProcessor { - validators := chainSpecific.SettingsValidators(upstreamId, connector, configuredChain, options) +func createSettingValidationProcessor(chainSpecific specific.ChainSpecific) *validations.SettingsValidationProcessor { + validators := chainSpecific.SettingsValidators() if len(validators) == 0 { return nil } @@ -416,17 +410,21 @@ func createBlockProcessor( } } -func getChainSpecific(blockchainType chains.BlockchainType) specific.ChainSpecific { +func getChainSpecific( + conf *config.Upstream, + upstreamConnectorsInfo *connectorsInfo, + configuredChain *chains.ConfiguredChain, +) specific.ChainSpecific { //TODO: there might be a few protocols a chain can work with, so it will be necessary to implement all of them - switch blockchainType { + switch configuredChain.Type { case chains.Ethereum: - return specific.EvmChainSpecific + return specific.NewEvmChainSpecific(conf.Id, upstreamConnectorsInfo.internalRequestConnector, configuredChain, conf.Options) case chains.Aztec: - return specific.AztecChainSpecific + return specific.NewAztecChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector) case chains.Solana: - return specific.SolanaChainSpecific + return specific.NewSolanaChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector) default: - panic(fmt.Sprintf("unknown blockchain type - %s", blockchainType)) + panic(fmt.Sprintf("unknown blockchain type - %s", configuredChain.Type)) } } diff --git a/internal/upstreams/upstream_test.go b/internal/upstreams/upstream_test.go index b93d1b9..04d5424 100644 --- a/internal/upstreams/upstream_test.go +++ b/internal/upstreams/upstream_test.go @@ -11,7 +11,6 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams" "github.com/drpcorg/nodecore/internal/upstreams/blocks" - specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/internal/upstreams/methods" "github.com/drpcorg/nodecore/internal/upstreams/validations" "github.com/drpcorg/nodecore/pkg/blockchain" @@ -122,7 +121,7 @@ func TestUpstreamBlockEvent(t *testing.T) { Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } - blockProcessor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, &specific.EvmChainSpecificObject{}) + blockProcessor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector)) upstream := test_utils.TestEvmUpstream(ctx, connector, upConfig, blockProcessor, nil, mocks.NewMethodsMock()) go upstream.Start() diff --git a/pkg/test_utils/test_helpers.go b/pkg/test_utils/test_helpers.go index 38522c9..f7bc52e 100644 --- a/pkg/test_utils/test_helpers.go +++ b/pkg/test_utils/test_helpers.go @@ -169,7 +169,7 @@ func TestEvmUpstream( "id", chains.ETHEREUM, []connectors.ApiConnector{connector}, - blocks.NewHeadProcessor(ctx, upConfig, connector, connector, specific.EvmChainSpecific), + blocks.NewHeadProcessor(ctx, upConfig, connector, NewEvmChainSpecific(connector)), blockProcessor, settingValidationProcessor, upState, @@ -178,6 +178,10 @@ func TestEvmUpstream( ) } +func NewEvmChainSpecific(connector connectors.ApiConnector) *specific.EvmChainSpecificObject { + return specific.NewEvmChainSpecific("id", connector, chains.GetChain("polygon"), nil) +} + func CreateChainSupervisor() *upstreams.ChainSupervisor { chainSupervisor := upstreams.NewChainSupervisor(context.Background(), chains.ARBITRUM, fork_choice.NewHeightForkChoice(), nil) From 3dba6e448206a870aea598e1f00ebab1e964724a Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Tue, 10 Mar 2026 13:28:49 +0400 Subject: [PATCH 6/8] Return debug --- internal/upstreams/blocks/head.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/upstreams/blocks/head.go b/internal/upstreams/blocks/head.go index faa0885..9daa9ef 100644 --- a/internal/upstreams/blocks/head.go +++ b/internal/upstreams/blocks/head.go @@ -92,7 +92,7 @@ func (h *HeadProcessor) Start() { return nil case block, ok := <-h.head.HeadsChan(): if ok { - log.Info().Msgf("got a new head of upstream %s - %d", h.upstreamId, block.BlockData.Height) + log.Debug().Msgf("got a new head of upstream %s - %d", h.upstreamId, block.BlockData.Height) h.lastUpdate.Store(time.Now()) h.subManager.Publish(HeadEvent{HeadData: block.BlockData}) } From be0f8053f0dc91e5dd463ba5bc8d2161f53ed550 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Tue, 10 Mar 2026 13:48:34 +0400 Subject: [PATCH 7/8] Small fixes --- internal/upstreams/blocks/head.go | 41 +++++++++++-------- internal/upstreams/blocks/head_test.go | 1 + .../chains_specific/solana_chain_specific.go | 9 ++-- .../solana_chain_specific_test.go | 15 +++---- internal/upstreams/upstream.go | 2 +- 5 files changed, 39 insertions(+), 29 deletions(-) diff --git a/internal/upstreams/blocks/head.go b/internal/upstreams/blocks/head.go index 9daa9ef..dc25c2a 100644 --- a/internal/upstreams/blocks/head.go +++ b/internal/upstreams/blocks/head.go @@ -128,10 +128,10 @@ func createHead( switch headConnector.GetType() { case protocol.JsonRpcConnector, protocol.RestConnector: log.Info().Msgf("starting an rpc head of upstream %s with poll interval %s", id, pollInterval) - return newRpcHead(ctx, id, specific, pollInterval, options) + return newRpcHead(ctx, id, options.InternalTimeout, pollInterval, specific) case protocol.WsConnector: log.Info().Msgf("starting a subscription head of upstream %s", id) - return newWsHead(ctx, id, headConnector, specific) + return newWsHead(ctx, id, options.InternalTimeout, headConnector, specific) default: return nil } @@ -174,9 +174,9 @@ var _ Head = (*RpcHead)(nil) func newRpcHead( ctx context.Context, upstreamId string, - chainSpecific specific.ChainSpecific, + internalTimeout, pollInterval time.Duration, - options *config.UpstreamOptions, + chainSpecific specific.ChainSpecific, ) *RpcHead { head := RpcHead{ lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_rpc_head", upstreamId), ctx), @@ -186,7 +186,7 @@ func newRpcHead( upstreamId: upstreamId, pollInProgress: atomic.Bool{}, headsChan: make(chan *protocol.Block), - internalTimeout: options.InternalTimeout, + internalTimeout: internalTimeout, } return &head @@ -236,12 +236,13 @@ func (r *RpcHead) poll() { } type SubscriptionHead struct { - lifecycle *utils.BaseLifecycle - block *utils.Atomic[protocol.Block] - chainSpecific specific.ChainSpecific - headConnector connectors.ApiConnector - upstreamId string - headsChan chan *protocol.Block + lifecycle *utils.BaseLifecycle + block *utils.Atomic[protocol.Block] + chainSpecific specific.ChainSpecific + headConnector connectors.ApiConnector + upstreamId string + headsChan chan *protocol.Block + internalTimeout time.Duration } func (w *SubscriptionHead) Running() bool { @@ -317,7 +318,9 @@ func (w *SubscriptionHead) OnNoHeadUpdates() { } func (w *SubscriptionHead) getLatestBlock() { - block, err := w.chainSpecific.GetLatestBlock(w.lifecycle.GetParentContext()) + ctx, cancel := context.WithTimeout(w.lifecycle.GetParentContext(), w.internalTimeout) + defer cancel() + block, err := w.chainSpecific.GetLatestBlock(ctx) if err != nil { log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", w.upstreamId) return @@ -329,16 +332,18 @@ func (w *SubscriptionHead) getLatestBlock() { func newWsHead( ctx context.Context, upstreamId string, + internalTimeout time.Duration, headConnector connectors.ApiConnector, chainSpecific specific.ChainSpecific, ) *SubscriptionHead { head := SubscriptionHead{ - lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), - upstreamId: upstreamId, - chainSpecific: chainSpecific, - headConnector: headConnector, - block: utils.NewAtomic[protocol.Block](), - headsChan: make(chan *protocol.Block), + lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), + upstreamId: upstreamId, + chainSpecific: chainSpecific, + headConnector: headConnector, + internalTimeout: internalTimeout, + block: utils.NewAtomic[protocol.Block](), + headsChan: make(chan *protocol.Block), } return &head diff --git a/internal/upstreams/blocks/head_test.go b/internal/upstreams/blocks/head_test.go index 4d2fd16..c8a15d1 100644 --- a/internal/upstreams/blocks/head_test.go +++ b/internal/upstreams/blocks/head_test.go @@ -114,6 +114,7 @@ func TestWsHead(t *testing.T) { ChainName: "ethereum", Id: "id", PollInterval: 10 * time.Millisecond, + Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, test_utils.NewEvmChainSpecific(reqConnector)) go headProcessor.Start() diff --git a/internal/upstreams/chains_specific/solana_chain_specific.go b/internal/upstreams/chains_specific/solana_chain_specific.go index 0d7b6f9..38b1e5c 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific.go +++ b/internal/upstreams/chains_specific/solana_chain_specific.go @@ -21,6 +21,7 @@ const checkInterval = 5 type SolanaChainSpecificObject struct { upstreamId string connector connectors.ApiConnector + internalTimeout time.Duration lastKnownHeights *utils.CMap[string, uint64] lastCheckedSlots *utils.CMap[string, uint64] } @@ -28,10 +29,12 @@ type SolanaChainSpecificObject struct { func NewSolanaChainSpecificObject( upstreamId string, connector connectors.ApiConnector, + internalTimeout time.Duration, ) *SolanaChainSpecificObject { return &SolanaChainSpecificObject{ upstreamId: upstreamId, connector: connector, + internalTimeout: internalTimeout, lastKnownHeights: utils.NewCMap[string, uint64](), lastCheckedSlots: utils.NewCMap[string, uint64](), } @@ -72,9 +75,7 @@ func (s *SolanaChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (* estimatedHeight := lo.Ternary(lastHeight != 0 && lastSlot != 0, lastHeight+(slotEvent.Slot-lastSlot), 0) if shouldCheck || estimatedHeight == 0 { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - block, err := s.getEpochInfo(ctx) + block, err := s.getEpochInfo(context.Background()) if err != nil { var height uint64 if estimatedHeight != 0 { @@ -99,6 +100,8 @@ func (s *SolanaChainSpecificObject) SubscribeHeadRequest() (protocol.RequestHold } func (s *SolanaChainSpecificObject) getEpochInfo(ctx context.Context) (*protocol.Block, error) { + ctx, cancel := context.WithTimeout(ctx, s.internalTimeout) + defer cancel() request, err := protocol.NewInternalUpstreamJsonRpcRequest("getEpochInfo", nil) if err != nil { return nil, err diff --git a/internal/upstreams/chains_specific/solana_chain_specific_test.go b/internal/upstreams/chains_specific/solana_chain_specific_test.go index 5fcd0bf..83fc9fa 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific_test.go +++ b/internal/upstreams/chains_specific/solana_chain_specific_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/drpcorg/nodecore/internal/protocol" specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" @@ -14,7 +15,7 @@ import ( ) func TestSolanaSubscribeHeadRequest(t *testing.T) { - req, err := specific.NewSolanaChainSpecificObject("id", nil).SubscribeHeadRequest() + req, err := specific.NewSolanaChainSpecificObject("id", nil, 5*time.Second).SubscribeHeadRequest() assert.Nil(t, err) body, err := req.Body() @@ -45,7 +46,7 @@ func TestSolanaParseSubBlockErrEpochInfo(t *testing.T) { connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) - block, err := specific.NewSolanaChainSpecificObject("id", connector).ParseSubscriptionBlock(slot) + block, err := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second).ParseSubscriptionBlock(slot) assert.Nil(t, err) connector.AssertExpectations(t) @@ -57,7 +58,7 @@ func TestSolanaParseSubBlockErrEpochInfo(t *testing.T) { func TestSolanaParseSubBLock(t *testing.T) { connector := mocks.NewConnectorMock() - solanaSpecific := specific.NewSolanaChainSpecificObject("id", connector) + solanaSpecific := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second) body := []byte(`{ "slot": 405220706, "parent": 405220705, @@ -120,9 +121,9 @@ func TestSolanaGetLatestBlock(t *testing.T) { }`) epochResponse := protocol.NewHttpUpstreamResponse("1", epochBody, 200, protocol.JsonRpc) - connector.On("SendRequest", ctx, mock.Anything).Return(epochResponse) + connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) - block, err := specific.NewSolanaChainSpecificObject("id", connector).GetLatestBlock(ctx) + block, err := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second).GetLatestBlock(ctx) assert.Nil(t, err) connector.AssertExpectations(t) @@ -138,9 +139,9 @@ func TestSolanaGetLatestBlockWithError(t *testing.T) { connector := mocks.NewConnectorMock() response := protocol.NewHttpUpstreamResponseWithError(protocol.ResponseErrorWithData(1, "block error", nil)) - connector.On("SendRequest", ctx, mock.Anything).Return(response) + connector.On("SendRequest", mock.Anything, mock.Anything).Return(response) - block, err := specific.NewSolanaChainSpecificObject("id", connector).GetLatestBlock(ctx) + block, err := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second).GetLatestBlock(ctx) connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/upstream.go b/internal/upstreams/upstream.go index 1ed05e1..f221a9e 100644 --- a/internal/upstreams/upstream.go +++ b/internal/upstreams/upstream.go @@ -422,7 +422,7 @@ func getChainSpecific( case chains.Aztec: return specific.NewAztecChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector) case chains.Solana: - return specific.NewSolanaChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector) + return specific.NewSolanaChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector, conf.Options.InternalTimeout) default: panic(fmt.Sprintf("unknown blockchain type - %s", configuredChain.Type)) } From e2962c867409f6a7ccd06dec6c0fdd5c1cf82302 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Tue, 10 Mar 2026 13:58:54 +0400 Subject: [PATCH 8/8] More delay --- internal/upstreams/chain_supervisor_test.go | 44 ++++++++++----------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/internal/upstreams/chain_supervisor_test.go b/internal/upstreams/chain_supervisor_test.go index 83a5aca..d2d0c47 100644 --- a/internal/upstreams/chain_supervisor_test.go +++ b/internal/upstreams/chain_supervisor_test.go @@ -24,19 +24,19 @@ func TestChainSupervisorUpdateHeadWithHeightFc(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(100), chainSupervisor.GetChainState().HeadData.Head) chainSupervisor.Publish(test_utils.CreateEvent("id1", protocol.Available, 95, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(100), chainSupervisor.GetChainState().HeadData.Head) chainSupervisor.Publish(test_utils.CreateEvent("id3", protocol.Unavailable, 500, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(100), chainSupervisor.GetChainState().HeadData.Head) chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 1000, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(1000), chainSupervisor.GetChainState().HeadData.Head) } @@ -54,14 +54,14 @@ func TestChainSupervisorTrackLags(t *testing.T) { blockInfo2.AddBlock(protocol.NewBlockDataWithHeight(700), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id1", protocol.Available, 100, methodsMock, blockInfo1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) chainDims1 := tracker.GetChainDimensions(chains.ARBITRUM, "id1") assert.Equal(t, uint64(0), chainDims1.GetHeadLag()) assert.Equal(t, uint64(0), chainDims1.GetFinalizationLag()) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id2", protocol.Available, 300, methodsMock, blockInfo2)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) chainDims1 = tracker.GetChainDimensions(chains.ARBITRUM, "id1") chainDims2 := tracker.GetChainDimensions(chains.ARBITRUM, "id2") @@ -80,19 +80,19 @@ func TestChainSupervisorUpdateStatus(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Available, chainSupervisor.GetChainState().Status) chainSupervisor.Publish(test_utils.CreateEvent("id1", protocol.Unavailable, 95, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Available, chainSupervisor.GetChainState().Status) chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Unavailable, 500, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Unavailable, chainSupervisor.GetChainState().Status) chainSupervisor.Publish(test_utils.CreateEvent("id12", protocol.Available, 95, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Available, chainSupervisor.GetChainState().Status) } @@ -108,19 +108,19 @@ func TestChainSupervisorUnionUpstreamMethods(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methods1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test1"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) chainSupervisor.Publish(test_utils.CreateEvent("id2", protocol.Available, 100, methods2)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test1", "test2"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) chainSupervisor.Publish(test_utils.CreateEvent("id1", protocol.Available, 100, methods3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test1", "test2", "test5"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Unavailable, 100, methods1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test2", "test5"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) } @@ -135,35 +135,35 @@ func TestChainSupervisorUnionUpstreamBlockInfo(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id", protocol.Available, 100, methods, blockInfo1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(1000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) blockInfo1.AddBlock(protocol.NewBlockDataWithHeight(2000), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id", protocol.Available, 100, methods, blockInfo1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(2000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) blockInfo2 := protocol.NewBlockInfo() blockInfo2.AddBlock(protocol.NewBlockDataWithHeight(500), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id1", protocol.Available, 100, methods, blockInfo2)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(2000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) blockInfo3 := protocol.NewBlockInfo() blockInfo3.AddBlock(protocol.NewBlockDataWithHeight(50000), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id5", protocol.Available, 100, methods, blockInfo3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(50000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id5", protocol.Unavailable, 100, methods, blockInfo3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(2000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id", protocol.Unavailable, 100, methods, blockInfo3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(500), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) } @@ -175,12 +175,12 @@ func TestChainSupervisorRemoveUpstreamState(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methods)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.True(t, chainSupervisor.GetUpstreamState("id") != nil) chainSupervisor.Publish(test_utils.CreateRemoveEvent("id")) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Nil(t, chainSupervisor.GetUpstreamState("id")) }