From bdb69181cb8df492864feaba79f9ef2fa5f47b09 Mon Sep 17 00:00:00 2001 From: bianyuanop Date: Wed, 5 Mar 2025 10:20:05 -0500 Subject: [PATCH 1/3] seq wscli reconnect --- seq/seqclient.go | 64 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/seq/seqclient.go b/seq/seqclient.go index 86703ce6..0af90f13 100644 --- a/seq/seqclient.go +++ b/seq/seqclient.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/AnomalyFi/Arcadia/common" @@ -64,9 +65,11 @@ type BaseSeqClient interface { var _ BaseSeqClient = (*SeqClient)(nil) type SeqClient struct { + cfg *SeqClientConfig + srpc *srpc.JSONRPCClient hrpc *hrpc.JSONRPCClient - wsCli *hrpc.WebSocketClient + wsCli atomic.Pointer[hrpc.WebSocketClient] signer ed25519.PrivateKey @@ -118,22 +121,16 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) { return nil, err } - wsCli, err := hrpc.NewWebSocketClient(config.URL, hrpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) - if err != nil { - return nil, err - } - if err := wsCli.RegisterBlocks(); err != nil { - return nil, err - } - stopSig := make(chan struct{}) hashFn := func(key uint64) uint64 { return key } epochMap := NewShardedMap[uint64, int](1024, 16, hashFn) client := SeqClient{ + cfg: config, + srpc: scli, hrpc: hcli, - wsCli: wsCli, + wsCli: atomic.Pointer[hrpc.WebSocketClient]{}, signer: config.PrivateKey, currentValidators: make([]*hrpc.Validator, 0), @@ -152,24 +149,44 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) { actsQueue: make(chan []chain.Action, ActionChannelBufferSize), } + if err := client.instantiateWSClient(); err != nil { + return nil, err + } + // keep track of head of SEQ, this is used for calculating `Epoch` in SubmitBlock to Arcadia go func() { ctx := context.Background() + var lastLitenBlockAt time.Time for { select { case <-stopSig: client.logger.Info("stopping as receiving stop signal") return default: + if time.Since(lastLitenBlockAt) < 500*time.Millisecond { + continue + } + + wsCli := client.wsCli.Load() + client.logger.Debug("listening block") + lastLitenBlockAt = time.Now() bctx, cancel := context.WithTimeout(ctx, config.BlockWaitTime) blk, results, _, _, err := wsCli.ListenBlock(bctx, parser) - if err != nil { - client.logger.Error("unable to listen block", "err", err) - cancel() + cancel() + + switch err { + case context.Canceled: + fallthrough + case context.DeadlineExceeded: + client.logger.WithError(err).Error("context cancelled on listening block") continue + default: + client.logger.WithError(err).Error("error listening block") + if err := client.instantiateWSClient(); err != nil { + client.logger.WithError(err).Error("unable to instantiate ws client") + } } - cancel() // release the lock after duty map is updated client.blockHeadL.Lock() @@ -353,7 +370,9 @@ func (s *SeqClient) SubmitActions(ctx context.Context, acts []chain.Action) (ids } func (s *SeqClient) submitAndWaitTx(ctx context.Context, tx *chain.Transaction) (ids.ID, *chain.Result, error) { - if err := s.wsCli.RegisterTx(tx); err != nil { + wsCli := s.wsCli.Load() + + if err := wsCli.RegisterTx(tx); err != nil { s.logger.WithError(err).Debug("unable to send tx") return ids.Empty, nil, err } @@ -368,7 +387,7 @@ listenTx: s.logger.WithError(err).Debug("time out waiting on tx") return ids.Empty, nil, err default: - txID, dErr, result, err := s.wsCli.ListenTx(ctx) + txID, dErr, result, err := wsCli.ListenTx(ctx) if dErr != nil { return ids.Empty, nil, dErr } @@ -439,3 +458,16 @@ func (s *SeqClient) ProposerAtHeight(ctx context.Context, height uint64) (*hrpc. func (s *SeqClient) GetHighestSettledToBNonce(ctx context.Context) (uint64, error) { return s.srpc.GetHighestSettledToBNonce(ctx) } + +func (s *SeqClient) instantiateWSClient() error { + wsCli, err := hrpc.NewWebSocketClient(s.cfg.URL, hrpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) + if err != nil { + return err + } + if err := wsCli.RegisterBlocks(); err != nil { + return err + } + + s.wsCli.Store(wsCli) + return nil +} From 6ea5cd47a7a2c100fb8e3369e190d60cfefb2eeb Mon Sep 17 00:00:00 2001 From: bianyuanop Date: Wed, 5 Mar 2025 10:21:32 -0500 Subject: [PATCH 2/3] nits --- seq/consts.go | 3 +++ seq/seqclient.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/seq/consts.go b/seq/consts.go index 9206574c..bc06b12f 100644 --- a/seq/consts.go +++ b/seq/consts.go @@ -1,5 +1,8 @@ package seq +import "time" + const DefaultProposerLRUSize = 20 const GetArcadiaBlockSignatureHeader = "X-Arcadia-GetBlock-Sig" const ActionChannelBufferSize = 20 +const MinRateListenBlock = 500 * time.Millisecond diff --git a/seq/seqclient.go b/seq/seqclient.go index 0af90f13..439f5d90 100644 --- a/seq/seqclient.go +++ b/seq/seqclient.go @@ -163,7 +163,7 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) { client.logger.Info("stopping as receiving stop signal") return default: - if time.Since(lastLitenBlockAt) < 500*time.Millisecond { + if time.Since(lastLitenBlockAt) < MinRateListenBlock { continue } From d1f5904127e3bad75b78c14c91c88a900d5ccc8a Mon Sep 17 00:00:00 2001 From: bianyuanop Date: Wed, 5 Mar 2025 10:32:14 -0500 Subject: [PATCH 3/3] resolve comments[no ci] --- seq/seqclient.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/seq/seqclient.go b/seq/seqclient.go index 439f5d90..2e03d2c0 100644 --- a/seq/seqclient.go +++ b/seq/seqclient.go @@ -182,7 +182,7 @@ func NewSeqClient(config *SeqClientConfig) (*SeqClient, error) { client.logger.WithError(err).Error("context cancelled on listening block") continue default: - client.logger.WithError(err).Error("error listening block") + client.logger.WithError(err).Error("error listening block, prob connection dropped by SEQ") if err := client.instantiateWSClient(); err != nil { client.logger.WithError(err).Error("unable to instantiate ws client") } @@ -373,7 +373,7 @@ func (s *SeqClient) submitAndWaitTx(ctx context.Context, tx *chain.Transaction) wsCli := s.wsCli.Load() if err := wsCli.RegisterTx(tx); err != nil { - s.logger.WithError(err).Debug("unable to send tx") + s.logger.WithError(err).Warn("unable to send tx") return ids.Empty, nil, err } s.logger.WithField("txID", tx.ID().String()).Info("seqclient tx submitted") @@ -384,7 +384,7 @@ listenTx: select { case <-ctx.Done(): err := fmt.Errorf("timeout on waiting tx: %w", ctx.Err()) - s.logger.WithError(err).Debug("time out waiting on tx") + s.logger.WithError(err).Warn("time out waiting on tx") return ids.Empty, nil, err default: txID, dErr, result, err := wsCli.ListenTx(ctx)