Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,8 @@ func (u *Upstream) setDefaults(defaults *ChainDefaults) {
}
}
if u.HeadConnector == "" && len(u.Connectors) > 0 {
filteredConnectors := lo.Filter(u.Connectors, func(item *ApiConnectorConfig, index int) bool {
_, ok := connectorTypesRating[item.Type]
return ok
})

if len(filteredConnectors) > 0 {
defaultHeadConnectorType := lo.MinBy(filteredConnectors, func(a *ApiConnectorConfig, b *ApiConnectorConfig) bool {
return connectorTypesRating[a.Type] < connectorTypesRating[b.Type]
}).Type
u.HeadConnector = defaultHeadConnectorType
if headConnector := u.GetBestConnector(); headConnector != "" {
u.HeadConnector = headConnector
}
}
if u.RateLimitAutoTune != nil {
Expand Down
14 changes: 14 additions & 0 deletions internal/config/upstream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ type Upstream struct {
RateLimitAutoTune *RateLimitAutoTuneConfig `yaml:"rate-limit-auto-tune"`
}

func (u *Upstream) GetBestConnector() ApiConnectorType {
filteredConnectors := lo.Filter(u.Connectors, func(item *ApiConnectorConfig, index int) bool {
_, ok := connectorTypesRating[item.Type]
return ok
})

if len(filteredConnectors) > 0 {
return lo.MinBy(filteredConnectors, func(a *ApiConnectorConfig, b *ApiConnectorConfig) bool {
return connectorTypesRating[a.Type] < connectorTypesRating[b.Type]
}).Type
}
return ""
}

type UpstreamOptions struct {
InternalTimeout time.Duration `yaml:"internal-timeout"`
ValidationInterval time.Duration `yaml:"validation-interval"`
Expand Down
35 changes: 25 additions & 10 deletions internal/protocol/blocks.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,48 @@
package protocol

import "github.com/drpcorg/nodecore/pkg/blockchain"

type Block struct {
BlockData *BlockData
}

type BlockData struct {
Height uint64
Slot uint64
Hash string
Height uint64
Slot uint64
Hash blockchain.HashId
ParentHash blockchain.HashId
}

func (b *BlockData) IsEmpty() bool {
return b.Height == 0 && b.Slot == 0 && b.Hash == ""
return b.Height == 0 && b.Slot == 0 && len(b.Hash) == 0 && len(b.ParentHash) == 0
}

func NewBlockDataWithHeight(height uint64) *BlockData {
return &BlockData{Height: height}
}

func NewBlockData(height, slot uint64, hash string) *BlockData {
return &BlockData{Height: height, Slot: slot, Hash: hash}
func NewBlockData(height, slot uint64, hash, parentHash blockchain.HashId) *BlockData {
return &BlockData{
Height: height,
Slot: slot,
Hash: hash,
ParentHash: parentHash,
}
}

func NewBlock(height, slot uint64, hash string) *Block {
func NewBlock(height, slot uint64, hash, parentHash blockchain.HashId) *Block {
return &Block{
BlockData: &BlockData{
Height: height,
Slot: slot,
Hash: hash,
Height: height,
Slot: slot,
Hash: hash,
ParentHash: parentHash,
},
}
}

func NewBlockWithHeights(height, slot uint64) *Block {
return &Block{
BlockData: &BlockData{Height: height, Slot: slot},
}
}
21 changes: 16 additions & 5 deletions internal/protocol/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@ import (
"testing"

"github.com/drpcorg/nodecore/internal/protocol"
"github.com/drpcorg/nodecore/pkg/blockchain"
"github.com/stretchr/testify/assert"
)

func TestBlockCreation(t *testing.T) {
block := protocol.NewBlock(uint64(15), uint64(55), "hash")
block := protocol.NewBlock(
uint64(15),
uint64(55),
blockchain.NewHashIdFromString("0x2345"),
blockchain.NewHashIdFromString("0x1111"),
)
expectedBlock := protocol.Block{
BlockData: &protocol.BlockData{Height: uint64(15), Slot: uint64(55), Hash: "hash"},
BlockData: &protocol.BlockData{
Height: uint64(15),
Slot: uint64(55),
Hash: blockchain.NewHashIdFromString("0x2345"),
ParentHash: blockchain.NewHashIdFromString("0x1111"),
},
}

assert.Equal(t, &expectedBlock, block)
Expand Down Expand Up @@ -47,7 +58,7 @@ func TestBlockData(t *testing.T) {
},
{
name: "data with hash",
blockData: protocol.BlockData{Hash: "hash"},
blockData: protocol.BlockData{Hash: blockchain.NewHashIdFromString("0x2345")},
expected: false,
},
{
Expand All @@ -57,12 +68,12 @@ func TestBlockData(t *testing.T) {
},
{
name: "data with height and hash",
blockData: protocol.BlockData{Height: uint64(12), Hash: "hash"},
blockData: protocol.BlockData{Height: uint64(12), Hash: blockchain.NewHashIdFromString("0x2345")},
expected: false,
},
{
name: "data with slot and hash",
blockData: protocol.BlockData{Slot: uint64(12), Hash: "hash"},
blockData: protocol.BlockData{Slot: uint64(12), Hash: blockchain.NewHashIdFromString("0x2345")},
expected: false,
},
}
Expand Down
5 changes: 3 additions & 2 deletions internal/protocol/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

mapset "github.com/deckarep/golang-set/v2"
"github.com/drpcorg/nodecore/internal/protocol"
"github.com/drpcorg/nodecore/pkg/blockchain"
"github.com/drpcorg/nodecore/pkg/test_utils/mocks"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -114,8 +115,8 @@ func TestDefaultUpstreamState(t *testing.T) {

func TestBlockInfo(t *testing.T) {
blockInfo := protocol.NewBlockInfo()
blockData := &protocol.BlockData{Height: uint64(75), Hash: "hash"}
blockData2 := &protocol.BlockData{Height: uint64(86), Hash: "hash1", Slot: uint64(25)}
blockData := &protocol.BlockData{Height: uint64(75), Hash: blockchain.NewHashIdFromString("0x2345")}
blockData2 := &protocol.BlockData{Height: uint64(86), Hash: blockchain.NewHashIdFromString("0x2345"), Slot: uint64(25)}
assert.NotNil(t, blockInfo)

blockInfo.AddBlock(blockData, protocol.FinalizedBlock)
Expand Down
2 changes: 1 addition & 1 deletion internal/upstreams/blocks/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (b *EthLikeBlockProcessor) poll(blockType protocol.BlockType) {
ctx, cancel := context.WithTimeout(b.lifecycle.GetParentContext(), b.internalTimeout)
defer cancel()

block, err := b.chainSpecific.GetFinalizedBlock(ctx, b.connector)
block, err := b.chainSpecific.GetFinalizedBlock(ctx)
if err != nil {
var respErr *protocol.ResponseError
if errors.As(err, &respErr) {
Expand Down
15 changes: 9 additions & 6 deletions internal/upstreams/blocks/block_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"github.com/drpcorg/nodecore/internal/config"
"github.com/drpcorg/nodecore/internal/protocol"
"github.com/drpcorg/nodecore/internal/upstreams/blocks"
specific "github.com/drpcorg/nodecore/internal/upstreams/chains_specific"
"github.com/drpcorg/nodecore/pkg/blockchain"
"github.com/drpcorg/nodecore/pkg/test_utils"
"github.com/drpcorg/nodecore/pkg/test_utils/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -22,23 +23,25 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) {
"jsonrpc": "2.0",
"result": {
"number": "0x41fd60b",
"hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"
"hash": "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18",
"parentHash": "0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"
}
}`)
response := protocol.NewHttpUpstreamResponse("1", body, 200, protocol.JsonRpc)

connector.On("SendRequest", mock.Anything, mock.Anything).Return(response)

processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, specific.EvmChainSpecific)
processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector))
go processor.Start()

sub := processor.Subscribe("sub")
event, ok := <-sub.Events

expected := blocks.BlockEvent{
BlockData: &protocol.BlockData{
Height: uint64(69195275),
Hash: "0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18",
Height: uint64(69195275),
Hash: blockchain.NewHashIdFromString("0xdeeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d18"),
ParentHash: blockchain.NewHashIdFromString("0x1eeaae5f33e2a990aab15d48c26118fd8875f1a2aaac376047268d80f2486d11"),
},
BlockType: protocol.FinalizedBlock,
}
Expand Down Expand Up @@ -79,7 +82,7 @@ func TestEthLikeBlockProcessorDisableFinalizedBlock(t *testing.T) {

connector.On("SendRequest", mock.Anything, mock.Anything).Return(response)

processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, specific.EvmChainSpecific)
processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector))
go processor.Start()

sub := processor.Subscribe("sub")
Expand Down
76 changes: 48 additions & 28 deletions internal/upstreams/blocks/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ var _ utils.Lifecycle = (*HeadProcessor)(nil)
func NewHeadProcessor(
ctx context.Context,
upConfig *config.Upstream,
apiConnector connectors.ApiConnector,
headConnector connectors.ApiConnector,
specific specific.ChainSpecific,
) *HeadProcessor {
configuredChain := chains.GetChain(upConfig.ChainName)
head := createHead(ctx, upConfig.Id, upConfig.PollInterval, apiConnector, specific, upConfig.Options)
head := createHead(ctx, upConfig.Id, upConfig.PollInterval, headConnector, specific, upConfig.Options)

headNoUpdatesTimeout := 1 * time.Minute
switch head.(type) {
Expand Down Expand Up @@ -115,23 +115,23 @@ func (h *HeadProcessor) Stop() {
}

func (h *HeadProcessor) UpdateHead(height, slot uint64) {
h.manualHeadChan <- protocol.NewBlock(height, slot, "")
h.manualHeadChan <- protocol.NewBlockWithHeights(height, slot)
}

func createHead(
ctx context.Context,
id string, pollInterval time.Duration,
apiConnector connectors.ApiConnector,
headConnector connectors.ApiConnector,
specific specific.ChainSpecific,
options *config.UpstreamOptions,
) Head {
switch apiConnector.GetType() {
switch headConnector.GetType() {
case protocol.JsonRpcConnector, protocol.RestConnector:
log.Info().Msgf("starting an rpc head of upstream %s with poll interval %s", id, pollInterval)
return newRpcHead(ctx, id, apiConnector, specific, pollInterval, options)
return newRpcHead(ctx, id, options.InternalTimeout, pollInterval, specific)
case protocol.WsConnector:
log.Info().Msgf("starting a subscription head of upstream %s", id)
return newWsHead(ctx, id, apiConnector, specific)
return newWsHead(ctx, id, options.InternalTimeout, headConnector, specific)
default:
return nil
}
Expand All @@ -151,7 +151,6 @@ type RpcHead struct {
chainSpecific specific.ChainSpecific
pollInterval time.Duration
internalTimeout time.Duration
connector connectors.ApiConnector
upstreamId string
pollInProgress atomic.Bool
headsChan chan *protocol.Block
Expand All @@ -175,21 +174,19 @@ var _ Head = (*RpcHead)(nil)
func newRpcHead(
ctx context.Context,
upstreamId string,
connector connectors.ApiConnector,
chainSpecific specific.ChainSpecific,
internalTimeout,
pollInterval time.Duration,
options *config.UpstreamOptions,
chainSpecific specific.ChainSpecific,
) *RpcHead {
head := RpcHead{
lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_rpc_head", upstreamId), ctx),
block: utils.NewAtomic[protocol.Block](),
chainSpecific: chainSpecific,
pollInterval: pollInterval,
connector: connector,
upstreamId: upstreamId,
pollInProgress: atomic.Bool{},
headsChan: make(chan *protocol.Block),
internalTimeout: options.InternalTimeout,
internalTimeout: internalTimeout,
}

return &head
Expand Down Expand Up @@ -228,7 +225,7 @@ func (r *RpcHead) poll() {
ctx, cancel := context.WithTimeout(r.lifecycle.GetParentContext(), r.internalTimeout)
defer cancel()

block, err := r.chainSpecific.GetLatestBlock(ctx, r.connector)
block, err := r.chainSpecific.GetLatestBlock(ctx)
if err != nil {
log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", r.upstreamId)
} else {
Expand All @@ -239,12 +236,13 @@ func (r *RpcHead) poll() {
}

type SubscriptionHead struct {
lifecycle *utils.BaseLifecycle
block *utils.Atomic[protocol.Block]
chainSpecific specific.ChainSpecific
connector connectors.ApiConnector
upstreamId string
headsChan chan *protocol.Block
lifecycle *utils.BaseLifecycle
block *utils.Atomic[protocol.Block]
chainSpecific specific.ChainSpecific
headConnector connectors.ApiConnector
upstreamId string
headsChan chan *protocol.Block
internalTimeout time.Duration
}

func (w *SubscriptionHead) Running() bool {
Expand All @@ -269,13 +267,16 @@ func (w *SubscriptionHead) GetCurrentBlock() *protocol.Block {

func (w *SubscriptionHead) Start() {
w.lifecycle.Start(func(ctx context.Context) error {
// get the latest block in order not to wait for the sub event
w.getLatestBlock()

subReq, err := w.chainSpecific.SubscribeHeadRequest()
if err != nil {
log.Warn().Err(err).Msgf("couldn't create a subscription request to upstream %s", w.upstreamId)
return err
}

subResponse, err := w.connector.Subscribe(ctx, subReq)
subResponse, err := w.headConnector.Subscribe(ctx, subReq)
if err != nil {
log.Warn().Err(err).Msgf("couldn't subscribe to upstream %s heads", w.upstreamId)
return err
Expand Down Expand Up @@ -316,14 +317,33 @@ func (w *SubscriptionHead) OnNoHeadUpdates() {
go w.Start()
}

func newWsHead(ctx context.Context, upstreamId string, connector connectors.ApiConnector, chainSpecific specific.ChainSpecific) *SubscriptionHead {
func (w *SubscriptionHead) getLatestBlock() {
ctx, cancel := context.WithTimeout(w.lifecycle.GetParentContext(), w.internalTimeout)
defer cancel()
block, err := w.chainSpecific.GetLatestBlock(ctx)
if err != nil {
log.Warn().Err(err).Msgf("couldn't get the latest block of upstream %s", w.upstreamId)
return
}
w.block.Store(*block)
w.headsChan <- block
}

func newWsHead(
ctx context.Context,
upstreamId string,
internalTimeout time.Duration,
headConnector connectors.ApiConnector,
chainSpecific specific.ChainSpecific,
) *SubscriptionHead {
head := SubscriptionHead{
lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx),
upstreamId: upstreamId,
chainSpecific: chainSpecific,
connector: connector,
block: utils.NewAtomic[protocol.Block](),
headsChan: make(chan *protocol.Block),
lifecycle: utils.NewBaseLifecycle(fmt.Sprintf("%s_subscription_head", upstreamId), ctx),
upstreamId: upstreamId,
chainSpecific: chainSpecific,
headConnector: headConnector,
internalTimeout: internalTimeout,
block: utils.NewAtomic[protocol.Block](),
headsChan: make(chan *protocol.Block),
}

return &head
Expand Down
Loading