diff --git a/seq/consts.go b/seq/consts.go index 9206574c..bc06b12f 100644 --- a/seq/consts.go +++ b/seq/consts.go @@ -1,5 +1,8 @@ package seq +import "time" + const DefaultProposerLRUSize = 20 const GetArcadiaBlockSignatureHeader = "X-Arcadia-GetBlock-Sig" const ActionChannelBufferSize = 20 +const MinRateListenBlock = 500 * time.Millisecond diff --git a/seq/seqclient.go b/seq/seqclient.go index 86703ce6..2e03d2c0 100644 --- a/seq/seqclient.go +++ b/seq/seqclient.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/AnomalyFi/Arcadia/common" @@ -64,9 +65,11 @@ type BaseSeqClient interface { var _ BaseSeqClient = (*SeqClient)(nil) type SeqClient struct { + cfg *SeqClientConfig + srpc *srpc.JSONRPCClient hrpc *hrpc.JSONRPCClient - wsCli *hrpc.WebSocketClient + wsCli atomic.Pointer[hrpc.WebSocketClient] signer ed25519.PrivateKey @@ -118,22 +121,16 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) { return nil, err } - wsCli, err := hrpc.NewWebSocketClient(config.URL, hrpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) - if err != nil { - return nil, err - } - if err := wsCli.RegisterBlocks(); err != nil { - return nil, err - } - stopSig := make(chan struct{}) hashFn := func(key uint64) uint64 { return key } epochMap := NewShardedMap[uint64, int](1024, 16, hashFn) client := SeqClient{ + cfg: config, + srpc: scli, hrpc: hcli, - wsCli: wsCli, + wsCli: atomic.Pointer[hrpc.WebSocketClient]{}, signer: config.PrivateKey, currentValidators: make([]*hrpc.Validator, 0), @@ -152,24 +149,44 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) { actsQueue: make(chan []chain.Action, ActionChannelBufferSize), } + if err := client.instantiateWSClient(); err != nil { + return nil, err + } + // keep track of head of SEQ, this is used for calculating `Epoch` in SubmitBlock to Arcadia go func() { ctx := context.Background() + var lastLitenBlockAt time.Time for { select { case <-stopSig: client.logger.Info("stopping as receiving stop signal") return default: + if time.Since(lastLitenBlockAt) < MinRateListenBlock { + continue + } + + wsCli := client.wsCli.Load() + client.logger.Debug("listening block") + lastLitenBlockAt = time.Now() bctx, cancel := context.WithTimeout(ctx, config.BlockWaitTime) blk, results, _, _, err := wsCli.ListenBlock(bctx, parser) - if err != nil { - client.logger.Error("unable to listen block", "err", err) - cancel() + cancel() + + switch err { + case context.Canceled: + fallthrough + case context.DeadlineExceeded: + client.logger.WithError(err).Error("context cancelled on listening block") continue + default: + client.logger.WithError(err).Error("error listening block, prob connection dropped by SEQ") + if err := client.instantiateWSClient(); err != nil { + client.logger.WithError(err).Error("unable to instantiate ws client") + } } - cancel() // release the lock after duty map is updated client.blockHeadL.Lock() @@ -353,8 +370,10 @@ func (s *SeqClient) SubmitActions(ctx context.Context, acts []chain.Action) (ids } func (s *SeqClient) submitAndWaitTx(ctx context.Context, tx *chain.Transaction) (ids.ID, *chain.Result, error) { - if err := s.wsCli.RegisterTx(tx); err != nil { - s.logger.WithError(err).Debug("unable to send tx") + wsCli := s.wsCli.Load() + + if err := wsCli.RegisterTx(tx); err != nil { + s.logger.WithError(err).Warn("unable to send tx") return ids.Empty, nil, err } s.logger.WithField("txID", tx.ID().String()).Info("seqclient tx submitted") @@ -365,10 +384,10 @@ listenTx: select { case <-ctx.Done(): err := fmt.Errorf("timeout on waiting tx: %w", ctx.Err()) - s.logger.WithError(err).Debug("time out waiting on tx") + s.logger.WithError(err).Warn("time out waiting on tx") return ids.Empty, nil, err default: - txID, dErr, result, err := s.wsCli.ListenTx(ctx) + txID, dErr, result, err := wsCli.ListenTx(ctx) if dErr != nil { return ids.Empty, nil, dErr } @@ -439,3 +458,16 @@ func (s *SeqClient) ProposerAtHeight(ctx context.Context, height uint64) (*hrpc. func (s *SeqClient) GetHighestSettledToBNonce(ctx context.Context) (uint64, error) { return s.srpc.GetHighestSettledToBNonce(ctx) } + +func (s *SeqClient) instantiateWSClient() error { + wsCli, err := hrpc.NewWebSocketClient(s.cfg.URL, hrpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) + if err != nil { + return err + } + if err := wsCli.RegisterBlocks(); err != nil { + return err + } + + s.wsCli.Store(wsCli) + return nil +}