diff --git a/bft/bft.go b/bft/bft.go index 9f5c4984..fd01f219 100644 --- a/bft/bft.go +++ b/bft/bft.go @@ -943,7 +943,7 @@ type ( CommitCertificate(qc *lib.QuorumCertificate, block *lib.Block, blockResult *lib.BlockResult, ts uint64) (err lib.ErrorI) // GossipBlock() is a P2P call to gossip a completed Quorum Certificate with a Proposal GossipBlock(certificate *lib.QuorumCertificate, sender []byte, timestamp uint64) - // GossipConsensus() is a P2P call to gossip a completed Quorum Certificate with a Proposal + // GossipConsensus() is a P2P call to gossip a consensus message GossipConsensus(message *Message, senderPubExclude []byte) // SendToSelf() is a P2P call to directly send a completed Quorum Certificate to self SelfSendBlock(qc *lib.QuorumCertificate, timestamp uint64) diff --git a/cmd/rpc/client.go b/cmd/rpc/client.go index 7ff7646f..e282dd75 100644 --- a/cmd/rpc/client.go +++ b/cmd/rpc/client.go @@ -430,14 +430,18 @@ func (c *Client) Transaction(tx lib.TransactionI) (hash *string, err lib.ErrorI) return } -func (c *Client) Transactions(txs []lib.TransactionI) (hash *string, err lib.ErrorI) { +func (c *Client) Transactions(txs []lib.TransactionI) (hashes []*string, err lib.ErrorI) { bz, err := lib.MarshalJSON(txs) if err != nil { return nil, err } - hash = new(string) - err = c.post(TxsRouteName, bz, hash) - return + // a single transaction returns a single string hash + if len(txs) == 1 { + hash := new(string) + err = c.post(TxsRouteName, bz, hash) + return []*string{hash}, err + } + return hashes, c.post(TxsRouteName, bz, &hashes) } func (c *Client) Keystore() (keystore *crypto.Keystore, err lib.ErrorI) { diff --git a/cmd/rpc/query.go b/cmd/rpc/query.go index 5875f2de..ccd8d14e 100644 --- a/cmd/rpc/query.go +++ b/cmd/rpc/query.go @@ -38,13 +38,18 @@ func (s *Server) Transaction(w http.ResponseWriter, r *http.Request, _ httproute // Transactions handles multiple transactions in a single request func (s *Server) Transactions(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { // create a slice to hold the incoming transactions - var txs []lib.TransactionI + var txs []lib.Transaction // unmarshal the HTTP request body into the transactions slice if ok := unmarshal(w, r, &txs); !ok { return } + // cast txs to lib.TransactionI + txsI := make([]lib.TransactionI, len(txs)) + for i := range txs { + txsI[i] = &txs[i] + } // submit transactions to RPC server - s.submitTxs(w, txs) + s.submitTxs(w, txsI) } // Height responds with the next block version diff --git a/cmd/rpc/server.go b/cmd/rpc/server.go index 7f47afd9..65280a71 100644 --- a/cmd/rpc/server.go +++ b/cmd/rpc/server.go @@ -197,7 +197,7 @@ func (s *Server) startHeapProfiler() { } } -// submitTxs submits transactions to the controller and writes http response +// submitTx submits transactions to the controller and writes http response func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bool) { // marshal each transaction to bytes var txBytes [][]byte @@ -215,9 +215,10 @@ func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bo return } // return hashes of all submitted transactions - var hashes []string + var hashes []*string for _, bz := range txBytes { - hashes = append(hashes, crypto.HashString(bz)) + hash := crypto.HashString(bz) + hashes = append(hashes, &hash) } // if only one transaction was submitted, return the hash as a string if len(hashes) == 1 { diff --git a/controller/block.go b/controller/block.go index ba96af42..1dea8970 100644 --- a/controller/block.go +++ b/controller/block.go @@ -590,7 +590,7 @@ func (c *Controller) HandlePeerBlock(msg *lib.BlockMessage, syncing bool) (*lib. result = nil } // attempts to commit the QC to persistence of chain by playing it against the state machine - if err = c.CommitCertificate(qc, block, result, msg.Time); err != nil { + if err = c.CommitCertificateParallel(qc, block, result, msg.Time); err != nil { // exit with error return nil, err } diff --git a/controller/tx.go b/controller/tx.go index 58963993..c5a84c5a 100644 --- a/controller/tx.go +++ b/controller/tx.go @@ -38,13 +38,13 @@ func (c *Controller) ListenForTx() { // exit continue } - func() { - // check and add the message to the cache to prevent duplicates - if ok := cache.Add(msg); !ok { - // if duplicate, exit - return - } - c.log.Debug("Handling transaction") + // check and add the message to the cache to prevent duplicates + if ok := cache.Add(msg); !ok { + // if duplicate, exit + continue + } + go func() { + // c.log.Debug("Handling transaction async") // create a convenience variable for the identity of the sender senderID := msg.Sender.Address.PublicKey // try to unmarshal the p2p message as a tx message diff --git a/fsm/account.go b/fsm/account.go index 0e4a918e..91c185b5 100644 --- a/fsm/account.go +++ b/fsm/account.go @@ -5,9 +5,10 @@ import ( "encoding/json" "strings" + "sort" + "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" - "sort" ) /* This file defines the account, pool, and supply tracker state interactions */ diff --git a/fsm/dex.go b/fsm/dex.go index 43213343..6e5d7cdc 100644 --- a/fsm/dex.go +++ b/fsm/dex.go @@ -2,11 +2,12 @@ package fsm import ( "bytes" - "github.com/canopy-network/canopy/lib" - "github.com/canopy-network/canopy/lib/crypto" "math/big" "sort" "strings" + + "github.com/canopy-network/canopy/lib" + "github.com/canopy-network/canopy/lib/crypto" ) /* Dex.go implements logic to handle AMM style atomic exchanges between root & nested chains @@ -558,6 +559,8 @@ func (s *StateMachine) RotateDexBatches(receiptsHash []byte, lPoolSize, counterP if err = s.Delete(KeyForNextBatch(chainId)); err != nil { return } + // delete it from the cache too + delete(s.cache.chainDexBatch, lib.BytesToString(KeyForNextBatch(chainId))) // set the upcoming sell batch as 'last' return s.SetDexBatch(KeyForLockedBatch(chainId), nextSellBatch) } @@ -663,7 +666,11 @@ func (s *StateMachine) SetDexBatch(key []byte, b *lib.DexBatch) (err lib.ErrorI) if err != nil { return } - return s.Set(key, value) + if err := s.Set(key, value); err != nil { + return err + } + s.cache.chainDexBatch[lib.BytesToString(key)] = b + return nil } // GetDexBatch() retrieves a sell batch from the state store @@ -673,11 +680,16 @@ func (s *StateMachine) GetDexBatch(chainId uint64, locked bool, withPoints ...bo if locked { key = KeyForLockedBatch(chainId) } - // get bytes from state - bz, err := s.Get(key) - if err != nil { - return - } + // ensures the pool points are attached to the batch whether + // the batch is retrieved from cache or state + defer func() { + if err == nil && len(withPoints) == 1 && withPoints[0] { + // set the pool points + b.PoolPoints = lPool.Points + // set total pool points + b.TotalPoolPoints = lPool.TotalPoolPoints + } + }() // retrieve the liquidity pool from state lPool, err = s.GetPool(chainId + LiquidityPoolAddend) if err != nil { @@ -686,6 +698,16 @@ func (s *StateMachine) GetDexBatch(chainId uint64, locked bool, withPoints ...bo // create a new batch object reference to ensure no 'nil' batches are used b = &lib.DexBatch{Committee: chainId, PoolSize: lPool.Amount} defer b.EnsureNonNil() + // first, try to obtain the current batch from the cache to avoid unnecessary unmarshalling + if cached, ok := s.cache.chainDexBatch[lib.BytesToString(key)]; ok { + b = cached + return + } + // otherwise, get bytes from state + bz, err := s.Get(key) + if err != nil { + return + } // check for nil bytes if len(bz) == 0 { return @@ -693,12 +715,6 @@ func (s *StateMachine) GetDexBatch(chainId uint64, locked bool, withPoints ...bo // populate the batch object with the bytes err = lib.Unmarshal(bz, b) // check if points should be attached - if len(withPoints) == 1 && withPoints[0] { - // set the pool points - b.PoolPoints = lPool.Points - // set total pool points - b.TotalPoolPoints = lPool.TotalPoolPoints - } // exit return } diff --git a/fsm/dex_test.go b/fsm/dex_test.go index a8d073ea..05db3ab7 100644 --- a/fsm/dex_test.go +++ b/fsm/dex_test.go @@ -3,7 +3,6 @@ package fsm import ( "bytes" "fmt" - "github.com/canopy-network/canopy/lib/crypto" "math" "math/rand" "os" @@ -12,6 +11,8 @@ import ( "testing" "time" + "github.com/canopy-network/canopy/lib/crypto" + "github.com/canopy-network/canopy/lib" "github.com/stretchr/testify/require" ) @@ -2664,8 +2665,10 @@ func newDexChain(t *testing.T, chainId, counterId uint64, rng *rand.Rand) *dexCh } func (s *dexSim) advanceHeight() { + s.chainX.sm.cache.Reset() s.chainX.sm.height++ s.chainY.sm.height++ + s.chainY.sm.cache.Reset() } func genOps(sim *dexSim, rng *rand.Rand) { @@ -3574,6 +3577,8 @@ func clearLocks(t *testing.T, chain1, chain2 StateMachine, chain1Id, chain2Id ui require.NoError(t, chain1.Delete(KeyForNextBatch(chain2Id))) require.NoError(t, chain2.Delete(KeyForLockedBatch(chain1Id))) require.NoError(t, chain2.Delete(KeyForNextBatch(chain1Id))) + chain1.cache.Reset() + chain2.cache.Reset() } func TestGetPriceUsesBigIntForE6ScaledPrice(t *testing.T) { diff --git a/fsm/gov.go b/fsm/gov.go index 444c3c8d..9880436f 100644 --- a/fsm/gov.go +++ b/fsm/gov.go @@ -382,10 +382,16 @@ func (s *StateMachine) ParsePollTransactions(b *lib.BlockResult) { } // for each transaction in the block for _, tx := range b.Transactions { - // get the public key object - pub, e := crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey) - if e != nil { - return + // get the public key object from the cache + pub, ok := s.cache.publicKey.Get(string(tx.Transaction.Signature.PublicKey)) + if !ok { + var e error + pub, e = crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey) + if e != nil { + return + } + // add the public key to the cache + s.cache.publicKey.Add(string(tx.Transaction.Signature.PublicKey), pub) } // check for a poll transaction if err := ap.CheckForPollTransaction(pub.Address(), tx.Transaction.Memo, s.Height()); err != nil { diff --git a/fsm/state.go b/fsm/state.go index fff39e87..d6acbb26 100644 --- a/fsm/state.go +++ b/fsm/state.go @@ -9,6 +9,7 @@ import ( "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" + lru "github.com/hashicorp/golang-lru/v2" ) const ( @@ -37,14 +38,6 @@ type StateMachine struct { Plugin *lib.Plugin // extensible plugin for the FSM } -// cache is the set of items to be cached used by the state machine -type cache struct { - accounts map[uint64]*Account // cache of accounts accessed - feeParams *FeeParams // fee params for the current block - valParams *ValidatorParams // validator params for the current block - rootDexBatch *lib.DexBatch // root dex batch -} - // New() creates a new instance of a StateMachine func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metrics, log lib.LoggerI) (*StateMachine, lib.ErrorI) { // create the state machine object reference @@ -59,9 +52,7 @@ func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metric Plugin: plugin, log: log, events: new(lib.EventsTracker), - cache: &cache{ - accounts: make(map[uint64]*Account), - }, + cache: newFSMCache(), } // initialize the state machine genesis, err := sm.Initialize(store) @@ -276,7 +267,7 @@ func (s *StateMachine) ApplyTransactions(ctx context.Context, txs [][]byte, r *l // add to the failed list r.AddFailed(lib.NewFailedTx(tx, e)) // discard the FSM cache - s.ResetCaches() + s.cache.Reset() // clear any events accumulated for the failed transaction to avoid leaking them to subsequent txs s.events.Reset() // restore slash tracker to its pre-transaction state @@ -518,6 +509,7 @@ func (s *StateMachine) Copy() (*StateMachine, lib.ErrorI) { log: s.log, cache: &cache{ accounts: make(map[uint64]*Account), + publicKey: s.cache.publicKey, rootDexBatch: s.cache.rootDexBatch, }, LastValidatorSet: s.LastValidatorSet, @@ -620,21 +612,11 @@ func (s *StateMachine) Reset() { // reset the slash tracker s.slashTracker = NewSlashTracker() // reset caches - s.ResetCaches() + s.cache.Reset() // reset the state store s.store.(lib.StoreI).Reset() } -// ResetCaches() dumps the state machine caches -func (s *StateMachine) ResetCaches() { - s.cache.accounts = make(map[uint64]*Account) - // Params caches must not outlive the current store view, otherwise Reset()/rollback - // can leave the FSM reading stale values that disagree with the underlying store. - s.cache.valParams = nil - s.cache.feeParams = nil - s.cache.rootDexBatch = nil -} - // nonEmptyHash() ensures the hash isn't empty // substituting a dummy hash in its place func nonEmptyHash(h []byte) []byte { @@ -725,3 +707,30 @@ func (s *StateMachine) StateWrite(request *lib.PluginStateWriteRequest) (respons } return } + +// cache is the set of items to be cached used by the state machine +type cache struct { + accounts map[uint64]*Account // cache of accounts accessed + feeParams *FeeParams // fee params for the current block + valParams *ValidatorParams // validator params for the current block + publicKey *lru.Cache[string, crypto.PublicKeyI] // public keys for block processing + rootDexBatch *lib.DexBatch // root dex batch + chainDexBatch map[string]*lib.DexBatch // dex local batches keyed by chain id +} + +// newFSMCache() creates a new instance of the state machine cache with required initialized values +func newFSMCache() *cache { + publicKeyCache, _ := lru.New[string, crypto.PublicKeyI](10_000) + return &cache{ + accounts: make(map[uint64]*Account), + publicKey: publicKeyCache, + chainDexBatch: make(map[string]*lib.DexBatch), + } +} + +// Reset() resets the cache by reinitializing it +func (c *cache) Reset() { + // Params caches must not outlive the current store view, otherwise Reset()/rollback + // can leave the FSM reading stale values that disagree with the underlying store. + *c = *newFSMCache() +} diff --git a/fsm/state_test.go b/fsm/state_test.go index 7b296ddb..13b2039f 100644 --- a/fsm/state_test.go +++ b/fsm/state_test.go @@ -349,9 +349,7 @@ func newTestStateMachine(t *testing.T) StateMachine { }, events: new(lib.EventsTracker), log: log, - cache: &cache{ - accounts: make(map[uint64]*Account), - }, + cache: newFSMCache(), } require.NoError(t, sm.SetParams(DefaultParams())) db.Commit() diff --git a/fsm/swap.go b/fsm/swap.go index f97dbdd8..aa0c7b46 100644 --- a/fsm/swap.go +++ b/fsm/swap.go @@ -79,7 +79,7 @@ func (s *StateMachine) ParseCloseOrder(tx *lib.Transaction) (co *lib.CloseOrder, // ProcessRootChainOrderBook() processes the order book from the root-chain and cross-references blocks on this chain to determine // actions that warrant committee level changes to the root-chain order book like: LockOrder, ResetOrder and CloseOrder func (s *StateMachine) ProcessRootChainOrderBook(book *lib.OrderBook, proposalBlock *lib.BlockResult) (lockOrders []*lib.LockOrder, closedOrders, resetOrders [][]byte) { - if book == nil { + if book == nil || len(book.Orders) == 0 { return } blocks := []*lib.BlockResult{proposalBlock} diff --git a/fsm/transaction.go b/fsm/transaction.go index e1fbff2e..e9eae866 100644 --- a/fsm/transaction.go +++ b/fsm/transaction.go @@ -1,11 +1,13 @@ package fsm import ( + "time" + + "math" + "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" "google.golang.org/protobuf/types/known/anypb" - "math" - "time" ) /* This file contains transaction handling logic - for the payload handling check message.go */ @@ -162,10 +164,17 @@ func (s *StateMachine) CheckSignature(tx *lib.Transaction, authorizedSigners [][ if err != nil { return nil, ErrTxSignBytes(err) } - // convert signature bytes to public key object - publicKey, e := crypto.NewPublicKeyFromBytes(tx.Signature.PublicKey) - if e != nil { - return nil, ErrInvalidPublicKey(e) + // try to obtain the public key from the cache + var e error + publicKey, ok := s.cache.publicKey.Get(string(tx.Signature.PublicKey)) + if !ok { + // convert signature bytes to public key object + publicKey, e = crypto.NewPublicKeyFromBytes(tx.Signature.PublicKey) + if e != nil { + return nil, ErrInvalidPublicKey(e) + } + // add the public key to the cache + s.cache.publicKey.Add(string(tx.Signature.PublicKey), publicKey) } // special case: check for a special RLP transaction if _, hasEthPubKey := publicKey.(*crypto.ETHSECP256K1PublicKey); hasEthPubKey && tx.Memo == RLPIndicator { diff --git a/lib/mempool.go b/lib/mempool.go index 387f979d..c22a2b26 100644 --- a/lib/mempool.go +++ b/lib/mempool.go @@ -1,12 +1,13 @@ package lib import ( - "github.com/canopy-network/canopy/lib/crypto" "maps" "math" "sort" "sync" "time" + + "github.com/canopy-network/canopy/lib/crypto" ) /* This file defines and implements a mempool that maintains an ordered list of 'valid, pending to be included' transactions in memory */ diff --git a/store/store.go b/store/store.go index 160ada9e..850bf3b9 100644 --- a/store/store.go +++ b/store/store.go @@ -495,7 +495,7 @@ func (s *Store) Root() (root []byte, err lib.ErrorI) { // set up the state commit store s.sc = NewDefaultSMT(NewTxn(s.ss.reader, s.ss.writer, stateCommitIDPrefix, false, false, true, nextVersion)) // commit the SMT directly using the txn ops - if err = s.sc.Commit(s.ss.txn.ops); err != nil { + if err = s.sc.CommitParallel(s.ss.txn.ops); err != nil { return nil, err } }