diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 8da240a..2051fee 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -276,16 +276,8 @@ func (u *Upstream) setDefaults(defaults *ChainDefaults) { } } if u.HeadConnector == "" && len(u.Connectors) > 0 { - filteredConnectors := lo.Filter(u.Connectors, func(item *ApiConnectorConfig, index int) bool { - _, ok := connectorTypesRating[item.Type] - return ok - }) - - if len(filteredConnectors) > 0 { - defaultHeadConnectorType := lo.MinBy(filteredConnectors, func(a *ApiConnectorConfig, b *ApiConnectorConfig) bool { - return connectorTypesRating[a.Type] < connectorTypesRating[b.Type] - }).Type - u.HeadConnector = defaultHeadConnectorType + if headConnector := u.GetBestConnector(); headConnector != "" { + u.HeadConnector = headConnector } } if u.RateLimitAutoTune != nil { diff --git a/internal/config/upstream_config.go b/internal/config/upstream_config.go index 3c4e9d4..9d28276 100644 --- a/internal/config/upstream_config.go +++ b/internal/config/upstream_config.go @@ -39,6 +39,20 @@ type Upstream struct { RateLimitAutoTune *RateLimitAutoTuneConfig `yaml:"rate-limit-auto-tune"` } +func (u *Upstream) GetBestConnector() ApiConnectorType { + filteredConnectors := lo.Filter(u.Connectors, func(item *ApiConnectorConfig, index int) bool { + _, ok := connectorTypesRating[item.Type] + return ok + }) + + if len(filteredConnectors) > 0 { + return lo.MinBy(filteredConnectors, func(a *ApiConnectorConfig, b *ApiConnectorConfig) bool { + return connectorTypesRating[a.Type] < connectorTypesRating[b.Type] + }).Type + } + return "" +} + type UpstreamOptions struct { InternalTimeout time.Duration `yaml:"internal-timeout"` ValidationInterval time.Duration `yaml:"validation-interval"` diff --git a/internal/protocol/blocks.go b/internal/protocol/blocks.go index a824d4a..51b93a2 100644 --- a/internal/protocol/blocks.go +++ b/internal/protocol/blocks.go @@ -1,33 +1,48 @@ package protocol +import "github.com/drpcorg/nodecore/pkg/blockchain" + type Block struct { BlockData *BlockData } type BlockData struct { - Height uint64 - Slot uint64 - Hash string + Height uint64 + Slot uint64 + Hash blockchain.HashId + ParentHash blockchain.HashId } func (b *BlockData) IsEmpty() bool { - return b.Height == 0 && b.Slot == 0 && b.Hash == "" + return b.Height == 0 && b.Slot == 0 && len(b.Hash) == 0 && len(b.ParentHash) == 0 } func NewBlockDataWithHeight(height uint64) *BlockData { return &BlockData{Height: height} } -func NewBlockData(height, slot uint64, hash string) *BlockData { - return &BlockData{Height: height, Slot: slot, Hash: hash} +func NewBlockData(height, slot uint64, hash, parentHash blockchain.HashId) *BlockData { + return &BlockData{ + Height: height, + Slot: slot, + Hash: hash, + ParentHash: parentHash, + } } -func NewBlock(height, slot uint64, hash string) *Block { +func NewBlock(height, slot uint64, hash, parentHash blockchain.HashId) *Block { return &Block{ BlockData: &BlockData{ - Height: height, - Slot: slot, - Hash: hash, + Height: height, + Slot: slot, + Hash: hash, + ParentHash: parentHash, }, } } + +func NewBlockWithHeights(height, slot uint64) *Block { + return &Block{ + BlockData: &BlockData{Height: height, Slot: slot}, + } +} diff --git a/internal/protocol/blocks_test.go b/internal/protocol/blocks_test.go index e0eee16..92c4172 100644 --- a/internal/protocol/blocks_test.go +++ b/internal/protocol/blocks_test.go @@ -4,13 +4,24 @@ import ( "testing" "github.com/drpcorg/nodecore/internal/protocol" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/stretchr/testify/assert" ) func TestBlockCreation(t *testing.T) { - block := protocol.NewBlock(uint64(15), uint64(55), "hash") + block := protocol.NewBlock( + uint64(15), + uint64(55), + blockchain.NewHashIdFromString("0x2345"), + blockchain.NewHashIdFromString("0x1111"), + ) expectedBlock := protocol.Block{ - BlockData: &protocol.BlockData{Height: uint64(15), Slot: uint64(55), Hash: "hash"}, + BlockData: &protocol.BlockData{ + Height: uint64(15), + Slot: uint64(55), + Hash: blockchain.NewHashIdFromString("0x2345"), + ParentHash: blockchain.NewHashIdFromString("0x1111"), + }, } assert.Equal(t, &expectedBlock, block) @@ -47,7 +58,7 @@ func TestBlockData(t *testing.T) { }, { name: "data with hash", - blockData: protocol.BlockData{Hash: "hash"}, + blockData: protocol.BlockData{Hash: blockchain.NewHashIdFromString("0x2345")}, expected: false, }, { @@ -57,12 +68,12 @@ func TestBlockData(t *testing.T) { }, { name: "data with height and hash", - blockData: protocol.BlockData{Height: uint64(12), Hash: "hash"}, + blockData: protocol.BlockData{Height: uint64(12), Hash: blockchain.NewHashIdFromString("0x2345")}, expected: false, }, { name: "data with slot and hash", - blockData: protocol.BlockData{Slot: uint64(12), Hash: "hash"}, + blockData: protocol.BlockData{Slot: uint64(12), Hash: blockchain.NewHashIdFromString("0x2345")}, expected: false, }, } diff --git a/internal/protocol/data_test.go b/internal/protocol/data_test.go index b2f808c..c823d70 100644 --- a/internal/protocol/data_test.go +++ b/internal/protocol/data_test.go @@ -6,6 +6,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/drpcorg/nodecore/internal/protocol" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" ) @@ -114,8 +115,8 @@ func TestDefaultUpstreamState(t *testing.T) { func TestBlockInfo(t *testing.T) { blockInfo := protocol.NewBlockInfo() - blockData := &protocol.BlockData{Height: uint64(75), Hash: "hash"} - blockData2 := &protocol.BlockData{Height: uint64(86), Hash: "hash1", Slot: uint64(25)} + blockData := &protocol.BlockData{Height: uint64(75), Hash: blockchain.NewHashIdFromString("0x2345")} + blockData2 := &protocol.BlockData{Height: uint64(86), Hash: blockchain.NewHashIdFromString("0x2345"), Slot: uint64(25)} assert.NotNil(t, blockInfo) blockInfo.AddBlock(blockData, protocol.FinalizedBlock) diff --git a/internal/upstreams/blocks/block_processor.go b/internal/upstreams/blocks/block_processor.go index b6d3e8a..006600c 100644 --- a/internal/upstreams/blocks/block_processor.go +++ b/internal/upstreams/blocks/block_processor.go @@ -116,7 +116,7 @@ func (b *EthLikeBlockProcessor) poll(blockType protocol.BlockType) { ctx, cancel := context.WithTimeout(b.lifecycle.GetParentContext(), b.internalTimeout) defer cancel() - block, err := b.chainSpecific.GetFinalizedBlock(ctx, b.connector) + block, err := b.chainSpecific.GetFinalizedBlock(ctx) if err != nil { var respErr *protocol.ResponseError if errors.As(err, &respErr) { diff --git a/internal/upstreams/blocks/block_processor_test.go b/internal/upstreams/blocks/block_processor_test.go index 7b1d60c..bada4e8 100644 --- a/internal/upstreams/blocks/block_processor_test.go +++ b/internal/upstreams/blocks/block_processor_test.go @@ -8,7 +8,8 @@ import ( "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/blocks" - specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" + "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/drpcorg/nodecore/pkg/test_utils" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -22,14 +23,15 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) connector.On("SendRequest", mock.Anything, mock.Anything).Return(response) - processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, specific.EvmChainSpecific) + processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector)) go processor.Start() sub := processor.Subscribe("sub") @@ -37,8 +39,9 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) { expected := blocks.BlockEvent{ BlockData: &protocol.BlockData{ - Height: uint64(69195275), - Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), }, BlockType: protocol.FinalizedBlock, } @@ -79,7 +82,7 @@ func TestEthLikeBlockProcessorDisableFinalizedBlock(t *testing.T) { connector.On("SendRequest", mock.Anything, mock.Anything).Return(response) - processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, specific.EvmChainSpecific) + processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector)) go processor.Start() sub := processor.Subscribe("sub") diff --git a/internal/upstreams/blocks/head.go b/internal/upstreams/blocks/head.go index b972745..dc25c2a 100644 --- a/internal/upstreams/blocks/head.go +++ b/internal/upstreams/blocks/head.go @@ -34,11 +34,11 @@ var _ utils.Lifecycle = (*HeadProcessor)(nil) func NewHeadProcessor( ctx context.Context, upConfig *config.Upstream, - apiConnector connectors.ApiConnector, + headConnector connectors.ApiConnector, specific specific.ChainSpecific, ) *HeadProcessor { configuredChain := chains.GetChain(upConfig.ChainName) - head := createHead(ctx, upConfig.Id, upConfig.PollInterval, apiConnector, specific, upConfig.Options) + head := createHead(ctx, upConfig.Id, upConfig.PollInterval, headConnector, specific, upConfig.Options) headNoUpdatesTimeout := 1 * time.Minute switch head.(type) { @@ -115,23 +115,23 @@ func (h *HeadProcessor) Stop() { } func (h *HeadProcessor) UpdateHead(height, slot uint64) { - h.manualHeadChan <- protocol.NewBlock(height, slot, "") + h.manualHeadChan <- protocol.NewBlockWithHeights(height, slot) } func createHead( ctx context.Context, id string, pollInterval time.Duration, - apiConnector connectors.ApiConnector, + headConnector connectors.ApiConnector, specific specific.ChainSpecific, options *config.UpstreamOptions, ) Head { - switch apiConnector.GetType() { + switch headConnector.GetType() { case protocol.JsonRpcConnector, protocol.RestConnector: log.Info().Msgf("starting an rpc head of upstream %s with poll interval %s", id, pollInterval) - return newRpcHead(ctx, id, apiConnector, specific, pollInterval, options) + return newRpcHead(ctx, id, options.InternalTimeout, pollInterval, specific) case protocol.WsConnector: log.Info().Msgf("starting a subscription head of upstream %s", id) - return newWsHead(ctx, id, apiConnector, specific) + return newWsHead(ctx, id, options.InternalTimeout, headConnector, specific) default: return nil } @@ -151,7 +151,6 @@ type RpcHead struct { chainSpecific specific.ChainSpecific pollInterval time.Duration internalTimeout time.Duration - connector connectors.ApiConnector upstreamId string pollInProgress atomic.Bool headsChan chan *protocol.Block @@ -175,21 +174,19 @@ var _ Head = (*RpcHead)(nil) func newRpcHead( ctx context.Context, upstreamId string, - connector connectors.ApiConnector, - chainSpecific specific.ChainSpecific, + internalTimeout, pollInterval time.Duration, - options *config.UpstreamOptions, + chainSpecific specific.ChainSpecific, ) *RpcHead { head := RpcHead{ lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_rpc_head", upstreamId), ctx), block: utils.NewAtomic[protocol.Block](), chainSpecific: chainSpecific, pollInterval: pollInterval, - connector: connector, upstreamId: upstreamId, pollInProgress: atomic.Bool{}, headsChan: make(chan *protocol.Block), - internalTimeout: options.InternalTimeout, + internalTimeout: internalTimeout, } return &head @@ -228,7 +225,7 @@ func (r *RpcHead) poll() { ctx, cancel := context.WithTimeout(r.lifecycle.GetParentContext(), r.internalTimeout) defer cancel() - block, err := r.chainSpecific.GetLatestBlock(ctx, r.connector) + block, err := r.chainSpecific.GetLatestBlock(ctx) if err != nil { log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", r.upstreamId) } else { @@ -239,12 +236,13 @@ func (r *RpcHead) poll() { } type SubscriptionHead struct { - lifecycle *utils.BaseLifecycle - block *utils.Atomic[protocol.Block] - chainSpecific specific.ChainSpecific - connector connectors.ApiConnector - upstreamId string - headsChan chan *protocol.Block + lifecycle *utils.BaseLifecycle + block *utils.Atomic[protocol.Block] + chainSpecific specific.ChainSpecific + headConnector connectors.ApiConnector + upstreamId string + headsChan chan *protocol.Block + internalTimeout time.Duration } func (w *SubscriptionHead) Running() bool { @@ -269,13 +267,16 @@ func (w *SubscriptionHead) GetCurrentBlock() *protocol.Block { func (w *SubscriptionHead) Start() { w.lifecycle.Start(func(ctx context.Context) error { + // get the latest block in order not to wait for the sub event + w.getLatestBlock() + subReq, err := w.chainSpecific.SubscribeHeadRequest() if err != nil { log.Warn().Err(err).Msgf("couldn't create a subscription request to upstream %s", w.upstreamId) return err } - subResponse, err := w.connector.Subscribe(ctx, subReq) + subResponse, err := w.headConnector.Subscribe(ctx, subReq) if err != nil { log.Warn().Err(err).Msgf("couldn't subscribe to upstream %s heads", w.upstreamId) return err @@ -316,14 +317,33 @@ func (w *SubscriptionHead) OnNoHeadUpdates() { go w.Start() } -func newWsHead(ctx context.Context, upstreamId string, connector connectors.ApiConnector, chainSpecific specific.ChainSpecific) *SubscriptionHead { +func (w *SubscriptionHead) getLatestBlock() { + ctx, cancel := context.WithTimeout(w.lifecycle.GetParentContext(), w.internalTimeout) + defer cancel() + block, err := w.chainSpecific.GetLatestBlock(ctx) + if err != nil { + log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", w.upstreamId) + return + } + w.block.Store(*block) + w.headsChan <- block +} + +func newWsHead( + ctx context.Context, + upstreamId string, + internalTimeout time.Duration, + headConnector connectors.ApiConnector, + chainSpecific specific.ChainSpecific, +) *SubscriptionHead { head := SubscriptionHead{ - lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), - upstreamId: upstreamId, - chainSpecific: chainSpecific, - connector: connector, - block: utils.NewAtomic[protocol.Block](), - headsChan: make(chan *protocol.Block), + lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx), + upstreamId: upstreamId, + chainSpecific: chainSpecific, + headConnector: headConnector, + internalTimeout: internalTimeout, + block: utils.NewAtomic[protocol.Block](), + headsChan: make(chan *protocol.Block), } return &head diff --git a/internal/upstreams/blocks/head_test.go b/internal/upstreams/blocks/head_test.go index 02eb621..c8a15d1 100644 --- a/internal/upstreams/blocks/head_test.go +++ b/internal/upstreams/blocks/head_test.go @@ -8,7 +8,8 @@ import ( "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/blocks" - specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" + "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/drpcorg/nodecore/pkg/test_utils" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -23,7 +24,8 @@ func TestRpcHead(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) @@ -35,15 +37,16 @@ func TestRpcHead(t *testing.T) { PollInterval: 10 * time.Millisecond, Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } - headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, specific.EvmChainSpecific) + headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, test_utils.NewEvmChainSpecific(connector)) go headProcessor.Start() sub := headProcessor.Subscribe("test") event, ok := <-sub.Events expected := &protocol.BlockData{ - Height: uint64(69195275), - Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), } connector.AssertExpectations(t) @@ -77,6 +80,18 @@ func TestWsHead(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + reqConnector := mocks.NewConnectorMock() + bodyLastBlock := []byte(`{ + "jsonrpc": "2.0", + "result": { + "number": "0x41FD60A", + "hash": "0x2eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d12", + "parentHash": "0x3eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d13" + } + }`) + responseLastBlock := protocol.NewHttpUpstreamResponse("1", bodyLastBlock, 200, protocol.JsonRpc) + reqConnector.On("SendRequest", mock.Anything, mock.Anything).Return(responseLastBlock) + connector := mocks.NewWsConnectorMock() body := []byte(`{ "jsonrpc": "2.0", @@ -84,7 +99,8 @@ func TestWsHead(t *testing.T) { "params": { "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" }, "subscription": "0x89d9f8cd1e113f4b65c1e22f3847d3672cf5761f" } @@ -98,19 +114,30 @@ func TestWsHead(t *testing.T) { ChainName: "ethereum", Id: "id", PollInterval: 10 * time.Millisecond, + Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } - headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, specific.EvmChainSpecific) + headProcessor := blocks.NewHeadProcessor(ctx, &upConfig, connector, test_utils.NewEvmChainSpecific(reqConnector)) go headProcessor.Start() sub := headProcessor.Subscribe("test") event, ok := <-sub.Events expected := &protocol.BlockData{ - Height: uint64(69195275), - Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + Height: uint64(69195274), + Hash: blockchain.NewHashIdFromString("0x2eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d12"), + ParentHash: blockchain.NewHashIdFromString("0x3eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d13"), + } + assert.True(t, ok) + assert.Equal(t, expected, event.HeadData) + assert.Equal(t, expected, headProcessor.GetCurrentBlock().BlockData) + + event, ok = <-sub.Events + expected = &protocol.BlockData{ + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), } - connector.AssertExpectations(t) assert.True(t, ok) assert.Equal(t, expected, event.HeadData) assert.Equal(t, expected, headProcessor.GetCurrentBlock().BlockData) @@ -135,4 +162,7 @@ func TestWsHead(t *testing.T) { _, ok = <-sub.Events assert.False(t, ok) + + connector.AssertExpectations(t) + reqConnector.AssertExpectations(t) } diff --git a/internal/upstreams/chain_supervisor_test.go b/internal/upstreams/chain_supervisor_test.go index 83a5aca..d2d0c47 100644 --- a/internal/upstreams/chain_supervisor_test.go +++ b/internal/upstreams/chain_supervisor_test.go @@ -24,19 +24,19 @@ func TestChainSupervisorUpdateHeadWithHeightFc(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(100), chainSupervisor.GetChainState().HeadData.Head) chainSupervisor.Publish(test_utils.CreateEvent("id1", protocol.Available, 95, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(100), chainSupervisor.GetChainState().HeadData.Head) chainSupervisor.Publish(test_utils.CreateEvent("id3", protocol.Unavailable, 500, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(100), chainSupervisor.GetChainState().HeadData.Head) chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 1000, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(1000), chainSupervisor.GetChainState().HeadData.Head) } @@ -54,14 +54,14 @@ func TestChainSupervisorTrackLags(t *testing.T) { blockInfo2.AddBlock(protocol.NewBlockDataWithHeight(700), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id1", protocol.Available, 100, methodsMock, blockInfo1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) chainDims1 := tracker.GetChainDimensions(chains.ARBITRUM, "id1") assert.Equal(t, uint64(0), chainDims1.GetHeadLag()) assert.Equal(t, uint64(0), chainDims1.GetFinalizationLag()) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id2", protocol.Available, 300, methodsMock, blockInfo2)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) chainDims1 = tracker.GetChainDimensions(chains.ARBITRUM, "id1") chainDims2 := tracker.GetChainDimensions(chains.ARBITRUM, "id2") @@ -80,19 +80,19 @@ func TestChainSupervisorUpdateStatus(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Available, chainSupervisor.GetChainState().Status) chainSupervisor.Publish(test_utils.CreateEvent("id1", protocol.Unavailable, 95, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Available, chainSupervisor.GetChainState().Status) chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Unavailable, 500, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Unavailable, chainSupervisor.GetChainState().Status) chainSupervisor.Publish(test_utils.CreateEvent("id12", protocol.Available, 95, methodsMock)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, protocol.Available, chainSupervisor.GetChainState().Status) } @@ -108,19 +108,19 @@ func TestChainSupervisorUnionUpstreamMethods(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methods1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test1"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) chainSupervisor.Publish(test_utils.CreateEvent("id2", protocol.Available, 100, methods2)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test1", "test2"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) chainSupervisor.Publish(test_utils.CreateEvent("id1", protocol.Available, 100, methods3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test1", "test2", "test5"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Unavailable, 100, methods1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, mapset.NewThreadUnsafeSet[string]("test2", "test5"), chainSupervisor.GetChainState().Methods.GetSupportedMethods()) } @@ -135,35 +135,35 @@ func TestChainSupervisorUnionUpstreamBlockInfo(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id", protocol.Available, 100, methods, blockInfo1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(1000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) blockInfo1.AddBlock(protocol.NewBlockDataWithHeight(2000), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id", protocol.Available, 100, methods, blockInfo1)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(2000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) blockInfo2 := protocol.NewBlockInfo() blockInfo2.AddBlock(protocol.NewBlockDataWithHeight(500), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id1", protocol.Available, 100, methods, blockInfo2)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(2000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) blockInfo3 := protocol.NewBlockInfo() blockInfo3.AddBlock(protocol.NewBlockDataWithHeight(50000), protocol.FinalizedBlock) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id5", protocol.Available, 100, methods, blockInfo3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(50000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id5", protocol.Unavailable, 100, methods, blockInfo3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(2000), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) chainSupervisor.Publish(test_utils.CreateEventWithBlockData("id", protocol.Unavailable, 100, methods, blockInfo3)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Equal(t, uint64(500), chainSupervisor.GetChainState().Blocks[protocol.FinalizedBlock].Height) } @@ -175,12 +175,12 @@ func TestChainSupervisorRemoveUpstreamState(t *testing.T) { go chainSupervisor.Start() chainSupervisor.Publish(test_utils.CreateEvent("id", protocol.Available, 100, methods)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.True(t, chainSupervisor.GetUpstreamState("id") != nil) chainSupervisor.Publish(test_utils.CreateRemoveEvent("id")) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) assert.Nil(t, chainSupervisor.GetUpstreamState("id")) } diff --git a/internal/upstreams/chains_specific/aztec_chain_specific.go b/internal/upstreams/chains_specific/aztec_chain_specific.go index c50a917..b78e495 100644 --- a/internal/upstreams/chains_specific/aztec_chain_specific.go +++ b/internal/upstreams/chains_specific/aztec_chain_specific.go @@ -5,40 +5,40 @@ import ( "fmt" "github.com/bytedance/sonic" - "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" - "github.com/drpcorg/nodecore/pkg/chains" + "github.com/drpcorg/nodecore/pkg/blockchain" ) -var AztecChainSpecific *AztecChainSpecificObject - -func init() { - AztecChainSpecific = &AztecChainSpecificObject{} +type AztecChainSpecificObject struct { + upstreamId string + connector connectors.ApiConnector } -type AztecChainSpecificObject struct { +func NewAztecChainSpecificObject( + upstreamId string, + connector connectors.ApiConnector, +) *AztecChainSpecificObject { + return &AztecChainSpecificObject{ + upstreamId: upstreamId, + connector: connector, + } } var _ ChainSpecific = (*AztecChainSpecificObject)(nil) -func (a *AztecChainSpecificObject) SettingsValidators( - _ string, - _ connectors.ApiConnector, - _ *chains.ConfiguredChain, - _ *config.UpstreamOptions, -) []validations.SettingsValidator { +func (a *AztecChainSpecificObject) SettingsValidators() []validations.SettingsValidator { return nil } -func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context) (*protocol.Block, error) { request, err := protocol.NewInternalUpstreamJsonRpcRequest("node_getBlock", []interface{}{"latest"}) if err != nil { return nil, err } - response := connector.SendRequest(ctx, request) + response := a.connector.SendRequest(ctx, request) if response.HasError() { return nil, response.GetError() } @@ -46,7 +46,7 @@ func (a *AztecChainSpecificObject) GetLatestBlock(ctx context.Context, connector return a.ParseBlock(response.ResponseResult()) } -func (a *AztecChainSpecificObject) GetFinalizedBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) GetFinalizedBlock(_ context.Context) (*protocol.Block, error) { return nil, nil } @@ -62,10 +62,10 @@ func (a *AztecChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Bloc return nil, fmt.Errorf("couldn't parse the aztec block, got '%s'", string(blockBytes)) } - return protocol.NewBlock(height, 0, block.BlockHash), nil + return protocol.NewBlock(height, 0, blockchain.NewHashIdFromString(block.BlockHash), blockchain.EmptyHash), nil } -func (a *AztecChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { +func (a *AztecChainSpecificObject) ParseSubscriptionBlock(_ []byte) (*protocol.Block, error) { return nil, fmt.Errorf("aztec does not support websocket subscriptions") } diff --git a/internal/upstreams/chains_specific/evm_chain_specific.go b/internal/upstreams/chains_specific/evm_chain_specific.go index 95d7160..9ee941d 100644 --- a/internal/upstreams/chains_specific/evm_chain_specific.go +++ b/internal/upstreams/chains_specific/evm_chain_specific.go @@ -9,38 +9,48 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" "github.com/ethereum/go-ethereum/rpc" ) type ChainSpecific interface { - GetLatestBlock(context.Context, connectors.ApiConnector) (*protocol.Block, error) - GetFinalizedBlock(context.Context, connectors.ApiConnector) (*protocol.Block, error) - ParseBlock([]byte) (*protocol.Block, error) - SubscribeHeadRequest() (protocol.RequestHolder, error) - ParseSubscriptionBlock([]byte) (*protocol.Block, error) - SettingsValidators(upstreamId string, connector connectors.ApiConnector, chain *chains.ConfiguredChain, options *config.UpstreamOptions) []validations.SettingsValidator -} + GetLatestBlock(ctx context.Context) (*protocol.Block, error) + GetFinalizedBlock(context.Context) (*protocol.Block, error) -var EvmChainSpecific *EvmChainSpecificObject + ParseBlock([]byte) (*protocol.Block, error) + ParseSubscriptionBlock(data []byte) (*protocol.Block, error) -func init() { - EvmChainSpecific = &EvmChainSpecificObject{} + SubscribeHeadRequest() (protocol.RequestHolder, error) + SettingsValidators() []validations.SettingsValidator } type EvmChainSpecificObject struct { + upstreamId string + connector connectors.ApiConnector + chain *chains.ConfiguredChain + options *config.UpstreamOptions } -func (e *EvmChainSpecificObject) SettingsValidators( +func NewEvmChainSpecific( upstreamId string, connector connectors.ApiConnector, chain *chains.ConfiguredChain, options *config.UpstreamOptions, -) []validations.SettingsValidator { +) *EvmChainSpecificObject { + return &EvmChainSpecificObject{ + upstreamId: upstreamId, + connector: connector, + chain: chain, + options: options, + } +} + +func (e *EvmChainSpecificObject) SettingsValidators() []validations.SettingsValidator { settingsValidators := make([]validations.SettingsValidator, 0) - if !*options.DisableChainValidation { - settingsValidators = append(settingsValidators, validations.NewChainValidator(upstreamId, connector, chain, options)) + if !*e.options.DisableChainValidation { + settingsValidators = append(settingsValidators, validations.NewChainValidator(e.upstreamId, e.connector, e.chain, e.options)) } return settingsValidators @@ -48,12 +58,12 @@ func (e *EvmChainSpecificObject) SettingsValidators( var _ ChainSpecific = (*EvmChainSpecificObject)(nil) -func (e *EvmChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { - return e.getBlockByTag(ctx, connector, rpc.LatestBlockNumber) +func (e *EvmChainSpecificObject) GetLatestBlock(ctx context.Context) (*protocol.Block, error) { + return e.getBlockByTag(ctx, e.connector, rpc.LatestBlockNumber) } -func (e *EvmChainSpecificObject) GetFinalizedBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { - return e.getBlockByTag(ctx, connector, rpc.FinalizedBlockNumber) +func (e *EvmChainSpecificObject) GetFinalizedBlock(ctx context.Context) (*protocol.Block, error) { + return e.getBlockByTag(ctx, e.connector, rpc.FinalizedBlockNumber) } func (e *EvmChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { @@ -70,7 +80,12 @@ func (e *EvmChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Block, return nil, fmt.Errorf("couldn't parse the evm block, got '%s'", string(blockBytes)) } - return protocol.NewBlock(uint64(evmBlock.Height.Int64()), 0, evmBlock.Hash), nil + return protocol.NewBlock( + uint64(evmBlock.Height.Int64()), + 0, + blockchain.NewHashIdFromString(evmBlock.Hash), + blockchain.NewHashIdFromString(evmBlock.Parent), + ), nil } func (e *EvmChainSpecificObject) SubscribeHeadRequest() (protocol.RequestHolder, error) { @@ -97,5 +112,6 @@ func (e *EvmChainSpecificObject) getBlockByTag(ctx context.Context, connector co type EvmBlock struct { Hash string `json:"hash"` + Parent string `json:"parentHash"` Height *rpc.BlockNumber `json:"number"` } diff --git a/internal/upstreams/chains_specific/evm_chain_specific_test.go b/internal/upstreams/chains_specific/evm_chain_specific_test.go index cc5d4d5..4a7dc2e 100644 --- a/internal/upstreams/chains_specific/evm_chain_specific_test.go +++ b/internal/upstreams/chains_specific/evm_chain_specific_test.go @@ -9,7 +9,9 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" + "github.com/drpcorg/nodecore/pkg/test_utils" "github.com/drpcorg/nodecore/pkg/test_utils/mocks" "github.com/samber/lo" "github.com/stretchr/testify/assert" @@ -23,7 +25,7 @@ func TestChainValidator(t *testing.T) { } connector := mocks.NewConnectorMock() - validators := specific.EvmChainSpecific.SettingsValidators("id", connector, chains.UnknownChain, options) + validators := specific.NewEvmChainSpecific("id", connector, chains.UnknownChain, options).SettingsValidators() _, ok := lo.Find(validators, func(item validations.SettingsValidator) bool { _, ok := item.(*validations.ChainValidator) @@ -32,7 +34,7 @@ func TestChainValidator(t *testing.T) { assert.True(t, ok) options.DisableChainValidation = lo.ToPtr(true) - validators = specific.EvmChainSpecific.SettingsValidators("id", connector, chains.UnknownChain, options) + validators = specific.NewEvmChainSpecific("id", connector, chains.UnknownChain, options).SettingsValidators() _, ok = lo.Find(validators, func(item validations.SettingsValidator) bool { _, ok := item.(*validations.ChainValidator) @@ -43,7 +45,7 @@ func TestChainValidator(t *testing.T) { } func TestEvmSubscribeHeadRequest(t *testing.T) { - req, err := specific.EvmChainSpecific.SubscribeHeadRequest() + req, err := test_utils.NewEvmChainSpecific(nil).SubscribeHeadRequest() assert.Nil(t, err) body, reqErr := req.Body() @@ -58,14 +60,20 @@ func TestEvmSubscribeHeadRequest(t *testing.T) { func TestEvmParseSubBLock(t *testing.T) { body := []byte(`{ "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" }`) - block, err := specific.EvmChainSpecific.ParseSubscriptionBlock(body) - + block, err := test_utils.NewEvmChainSpecific(nil).ParseSubscriptionBlock(body) assert.Nil(t, err) - assert.Equal(t, uint64(69195275), block.BlockData.Height) - assert.Equal(t, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", block.BlockData.Hash) + + expected := &protocol.BlockData{ + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + } + + assert.Equal(t, expected, block.BlockData) } func TestEvmGetLatestBlock(t *testing.T) { @@ -75,19 +83,25 @@ func TestEvmGetLatestBlock(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector) + block, err := test_utils.NewEvmChainSpecific(connector).GetLatestBlock(ctx) + assert.Nil(t, err) connector.AssertExpectations(t) - assert.Nil(t, err) - assert.Equal(t, uint64(69195275), block.BlockData.Height) - assert.Equal(t, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", block.BlockData.Hash) + + expected := &protocol.BlockData{ + Height: uint64(69195275), + Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + } + assert.Equal(t, expected, block.BlockData) } func TestEvmGetLatestBlockWithError(t *testing.T) { @@ -97,7 +111,7 @@ func TestEvmGetLatestBlockWithError(t *testing.T) { connector.On("SendRequest", ctx, mock.Anything).Return(response) - block, err := specific.EvmChainSpecific.GetLatestBlock(ctx, connector) + block, err := test_utils.NewEvmChainSpecific(connector).GetLatestBlock(ctx) connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/chains_specific/solana_chain_specific.go b/internal/upstreams/chains_specific/solana_chain_specific.go index 3e82752..38b1e5c 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific.go +++ b/internal/upstreams/chains_specific/solana_chain_specific.go @@ -2,164 +2,149 @@ package specific import ( "context" + "encoding/binary" "fmt" + "time" "github.com/bytedance/sonic" - "github.com/drpcorg/nodecore/internal/config" "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams/connectors" "github.com/drpcorg/nodecore/internal/upstreams/validations" - "github.com/drpcorg/nodecore/pkg/chains" + "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/drpcorg/nodecore/pkg/utils" + "github.com/rs/zerolog/log" "github.com/samber/lo" ) -var SolanaChainSpecific *SolanaChainSpecificObject +const checkInterval = 5 -func init() { - SolanaChainSpecific = &SolanaChainSpecificObject{} +type SolanaChainSpecificObject struct { + upstreamId string + connector connectors.ApiConnector + internalTimeout time.Duration + lastKnownHeights *utils.CMap[string, uint64] + lastCheckedSlots *utils.CMap[string, uint64] } -type SolanaChainSpecificObject struct { +func NewSolanaChainSpecificObject( + upstreamId string, + connector connectors.ApiConnector, + internalTimeout time.Duration, +) *SolanaChainSpecificObject { + return &SolanaChainSpecificObject{ + upstreamId: upstreamId, + connector: connector, + internalTimeout: internalTimeout, + lastKnownHeights: utils.NewCMap[string, uint64](), + lastCheckedSlots: utils.NewCMap[string, uint64](), + } } -func (s *SolanaChainSpecificObject) SettingsValidators( - _ string, - _ connectors.ApiConnector, - _ *chains.ConfiguredChain, - _ *config.UpstreamOptions, -) []validations.SettingsValidator { +func (s *SolanaChainSpecificObject) SettingsValidators() []validations.SettingsValidator { return nil } -var _ ChainSpecific = (*SolanaChainSpecificObject)(nil) - -func (s *SolanaChainSpecificObject) GetLatestBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { - slot, err := getLatestSlot(ctx, connector) - if err != nil { - return nil, err - } - - maxBlock, err := getMaxBlock(ctx, connector, slot) - if err != nil { - return nil, err - } - - block, err := getBlock(ctx, connector, maxBlock) - if err != nil { - return nil, err - } - - parsedBlock, err := s.ParseBlock(block) - if err != nil { - return nil, err - } - parsedBlock.BlockData.Slot = maxBlock - return parsedBlock, nil +func (s *SolanaChainSpecificObject) GetLatestBlock(ctx context.Context) (*protocol.Block, error) { + return s.getEpochInfo(ctx) } -func (s *SolanaChainSpecificObject) GetFinalizedBlock(ctx context.Context, connector connectors.ApiConnector) (*protocol.Block, error) { +func (s *SolanaChainSpecificObject) GetFinalizedBlock(_ context.Context) (*protocol.Block, error) { // TODO: implement get block/slot with finalized commitment return nil, nil } func (s *SolanaChainSpecificObject) ParseBlock(blockBytes []byte) (*protocol.Block, error) { - solanaBlock := SolanaBlock{} - err := sonic.Unmarshal(blockBytes, &solanaBlock) + epochInfo := SolanaEpochInfo{} + err := sonic.Unmarshal(blockBytes, &epochInfo) if err != nil { return nil, fmt.Errorf("couldn't parse the solana block, reason - %s", err.Error()) } - return protocol.NewBlock(solanaBlock.Height, 0, solanaBlock.Hash), nil + return createNewSolanaBlock(epochInfo.BlockHeight, epochInfo.AbsoluteSlot), nil } func (s *SolanaChainSpecificObject) ParseSubscriptionBlock(blockBytes []byte) (*protocol.Block, error) { - solanaSubBlock := SolanaSubscriptionBlock{} - err := sonic.Unmarshal(blockBytes, &solanaSubBlock) + slotEvent := SolanaSlotEvent{} + err := sonic.Unmarshal(blockBytes, &slotEvent) if err != nil { return nil, err } - - return protocol.NewBlock( - solanaSubBlock.Value.Block.Height, - solanaSubBlock.Context.Slot, - solanaSubBlock.Value.Block.Hash, - ), nil + lastSlot, _ := s.lastCheckedSlots.Load(s.upstreamId) + lastHeight, _ := s.lastKnownHeights.Load(s.upstreamId) + shouldCheck := slotEvent.Slot >= lastSlot && slotEvent.Slot-lastSlot >= checkInterval + estimatedHeight := lo.Ternary(lastHeight != 0 && lastSlot != 0, lastHeight+(slotEvent.Slot-lastSlot), 0) + + if shouldCheck || estimatedHeight == 0 { + block, err := s.getEpochInfo(context.Background()) + if err != nil { + var height uint64 + if estimatedHeight != 0 { + height = estimatedHeight + } else { + if lastHeight != 0 { + height = lastHeight + } else { + height = slotEvent.Slot + } + } + log.Err(err).Msgf("couldn't get the epoch info for upstream %s, using the estimated height %d, slot %d", s.upstreamId, height, slotEvent.Slot) + return createNewSolanaBlock(height, slotEvent.Slot), nil + } + return createNewSolanaBlock(block.BlockData.Height, block.BlockData.Slot), nil + } + return createNewSolanaBlock(estimatedHeight, slotEvent.Slot), nil } func (s *SolanaChainSpecificObject) SubscribeHeadRequest() (protocol.RequestHolder, error) { - params := map[string]interface{}{ - "showRewards": false, - "transactionDetails": "none", - } - return protocol.NewInternalSubUpstreamJsonRpcRequest("blockSubscribe", []interface{}{"all", params}) + return protocol.NewInternalSubUpstreamJsonRpcRequest("slotSubscribe", nil) } -func getLatestSlot(ctx context.Context, connector connectors.ApiConnector) (uint64, error) { - slotReq, err := protocol.NewInternalUpstreamJsonRpcRequest("getSlot", nil) +func (s *SolanaChainSpecificObject) getEpochInfo(ctx context.Context) (*protocol.Block, error) { + ctx, cancel := context.WithTimeout(ctx, s.internalTimeout) + defer cancel() + request, err := protocol.NewInternalUpstreamJsonRpcRequest("getEpochInfo", nil) if err != nil { - return 0, err + return nil, err } - slotResponse := connector.SendRequest(ctx, slotReq) - if slotResponse.HasError() { - return 0, slotResponse.GetError() + response := s.connector.SendRequest(ctx, request) + if response.HasError() { + return nil, response.GetError() } - - slot := protocol.ResultAsNumber(slotResponse.ResponseResult()) - - return slot, nil -} - -func getMaxBlock(ctx context.Context, connector connectors.ApiConnector, slot uint64) (uint64, error) { - blocksReq, err := protocol.NewInternalUpstreamJsonRpcRequest("getBlocks", []interface{}{slot - 10, slot}) + block, err := s.ParseBlock(response.ResponseResult()) if err != nil { - return 0, err - } - blocksResponse := connector.SendRequest(ctx, blocksReq) - if blocksResponse.HasError() { - return 0, blocksResponse.GetError() + return nil, err } - var blocks []uint64 - err = sonic.Unmarshal(blocksResponse.ResponseResult(), &blocks) - if err != nil { - return 0, err - } + s.lastKnownHeights.Store(s.upstreamId, block.BlockData.Height) + s.lastCheckedSlots.Store(s.upstreamId, block.BlockData.Slot) - return lo.Max(blocks), nil + return block, nil } -func getBlock(ctx context.Context, connector connectors.ApiConnector, block uint64) ([]byte, error) { - params := map[string]interface{}{ - "showRewards": false, - "transactionDetails": "none", - "maxSupportedTransactionVersion": 0, - } - blockReq, err := protocol.NewInternalUpstreamJsonRpcRequest("getBlock", []interface{}{block, params}) - if err != nil { - return nil, err - } - blockResponse := connector.SendRequest(ctx, blockReq) - if blockResponse.HasError() { - return nil, blockResponse.GetError() - } +func SyntheticHashes(slot uint64, parentSlot uint64) (blockchain.HashId, blockchain.HashId) { + b1 := make([]byte, 32) + binary.BigEndian.PutUint64(b1, slot) + syntheticHash := blockchain.NewHashIdFromBytes(b1) + + b2 := make([]byte, 32) + binary.BigEndian.PutUint64(b2, parentSlot) + syntheticParentHash := blockchain.NewHashIdFromBytes(b2) - return blockResponse.ResponseResult(), nil + return syntheticHash, syntheticParentHash } -type SolanaBlock struct { - Height uint64 `json:"blockHeight"` - Hash string `json:"blockhash"` +func createNewSolanaBlock(height uint64, slot uint64) *protocol.Block { + hash, parentHash := SyntheticHashes(slot, slot-1) + return protocol.NewBlock(height, slot, hash, parentHash) } -type SolanaSubscriptionBlock struct { - Context Context `json:"context"` - Value Value `json:"value"` +type SolanaEpochInfo struct { + AbsoluteSlot uint64 `json:"absoluteSlot"` + BlockHeight uint64 `json:"blockHeight"` } -type Context struct { +type SolanaSlotEvent struct { Slot uint64 `json:"slot"` } -type Value struct { - Block SolanaBlock `json:"block"` -} +var _ ChainSpecific = (*SolanaChainSpecificObject)(nil) diff --git a/internal/upstreams/chains_specific/solana_chain_specific_test.go b/internal/upstreams/chains_specific/solana_chain_specific_test.go index e4181ca..83fc9fa 100644 --- a/internal/upstreams/chains_specific/solana_chain_specific_test.go +++ b/internal/upstreams/chains_specific/solana_chain_specific_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/drpcorg/nodecore/internal/protocol" specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" @@ -14,80 +15,123 @@ import ( ) func TestSolanaSubscribeHeadRequest(t *testing.T) { - req, err := specific.SolanaChainSpecific.SubscribeHeadRequest() + req, err := specific.NewSolanaChainSpecificObject("id", nil, 5*time.Second).SubscribeHeadRequest() assert.Nil(t, err) body, err := req.Body() assert.Nil(t, err) assert.Equal(t, "1", req.Id()) - assert.Equal(t, "blockSubscribe", req.Method()) + assert.Equal(t, "slotSubscribe", req.Method()) assert.False(t, req.IsStream()) - require.JSONEq(t, `{"id":"1","jsonrpc":"2.0","method":"blockSubscribe","params":["all",{"showRewards":false,"transactionDetails":"none"}]}`, string(body)) + require.JSONEq(t, `{"id":"1","jsonrpc":"2.0","method":"slotSubscribe","params":null}`, string(body)) } -func TestSolanaParseSubBLock(t *testing.T) { +func TestSolanaParseSubBlockErrEpochInfo(t *testing.T) { + connector := mocks.NewConnectorMock() body := []byte(`{ - "context": { - "slot": 327557189 - }, - "value": { - "slot": 327557189, - "block": { - "previousBlockhash": "5SFHqjdrjZdydRF8Cey9Zgp4CCX9ELifUSvjJV7kacnn", - "blockhash": "2XB8V5eP7HaNeRd2u98YLYS7QqzX61MNskmzeyXy4oiG", - "parentSlot": 327557188, - "blockTime": 1742296365, - "blockHeight": 305813576 - }, - "err": null - } - }`) - - block, err := specific.SolanaChainSpecific.ParseSubscriptionBlock(body) + "jsonrpc": "2.0", + "error": { + "code": -32000, + "message": "Server error: EpochInfo" + }, + "id": 1 + }`) + slot := []byte(`{ + "slot": 405220706, + "parent": 405220705, + "root": 405220674 + }`) + epochResponse := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc) + + connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) + block, err := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second).ParseSubscriptionBlock(slot) assert.Nil(t, err) - assert.Equal(t, uint64(305813576), block.BlockData.Height) - assert.Equal(t, uint64(327557189), block.BlockData.Slot) - assert.Equal(t, "2XB8V5eP7HaNeRd2u98YLYS7QqzX61MNskmzeyXy4oiG", block.BlockData.Hash) + + connector.AssertExpectations(t) + + hash, parentHash := specific.SyntheticHashes(405220706, 405220705) + blockData := protocol.NewBlockData(405220706, 405220706, hash, parentHash) + assert.Equal(t, blockData, block.BlockData) } -func TestSolanaGetLatestBlock(t *testing.T) { - ctx := context.Background() +func TestSolanaParseSubBLock(t *testing.T) { connector := mocks.NewConnectorMock() - slotBody := []byte(`{ - "id": 1, - "jsonrpc": "2.0", - "result": 327557752 + solanaSpecific := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second) + body := []byte(`{ + "slot": 405220706, + "parent": 405220705, + "root": 405220674 }`) - slotResponse := protocol.NewHttpUpstreamResponse("1", slotBody, 200, protocol.JsonRpc) - maxBlocksBody := []byte(`{ - "id": 1, - "jsonrpc": "2.0", - "result": [327557750, 327557751, 327557752] + body1 := []byte(`{ + "slot": 405219989, + "parent": 405220705, + "root": 405220674 }`) - maxBlocksResponse := protocol.NewHttpUpstreamResponse("1", maxBlocksBody, 200, protocol.JsonRpc) - blockBody := []byte(`{ - "id": 1, + epochBody := []byte(`{ "jsonrpc": "2.0", "result": { - "blockHeight": 305814139, - "blockhash": "7QbMXETjcbRHTLxqAEH62nGE2o8mNh7JsspkzughEoGv" - } + "absoluteSlot": 405219988, + "blockHeight": 383325939, + "epoch": 938, + "slotIndex": 3988, + "slotsInEpoch": 432000, + "transactionCount": 494578437235 + }, + "id": 1 }`) - blockResponse := protocol.NewHttpUpstreamResponse("1", blockBody, 200, protocol.JsonRpc) + epochResponse := protocol.NewHttpUpstreamResponse("1", epochBody, 200, protocol.JsonRpc) - connector.On("SendRequest", ctx, mock.Anything).Return(slotResponse).Once() - connector.On("SendRequest", ctx, mock.Anything).Return(maxBlocksResponse).Once() - connector.On("SendRequest", ctx, mock.Anything).Return(blockResponse).Once() + connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) - block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector) + block, err := solanaSpecific.ParseSubscriptionBlock(body) + assert.Nil(t, err) connector.AssertExpectations(t) + + hash, parentHash := specific.SyntheticHashes(405219988, 405219987) + blockData := protocol.NewBlockData(383325939, 405219988, hash, parentHash) + assert.Equal(t, blockData, block.BlockData) + + block, err = solanaSpecific.ParseSubscriptionBlock(body1) assert.Nil(t, err) - assert.Equal(t, uint64(305814139), block.BlockData.Height) - assert.Equal(t, uint64(327557752), block.BlockData.Slot) - assert.Equal(t, "7QbMXETjcbRHTLxqAEH62nGE2o8mNh7JsspkzughEoGv", block.BlockData.Hash) + + hash, parentHash = specific.SyntheticHashes(405219989, 405219988) + blockData = protocol.NewBlockData(383325940, 405219989, hash, parentHash) + assert.Equal(t, blockData, block.BlockData) + + connector.AssertNumberOfCalls(t, "SendRequest", 1) +} + +func TestSolanaGetLatestBlock(t *testing.T) { + ctx := context.Background() + connector := mocks.NewConnectorMock() + epochBody := []byte(`{ + "jsonrpc": "2.0", + "result": { + "absoluteSlot": 405219988, + "blockHeight": 383325939, + "epoch": 938, + "slotIndex": 3988, + "slotsInEpoch": 432000, + "transactionCount": 494578437235 + }, + "id": 1 + }`) + epochResponse := protocol.NewHttpUpstreamResponse("1", epochBody, 200, protocol.JsonRpc) + + connector.On("SendRequest", mock.Anything, mock.Anything).Return(epochResponse) + + block, err := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second).GetLatestBlock(ctx) + assert.Nil(t, err) + + connector.AssertExpectations(t) + + hash, parentHash := specific.SyntheticHashes(405219988, 405219987) + blockData := protocol.NewBlockData(383325939, 405219988, hash, parentHash) + + assert.Equal(t, blockData, block.BlockData) } func TestSolanaGetLatestBlockWithError(t *testing.T) { @@ -95,9 +139,9 @@ func TestSolanaGetLatestBlockWithError(t *testing.T) { connector := mocks.NewConnectorMock() response := protocol.NewHttpUpstreamResponseWithError(protocol.ResponseErrorWithData(1, "block error", nil)) - connector.On("SendRequest", ctx, mock.Anything).Return(response) + connector.On("SendRequest", mock.Anything, mock.Anything).Return(response) - block, err := specific.SolanaChainSpecific.GetLatestBlock(ctx, connector) + block, err := specific.NewSolanaChainSpecificObject("id", connector, 5*time.Second).GetLatestBlock(ctx) connector.AssertExpectations(t) assert.Nil(t, block) diff --git a/internal/upstreams/upstream.go b/internal/upstreams/upstream.go index 15b46c5..f221a9e 100644 --- a/internal/upstreams/upstream.go +++ b/internal/upstreams/upstream.go @@ -98,12 +98,12 @@ func NewBaseUpstream( ctx, cancel := context.WithCancel(ctx) configuredChain := chains.GetChain(conf.ChainName) - headConnector, apiConnectors, caps, err := createUpstreamConnectors(ctx, conf, configuredChain, tracker, statsService, executor, torProxyUrl) + upstreamConnectorsInfo, caps, err := createUpstreamConnectors(ctx, conf, configuredChain, tracker, statsService, executor, torProxyUrl) if err != nil { cancel() return nil, err } - chainSpecific := getChainSpecific(configuredChain.Type) + chainSpecific := getChainSpecific(conf, upstreamConnectorsInfo, configuredChain) upstreamMethods, err := methods.NewUpstreamMethods(configuredChain.MethodSpec, conf.Methods) if err != nil { @@ -138,7 +138,7 @@ func NewBaseUpstream( id: conf.Id, chain: configuredChain.Chain, vendorType: getUpstreamVendor(conf.Connectors), - apiConnectors: apiConnectors, + apiConnectors: upstreamConnectorsInfo.allConnectors, upstreamCtx: newUpstreamCtx(cancel, mainLifecycle, processorsLifecycle), upstreamState: upState, subManager: utils.NewSubscriptionManager[protocol.UpstreamEvent](fmt.Sprintf("%s_upstream", conf.Id)), @@ -147,9 +147,9 @@ func NewBaseUpstream( upstreamIndexHex: upstreamIndexHex, upConfig: conf, - headProcessor: blocks.NewHeadProcessor(ctx, conf, headConnector, chainSpecific), - blockProcessor: createBlockProcessor(ctx, conf, headConnector, chainSpecific, configuredChain.Type), - settingsValidationProcessor: createSettingValidationProcessor(conf.Id, headConnector, configuredChain, chainSpecific, conf.Options), + headProcessor: blocks.NewHeadProcessor(ctx, conf, upstreamConnectorsInfo.headConnector, chainSpecific), + blockProcessor: createBlockProcessor(ctx, conf, upstreamConnectorsInfo.internalRequestConnector, chainSpecific, configuredChain.Type), + settingsValidationProcessor: createSettingValidationProcessor(chainSpecific), }, nil } @@ -387,14 +387,8 @@ func createConnector( } } -func createSettingValidationProcessor( - upstreamId string, - connector connectors.ApiConnector, - configuredChain *chains.ConfiguredChain, - chainSpecific specific.ChainSpecific, - options *config.UpstreamOptions, -) *validations.SettingsValidationProcessor { - validators := chainSpecific.SettingsValidators(upstreamId, connector, configuredChain, options) +func createSettingValidationProcessor(chainSpecific specific.ChainSpecific) *validations.SettingsValidationProcessor { + validators := chainSpecific.SettingsValidators() if len(validators) == 0 { return nil } @@ -416,17 +410,21 @@ func createBlockProcessor( } } -func getChainSpecific(blockchainType chains.BlockchainType) specific.ChainSpecific { +func getChainSpecific( + conf *config.Upstream, + upstreamConnectorsInfo *connectorsInfo, + configuredChain *chains.ConfiguredChain, +) specific.ChainSpecific { //TODO: there might be a few protocols a chain can work with, so it will be necessary to implement all of them - switch blockchainType { + switch configuredChain.Type { case chains.Ethereum: - return specific.EvmChainSpecific + return specific.NewEvmChainSpecific(conf.Id, upstreamConnectorsInfo.internalRequestConnector, configuredChain, conf.Options) case chains.Aztec: - return specific.AztecChainSpecific + return specific.NewAztecChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector) case chains.Solana: - return specific.SolanaChainSpecific + return specific.NewSolanaChainSpecificObject(conf.Id, upstreamConnectorsInfo.internalRequestConnector, conf.Options.InternalTimeout) default: - panic(fmt.Sprintf("unknown blockchain type - %s", blockchainType)) + panic(fmt.Sprintf("unknown blockchain type - %s", configuredChain.Type)) } } @@ -445,15 +443,16 @@ func createUpstreamConnectors( statsService stats.StatsService, executor failsafe.Executor[protocol.ResponseHolder], torProxyUrl string, -) (connectors.ApiConnector, []connectors.ApiConnector, mapset.Set[protocol.Cap], error) { +) (*connectorsInfo, mapset.Set[protocol.Cap], error) { caps := mapset.NewThreadUnsafeSet[protocol.Cap]() apiConnectors := make([]connectors.ApiConnector, 0) var headConnector connectors.ApiConnector + var internalRequestConnector connectors.ApiConnector for _, connectorConfig := range conf.Connectors { apiConnector, err := createConnector(ctx, conf.Id, configuredChain, connectorConfig, torProxyUrl) if err != nil { - return nil, nil, nil, fmt.Errorf("cound't create api connector of %s: %v", conf.Id, err) + return nil, nil, fmt.Errorf("couldn't create api connector of %s: %v", conf.Id, err) } hooks := []protocol.ResponseReceivedHook{ dimensions.NewDimensionHook(tracker), @@ -463,11 +462,32 @@ func createUpstreamConnectors( if connectorConfig.Type == conf.HeadConnector { headConnector = apiConnector } + if connectorConfig.Type == conf.GetBestConnector() { + internalRequestConnector = apiConnector + } if apiConnector.GetType() == protocol.WsConnector { caps.Add(protocol.WsCap) } apiConnectors = append(apiConnectors, apiConnector) } - return headConnector, apiConnectors, caps, nil + return newConnectorInfo(headConnector, internalRequestConnector, apiConnectors), caps, nil +} + +type connectorsInfo struct { + headConnector connectors.ApiConnector + internalRequestConnector connectors.ApiConnector + allConnectors []connectors.ApiConnector +} + +func newConnectorInfo( + headConnector, + internalRequestConnector connectors.ApiConnector, + allConnectors []connectors.ApiConnector, +) *connectorsInfo { + return &connectorsInfo{ + headConnector: headConnector, + internalRequestConnector: internalRequestConnector, + allConnectors: allConnectors, + } } diff --git a/internal/upstreams/upstream_test.go b/internal/upstreams/upstream_test.go index fa1de0c..04d5424 100644 --- a/internal/upstreams/upstream_test.go +++ b/internal/upstreams/upstream_test.go @@ -11,9 +11,9 @@ import ( "github.com/drpcorg/nodecore/internal/protocol" "github.com/drpcorg/nodecore/internal/upstreams" "github.com/drpcorg/nodecore/internal/upstreams/blocks" - specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific" "github.com/drpcorg/nodecore/internal/upstreams/methods" "github.com/drpcorg/nodecore/internal/upstreams/validations" + "github.com/drpcorg/nodecore/pkg/blockchain" "github.com/drpcorg/nodecore/pkg/chains" specs "github.com/drpcorg/nodecore/pkg/methods" "github.com/drpcorg/nodecore/pkg/test_utils" @@ -33,7 +33,8 @@ func TestUpstreamHeadEvent(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x41fd60b", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) requestLatest, _ := protocol.NewInternalUpstreamJsonRpcRequest(specs.EthGetBlockByNumber, []any{"latest", false}) @@ -53,14 +54,11 @@ func TestUpstreamHeadEvent(t *testing.T) { sub := upstream.Subscribe("name") - checkFunc := func(height uint64, hash string) { + checkFunc := func(blockData *protocol.BlockData) { event, ok := <-sub.Events state := protocol.UpstreamState{ - Status: protocol.Available, - HeadData: &protocol.BlockData{ - Height: height, - Hash: hash, - }, + Status: protocol.Available, + HeadData: blockData, BlockInfo: protocol.NewBlockInfo(), UpstreamMethods: mocks.NewMethodsMock(), Caps: mapset.NewThreadUnsafeSet[protocol.Cap](), @@ -79,9 +77,15 @@ func TestUpstreamHeadEvent(t *testing.T) { assert.Equal(t, state, upstream.GetUpstreamState()) } - checkFunc(69195275, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18") + blockData := protocol.NewBlockData( + 69195275, + 0, + blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + ) + checkFunc(blockData) upstream.UpdateHead(79195275, 0) - checkFunc(79195275, "") + checkFunc(protocol.NewBlockData(79195275, 0, blockchain.EmptyHash, blockchain.EmptyHash)) connector.AssertExpectations(t) } @@ -95,7 +99,8 @@ func TestUpstreamBlockEvent(t *testing.T) { "jsonrpc": "2.0", "result": { "number": "0x345", - "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18" + "hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18", + "parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11" } }`) requestLatest, _ := protocol.NewInternalUpstreamJsonRpcRequest(specs.EthGetBlockByNumber, []any{"latest", false}) @@ -116,7 +121,7 @@ func TestUpstreamBlockEvent(t *testing.T) { Options: &config.UpstreamOptions{InternalTimeout: 5 * time.Second}, } - blockProcessor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, &specific.EvmChainSpecificObject{}) + blockProcessor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector)) upstream := test_utils.TestEvmUpstream(ctx, connector, upConfig, blockProcessor, nil, mocks.NewMethodsMock()) go upstream.Start() @@ -156,9 +161,15 @@ func TestUpstreamBlockEvent(t *testing.T) { assert.Equal(t, state.BlockInfo.GetBlocks(), upstream.GetUpstreamState().BlockInfo.GetBlocks()) } - checkFunc(protocol.NewBlockData(837, 0, "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18")) + blockData := protocol.NewBlockData( + 837, + 0, + blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"), + blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"), + ) + checkFunc(blockData) upstream.UpdateBlock(protocol.NewBlockDataWithHeight(1000), protocol.FinalizedBlock) - checkFunc(protocol.NewBlockData(1000, 0, "")) + checkFunc(protocol.NewBlockData(1000, 0, blockchain.EmptyHash, blockchain.EmptyHash)) time.Sleep(15 * time.Millisecond) diff --git a/pkg/blockchain/hash_id.go b/pkg/blockchain/hash_id.go new file mode 100644 index 0000000..459f14f --- /dev/null +++ b/pkg/blockchain/hash_id.go @@ -0,0 +1,59 @@ +package blockchain + +import ( + "encoding/hex" + "strings" +) + +var hexDigits = []byte("0123456789abcdef") + +var EmptyHash HashId = nil + +type HashId []byte + +func NewHashIdFromBytes(value []byte) HashId { + // just pass bytes as is + if len(value) >= 2 && value[0] == '0' && (value[1] == 'x' || value[1] == 'X') { + value = value[2:] + } + return value +} + +func NewHashIdFromString(value string) HashId { + clean := value + if strings.HasPrefix(value, "0x") || strings.HasPrefix(value, "0X") { + clean = value[2:] + } + + if len(clean)%2 != 0 { + clean = "0" + clean + } + + bytes, err := hex.DecodeString(clean) + if err != nil { + return bytes + } + + return bytes +} + +func (h HashId) ToHex() string { + hexHash := make([]byte, len(h)*2) + + i := 0 + j := 0 + for i < len(h) { + b := h[i] + hexHash[j] = hexDigits[(b&0xF0)>>4] + j++ + hexHash[j] = hexDigits[b&0x0F] + j++ + i++ + } + + return string(hexHash) +} + +func (h HashId) ToHexWithPrefix() string { + return "0x" + h.ToHex() +} diff --git a/pkg/blockchain/hash_id_test.go b/pkg/blockchain/hash_id_test.go new file mode 100644 index 0000000..0fbbff8 --- /dev/null +++ b/pkg/blockchain/hash_id_test.go @@ -0,0 +1,25 @@ +package blockchain_test + +import ( + "encoding/binary" + "testing" + + "github.com/drpcorg/nodecore/pkg/blockchain" + "github.com/stretchr/testify/assert" +) + +func TestNewHashIdFromString(t *testing.T) { + hashStr := "0xbb6e6f14b1656de46de5bdd9f13196e5d556ad071a2ce802bb3e5e704047b40e" + hashId := blockchain.NewHashIdFromString(hashStr) + + assert.Equal(t, hashStr[2:], hashId.ToHex()) + assert.Equal(t, hashStr, hashId.ToHexWithPrefix()) +} + +func TestNewHashIdFromBytes(t *testing.T) { + b1 := make([]byte, 32) + binary.BigEndian.PutUint64(b1, 405223378) + hashId := blockchain.NewHashIdFromBytes(b1) + + assert.Equal(t, "00000000182737d2000000000000000000000000000000000000000000000000", hashId.ToHex()) +} diff --git a/pkg/test_utils/test_helpers.go b/pkg/test_utils/test_helpers.go index 98997d9..f7bc52e 100644 --- a/pkg/test_utils/test_helpers.go +++ b/pkg/test_utils/test_helpers.go @@ -169,7 +169,7 @@ func TestEvmUpstream( "id", chains.ETHEREUM, []connectors.ApiConnector{connector}, - blocks.NewHeadProcessor(ctx, upConfig, connector, specific.EvmChainSpecific), + blocks.NewHeadProcessor(ctx, upConfig, connector, NewEvmChainSpecific(connector)), blockProcessor, settingValidationProcessor, upState, @@ -178,6 +178,10 @@ func TestEvmUpstream( ) } +func NewEvmChainSpecific(connector connectors.ApiConnector) *specific.EvmChainSpecificObject { + return specific.NewEvmChainSpecific("id", connector, chains.GetChain("polygon"), nil) +} + func CreateChainSupervisor() *upstreams.ChainSupervisor { chainSupervisor := upstreams.NewChainSupervisor(context.Background(), chains.ARBITRUM, fork_choice.NewHeightForkChoice(), nil)