Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions seq/consts.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package seq

import "time"

const DefaultProposerLRUSize = 20
const GetArcadiaBlockSignatureHeader = "X-Arcadia-GetBlock-Sig"
const ActionChannelBufferSize = 20
const MinRateListenBlock = 500 * time.Millisecond
68 changes: 50 additions & 18 deletions seq/seqclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/AnomalyFi/Arcadia/common"
Expand Down Expand Up @@ -64,9 +65,11 @@ type BaseSeqClient interface {
var _ BaseSeqClient = (*SeqClient)(nil)

type SeqClient struct {
cfg *SeqClientConfig

srpc *srpc.JSONRPCClient
hrpc *hrpc.JSONRPCClient
wsCli *hrpc.WebSocketClient
wsCli atomic.Pointer[hrpc.WebSocketClient]

signer ed25519.PrivateKey

Expand Down Expand Up @@ -118,22 +121,16 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) {
return nil, err
}

wsCli, err := hrpc.NewWebSocketClient(config.URL, hrpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize)
if err != nil {
return nil, err
}
if err := wsCli.RegisterBlocks(); err != nil {
return nil, err
}

stopSig := make(chan struct{})
hashFn := func(key uint64) uint64 { return key }
epochMap := NewShardedMap[uint64, int](1024, 16, hashFn)

client := SeqClient{
cfg: config,

srpc: scli,
hrpc: hcli,
wsCli: wsCli,
wsCli: atomic.Pointer[hrpc.WebSocketClient]{},
signer: config.PrivateKey,

currentValidators: make([]*hrpc.Validator, 0),
Expand All @@ -152,24 +149,44 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) {
actsQueue: make(chan []chain.Action, ActionChannelBufferSize),
}

if err := client.instantiateWSClient(); err != nil {
return nil, err
}

// keep track of head of SEQ, this is used for calculating `Epoch` in SubmitBlock to Arcadia
go func() {
ctx := context.Background()
var lastLitenBlockAt time.Time
for {
select {
case <-stopSig:
client.logger.Info("stopping as receiving stop signal")
return
default:
if time.Since(lastLitenBlockAt) < MinRateListenBlock {
continue
}

wsCli := client.wsCli.Load()

client.logger.Debug("listening block")
lastLitenBlockAt = time.Now()
bctx, cancel := context.WithTimeout(ctx, config.BlockWaitTime)
blk, results, _, _, err := wsCli.ListenBlock(bctx, parser)
if err != nil {
client.logger.Error("unable to listen block", "err", err)
cancel()
cancel()

switch err {
case context.Canceled:
fallthrough
case context.DeadlineExceeded:
client.logger.WithError(err).Error("context cancelled on listening block")
continue
default:
client.logger.WithError(err).Error("error listening block, prob connection dropped by SEQ")
if err := client.instantiateWSClient(); err != nil {
client.logger.WithError(err).Error("unable to instantiate ws client")
}
}
cancel()

// release the lock after duty map is updated
client.blockHeadL.Lock()
Expand Down Expand Up @@ -353,8 +370,10 @@ func (s *SeqClient) SubmitActions(ctx context.Context, acts []chain.Action) (ids
}

func (s *SeqClient) submitAndWaitTx(ctx context.Context, tx *chain.Transaction) (ids.ID, *chain.Result, error) {
if err := s.wsCli.RegisterTx(tx); err != nil {
s.logger.WithError(err).Debug("unable to send tx")
wsCli := s.wsCli.Load()

if err := wsCli.RegisterTx(tx); err != nil {
s.logger.WithError(err).Warn("unable to send tx")
return ids.Empty, nil, err
}
s.logger.WithField("txID", tx.ID().String()).Info("seqclient tx submitted")
Expand All @@ -365,10 +384,10 @@ listenTx:
select {
case <-ctx.Done():
err := fmt.Errorf("timeout on waiting tx: %w", ctx.Err())
s.logger.WithError(err).Debug("time out waiting on tx")
s.logger.WithError(err).Warn("time out waiting on tx")
return ids.Empty, nil, err
default:
txID, dErr, result, err := s.wsCli.ListenTx(ctx)
txID, dErr, result, err := wsCli.ListenTx(ctx)
if dErr != nil {
return ids.Empty, nil, dErr
}
Expand Down Expand Up @@ -439,3 +458,16 @@ func (s *SeqClient) ProposerAtHeight(ctx context.Context, height uint64) (*hrpc.
func (s *SeqClient) GetHighestSettledToBNonce(ctx context.Context) (uint64, error) {
return s.srpc.GetHighestSettledToBNonce(ctx)
}

func (s *SeqClient) instantiateWSClient() error {
wsCli, err := hrpc.NewWebSocketClient(s.cfg.URL, hrpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize)
if err != nil {
return err
}
if err := wsCli.RegisterBlocks(); err != nil {
return err
}

s.wsCli.Store(wsCli)
return nil
}