Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bft/bft.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ type (
CommitCertificate(qc *lib.QuorumCertificate, block *lib.Block, blockResult *lib.BlockResult, ts uint64) (err lib.ErrorI)
// GossipBlock() is a P2P call to gossip a completed Quorum Certificate with a Proposal
GossipBlock(certificate *lib.QuorumCertificate, sender []byte, timestamp uint64)
// GossipConsensus() is a P2P call to gossip a completed Quorum Certificate with a Proposal
// GossipConsensus() is a P2P call to gossip a consensus message
GossipConsensus(message *Message, senderPubExclude []byte)
// SendToSelf() is a P2P call to directly send a completed Quorum Certificate to self
SelfSendBlock(qc *lib.QuorumCertificate, timestamp uint64)
Expand Down
12 changes: 8 additions & 4 deletions cmd/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,18 @@ func (c *Client) Transaction(tx lib.TransactionI) (hash *string, err lib.ErrorI)
return
}

func (c *Client) Transactions(txs []lib.TransactionI) (hash *string, err lib.ErrorI) {
func (c *Client) Transactions(txs []lib.TransactionI) (hashes []*string, err lib.ErrorI) {
bz, err := lib.MarshalJSON(txs)
if err != nil {
return nil, err
}
hash = new(string)
err = c.post(TxsRouteName, bz, hash)
return
// a single transaction returns a single string hash
if len(txs) == 1 {
hash := new(string)
err = c.post(TxsRouteName, bz, hash)
return []*string{hash}, err
}
return hashes, c.post(TxsRouteName, bz, &hashes)
}

func (c *Client) Keystore() (keystore *crypto.Keystore, err lib.ErrorI) {
Expand Down
9 changes: 7 additions & 2 deletions cmd/rpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ func (s *Server) Transaction(w http.ResponseWriter, r *http.Request, _ httproute
// Transactions handles multiple transactions in a single request
func (s *Server) Transactions(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
// create a slice to hold the incoming transactions
var txs []lib.TransactionI
var txs []lib.Transaction
// unmarshal the HTTP request body into the transactions slice
if ok := unmarshal(w, r, &txs); !ok {
return
}
// cast txs to lib.TransactionI
txsI := make([]lib.TransactionI, len(txs))
for i := range txs {
txsI[i] = &txs[i]
}
// submit transactions to RPC server
s.submitTxs(w, txs)
s.submitTxs(w, txsI)
}

// Height responds with the next block version
Expand Down
7 changes: 4 additions & 3 deletions cmd/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Server) startHeapProfiler() {
}
}

// submitTxs submits transactions to the controller and writes http response
// submitTx submits transactions to the controller and writes http response
func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bool) {
// marshal each transaction to bytes
var txBytes [][]byte
Expand All @@ -215,9 +215,10 @@ func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bo
return
}
// return hashes of all submitted transactions
var hashes []string
var hashes []*string
for _, bz := range txBytes {
hashes = append(hashes, crypto.HashString(bz))
hash := crypto.HashString(bz)
hashes = append(hashes, &hash)
}
// if only one transaction was submitted, return the hash as a string
if len(hashes) == 1 {
Expand Down
2 changes: 1 addition & 1 deletion controller/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (c *Controller) HandlePeerBlock(msg *lib.BlockMessage, syncing bool) (*lib.
result = nil
}
// attempts to commit the QC to persistence of chain by playing it against the state machine
if err = c.CommitCertificate(qc, block, result, msg.Time); err != nil {
if err = c.CommitCertificateParallel(qc, block, result, msg.Time); err != nil {
// exit with error
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions controller/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func (c *Controller) ListenForTx() {
// exit
continue
}
func() {
// check and add the message to the cache to prevent duplicates
if ok := cache.Add(msg); !ok {
// if duplicate, exit
return
}
c.log.Debug("Handling transaction")
// check and add the message to the cache to prevent duplicates
if ok := cache.Add(msg); !ok {
// if duplicate, exit
continue
}
go func() {
// c.log.Debug("Handling transaction async")
// create a convenience variable for the identity of the sender
senderID := msg.Sender.Address.PublicKey
// try to unmarshal the p2p message as a tx message
Expand Down
3 changes: 2 additions & 1 deletion fsm/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"encoding/json"
"strings"

"sort"

"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
"sort"
)

/* This file defines the account, pool, and supply tracker state interactions */
Expand Down
44 changes: 30 additions & 14 deletions fsm/dex.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package fsm

import (
"bytes"
"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
"math/big"
"sort"
"strings"

"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
)

/* Dex.go implements logic to handle AMM style atomic exchanges between root & nested chains
Expand Down Expand Up @@ -558,6 +559,8 @@ func (s *StateMachine) RotateDexBatches(receiptsHash []byte, lPoolSize, counterP
if err = s.Delete(KeyForNextBatch(chainId)); err != nil {
return
}
// delete it from the cache too
delete(s.cache.chainDexBatch, lib.BytesToString(KeyForNextBatch(chainId)))
// set the upcoming sell batch as 'last'
return s.SetDexBatch(KeyForLockedBatch(chainId), nextSellBatch)
}
Expand Down Expand Up @@ -663,7 +666,11 @@ func (s *StateMachine) SetDexBatch(key []byte, b *lib.DexBatch) (err lib.ErrorI)
if err != nil {
return
}
return s.Set(key, value)
if err := s.Set(key, value); err != nil {
return err
}
s.cache.chainDexBatch[lib.BytesToString(key)] = b
return nil
}

// GetDexBatch() retrieves a sell batch from the state store
Expand All @@ -673,11 +680,16 @@ func (s *StateMachine) GetDexBatch(chainId uint64, locked bool, withPoints ...bo
if locked {
key = KeyForLockedBatch(chainId)
}
// get bytes from state
bz, err := s.Get(key)
if err != nil {
return
}
// ensures the pool points are attached to the batch whether
// the batch is retrieved from cache or state
defer func() {
if err == nil && len(withPoints) == 1 && withPoints[0] {
// set the pool points
b.PoolPoints = lPool.Points
// set total pool points
b.TotalPoolPoints = lPool.TotalPoolPoints
}
}()
// retrieve the liquidity pool from state
lPool, err = s.GetPool(chainId + LiquidityPoolAddend)
if err != nil {
Expand All @@ -686,19 +698,23 @@ func (s *StateMachine) GetDexBatch(chainId uint64, locked bool, withPoints ...bo
// create a new batch object reference to ensure no 'nil' batches are used
b = &lib.DexBatch{Committee: chainId, PoolSize: lPool.Amount}
defer b.EnsureNonNil()
// first, try to obtain the current batch from the cache to avoid unnecessary unmarshalling
if cached, ok := s.cache.chainDexBatch[lib.BytesToString(key)]; ok {
b = cached
return
}
// otherwise, get bytes from state
bz, err := s.Get(key)
if err != nil {
return
}
// check for nil bytes
if len(bz) == 0 {
return
}
// populate the batch object with the bytes
err = lib.Unmarshal(bz, b)
// check if points should be attached
if len(withPoints) == 1 && withPoints[0] {
// set the pool points
b.PoolPoints = lPool.Points
// set total pool points
b.TotalPoolPoints = lPool.TotalPoolPoints
}
// exit
return
}
Expand Down
7 changes: 6 additions & 1 deletion fsm/dex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package fsm
import (
"bytes"
"fmt"
"github.com/canopy-network/canopy/lib/crypto"
"math"
"math/rand"
"os"
Expand All @@ -12,6 +11,8 @@ import (
"testing"
"time"

"github.com/canopy-network/canopy/lib/crypto"

"github.com/canopy-network/canopy/lib"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2664,8 +2665,10 @@ func newDexChain(t *testing.T, chainId, counterId uint64, rng *rand.Rand) *dexCh
}

func (s *dexSim) advanceHeight() {
s.chainX.sm.cache.Reset()
s.chainX.sm.height++
s.chainY.sm.height++
s.chainY.sm.cache.Reset()
}

func genOps(sim *dexSim, rng *rand.Rand) {
Expand Down Expand Up @@ -3574,6 +3577,8 @@ func clearLocks(t *testing.T, chain1, chain2 StateMachine, chain1Id, chain2Id ui
require.NoError(t, chain1.Delete(KeyForNextBatch(chain2Id)))
require.NoError(t, chain2.Delete(KeyForLockedBatch(chain1Id)))
require.NoError(t, chain2.Delete(KeyForNextBatch(chain1Id)))
chain1.cache.Reset()
chain2.cache.Reset()
}

func TestGetPriceUsesBigIntForE6ScaledPrice(t *testing.T) {
Expand Down
14 changes: 10 additions & 4 deletions fsm/gov.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,16 @@ func (s *StateMachine) ParsePollTransactions(b *lib.BlockResult) {
}
// for each transaction in the block
for _, tx := range b.Transactions {
// get the public key object
pub, e := crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey)
if e != nil {
return
// get the public key object from the cache
pub, ok := s.cache.publicKey.Get(string(tx.Transaction.Signature.PublicKey))
if !ok {
var e error
pub, e = crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey)
if e != nil {
return
}
// add the public key to the cache
s.cache.publicKey.Add(string(tx.Transaction.Signature.PublicKey), pub)
}
// check for a poll transaction
if err := ap.CheckForPollTransaction(pub.Address(), tx.Transaction.Memo, s.Height()); err != nil {
Expand Down
55 changes: 32 additions & 23 deletions fsm/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
lru "github.com/hashicorp/golang-lru/v2"
)

const (
Expand Down Expand Up @@ -37,14 +38,6 @@ type StateMachine struct {
Plugin *lib.Plugin // extensible plugin for the FSM
}

// cache is the set of items to be cached used by the state machine
type cache struct {
accounts map[uint64]*Account // cache of accounts accessed
feeParams *FeeParams // fee params for the current block
valParams *ValidatorParams // validator params for the current block
rootDexBatch *lib.DexBatch // root dex batch
}

// New() creates a new instance of a StateMachine
func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metrics, log lib.LoggerI) (*StateMachine, lib.ErrorI) {
// create the state machine object reference
Expand All @@ -59,9 +52,7 @@ func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metric
Plugin: plugin,
log: log,
events: new(lib.EventsTracker),
cache: &cache{
accounts: make(map[uint64]*Account),
},
cache: newFSMCache(),
}
// initialize the state machine
genesis, err := sm.Initialize(store)
Expand Down Expand Up @@ -276,7 +267,7 @@ func (s *StateMachine) ApplyTransactions(ctx context.Context, txs [][]byte, r *l
// add to the failed list
r.AddFailed(lib.NewFailedTx(tx, e))
// discard the FSM cache
s.ResetCaches()
s.cache.Reset()
// clear any events accumulated for the failed transaction to avoid leaking them to subsequent txs
s.events.Reset()
// restore slash tracker to its pre-transaction state
Expand Down Expand Up @@ -518,6 +509,7 @@ func (s *StateMachine) Copy() (*StateMachine, lib.ErrorI) {
log: s.log,
cache: &cache{
accounts: make(map[uint64]*Account),
publicKey: s.cache.publicKey,
rootDexBatch: s.cache.rootDexBatch,
},
LastValidatorSet: s.LastValidatorSet,
Expand Down Expand Up @@ -620,21 +612,11 @@ func (s *StateMachine) Reset() {
// reset the slash tracker
s.slashTracker = NewSlashTracker()
// reset caches
s.ResetCaches()
s.cache.Reset()
// reset the state store
s.store.(lib.StoreI).Reset()
}

// ResetCaches() dumps the state machine caches
func (s *StateMachine) ResetCaches() {
s.cache.accounts = make(map[uint64]*Account)
// Params caches must not outlive the current store view, otherwise Reset()/rollback
// can leave the FSM reading stale values that disagree with the underlying store.
s.cache.valParams = nil
s.cache.feeParams = nil
s.cache.rootDexBatch = nil
}

// nonEmptyHash() ensures the hash isn't empty
// substituting a dummy hash in its place
func nonEmptyHash(h []byte) []byte {
Expand Down Expand Up @@ -725,3 +707,30 @@ func (s *StateMachine) StateWrite(request *lib.PluginStateWriteRequest) (respons
}
return
}

// cache is the set of items to be cached used by the state machine
type cache struct {
accounts map[uint64]*Account // cache of accounts accessed
feeParams *FeeParams // fee params for the current block
valParams *ValidatorParams // validator params for the current block
publicKey *lru.Cache[string, crypto.PublicKeyI] // public keys for block processing
rootDexBatch *lib.DexBatch // root dex batch
chainDexBatch map[string]*lib.DexBatch // dex local batches keyed by chain id
}

// newFSMCache() creates a new instance of the state machine cache with required initialized values
func newFSMCache() *cache {
publicKeyCache, _ := lru.New[string, crypto.PublicKeyI](10_000)
return &cache{
accounts: make(map[uint64]*Account),
publicKey: publicKeyCache,
chainDexBatch: make(map[string]*lib.DexBatch),
}
}

// Reset() resets the cache by reinitializing it
func (c *cache) Reset() {
// Params caches must not outlive the current store view, otherwise Reset()/rollback
// can leave the FSM reading stale values that disagree with the underlying store.
*c = *newFSMCache()
}
4 changes: 1 addition & 3 deletions fsm/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,7 @@ func newTestStateMachine(t *testing.T) StateMachine {
},
events: new(lib.EventsTracker),
log: log,
cache: &cache{
accounts: make(map[uint64]*Account),
},
cache: newFSMCache(),
}
require.NoError(t, sm.SetParams(DefaultParams()))
db.Commit()
Expand Down
2 changes: 1 addition & 1 deletion fsm/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *StateMachine) ParseCloseOrder(tx *lib.Transaction) (co *lib.CloseOrder,
// ProcessRootChainOrderBook() processes the order book from the root-chain and cross-references blocks on this chain to determine
// actions that warrant committee level changes to the root-chain order book like: LockOrder, ResetOrder and CloseOrder
func (s *StateMachine) ProcessRootChainOrderBook(book *lib.OrderBook, proposalBlock *lib.BlockResult) (lockOrders []*lib.LockOrder, closedOrders, resetOrders [][]byte) {
if book == nil {
if book == nil || len(book.Orders) == 0 {
return
}
blocks := []*lib.BlockResult{proposalBlock}
Expand Down
Loading
Loading