From 5461e9eac19ccd8f5d29abc52c420e2a458a2c71 Mon Sep 17 00:00:00 2001 From: Lazy Nina Date: Mon, 14 Jul 2025 15:06:00 -0400 Subject: [PATCH 1/6] invert dao coin limit order prefix to avoid reverse iteration --- lib/block_view_dao_coin_limit_order_test.go | 2 +- lib/db_utils.go | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/lib/block_view_dao_coin_limit_order_test.go b/lib/block_view_dao_coin_limit_order_test.go index 07d214725..cd87c4d38 100644 --- a/lib/block_view_dao_coin_limit_order_test.go +++ b/lib/block_view_dao_coin_limit_order_test.go @@ -1152,7 +1152,7 @@ func TestDAOCoinLimitOrder(t *testing.T) { orderEntries, err = dbAdapter.GetAllDAOCoinLimitOrders() require.NoError(err) require.Equal(len(orderEntries), 2) - require.True(orderEntries[1].Eq(metadataM1.ToEntry(m1PKID.PKID, savedHeight, toPKID))) + require.True(orderEntries[0].Eq(metadataM1.ToEntry(m1PKID.PKID, savedHeight, toPKID))) // m1 submits order buying 5 $DESO nanos @ 12 DAO coin / $DESO. _doDAOCoinLimitOrderTxnWithTestMeta(testMeta, feeRateNanosPerKb, m1Pub, m1Priv, metadataM1) diff --git a/lib/db_utils.go b/lib/db_utils.go index 5ca9ef24d..fb7f41367 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -10073,14 +10073,23 @@ func DBGetPaginatedProfilesByDeSoLocked( func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { key := DBPrefixKeyForDAOCoinLimitOrder(order) - key = append(key, VariableEncodeUint256(order.ScaledExchangeRateCoinsToSellPerCoinToBuy)...) - // Store MaxUint32 - block height to guarantee FIFO - // orders as we seek in reverse order. - key = append(key, _EncodeUint32(math.MaxUint32-order.BlockHeight)...) + // Store MaxUint256 - ScaledExchangeRateCoinsToSellPerCoinToBuy so we don't have to iterate in reverse. + key = append(key, FixedWidthEncodeUint256(uint256.NewInt(0).Sub(MaxUint256, order.ScaledExchangeRateCoinsToSellPerCoinToBuy))...) + key = append(key, _EncodeUint32(order.BlockHeight)...) key = append(key, order.OrderID.ToBytes()...) return key } +//func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { +// key := DBPrefixKeyForDAOCoinLimitOrder(order) +// key = append(key, VariableEncodeUint256(order.ScaledExchangeRateCoinsToSellPerCoinToBuy)...) +// // Store MaxUint32 - block height to guarantee FIFO +// // orders as we seek in reverse order. +// key = append(key, _EncodeUint32(math.MaxUint32-order.BlockHeight)...) +// key = append(key, order.OrderID.ToBytes()...) +// return key +//} + func DBPrefixKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...) key = append(key, order.BuyingDAOCoinCreatorPKID.ToBytes()...) @@ -10175,7 +10184,7 @@ func DBGetMatchingDAOCoinLimitOrders( // We break once we hit the input order's inverted scaled // price or the input order's quantity is fulfilled. 
opts := badger.DefaultIteratorOptions - opts.Reverse = true + //opts.Reverse = true opts.Prefix = prefixKey opts.PrefetchValues = false iterator := txn.NewIterator(opts) From f73ef707ffa1c8ad095789bb6dc31e8cfe31feed Mon Sep 17 00:00:00 2001 From: Lazy Nina Date: Mon, 14 Jul 2025 17:43:21 -0400 Subject: [PATCH 2/6] update key to avoid fork, add migration to migrate keys to new prefix --- lib/blockchain.go | 2 +- lib/constants.go | 1 + lib/db_utils.go | 145 +++++++++++++++++++++++++++++++++++++++++----- lib/server.go | 33 ++++++++++- 4 files changed, 164 insertions(+), 17 deletions(-) diff --git a/lib/blockchain.go b/lib/blockchain.go index 51b832550..e8dddb7fe 100644 --- a/lib/blockchain.go +++ b/lib/blockchain.go @@ -1135,7 +1135,7 @@ func NewBlockchain( checkpointSyncingProviders []string, blockIndexSize int, ) (*Blockchain, error) { - if err := RunBlockIndexMigrationOnce(db, params); err != nil { + if err := RunDBMigrationsOnce(db, snapshot, params); err != nil { return nil, errors.Wrapf(err, "NewBlockchain: Problem running block index migration") } diff --git a/lib/constants.go b/lib/constants.go index 2d8828eeb..9a73abf3c 100644 --- a/lib/constants.go +++ b/lib/constants.go @@ -1959,6 +1959,7 @@ const RoutePathGetCommittedTipBlockInfo = "/api/v0/get-committed-tip-block-info" // that indicates whether the block index migration has been run. See RunBlockIndexMigrationOnce // for more information. const BlockIndexMigrationFileName = "block_index_migration.txt" +const DAOCoinLimitOrderMigrationFileName = "dao_coin_limit_order_migration.txt" // BtcecPubKeyBytesLenUncompressed is a constant that was removed from newer version of Btcec const BtcecPubKeyBytesLenUncompressed = 65 diff --git a/lib/db_utils.go b/lib/db_utils.go index fb7f41367..df347ca78 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -328,9 +328,10 @@ type DBPrefixes struct { // _PrefixDAOCoinLimitOrderByOrderID // OrderID [32]byte // > -> - PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true" core_state:"true"` + PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true"` PrefixDAOCoinLimitOrderByTransactorPKID []byte `prefix_id:"[61]" is_state:"true"` PrefixDAOCoinLimitOrderByOrderID []byte `prefix_id:"[62]" is_state:"true"` + PrefixNewDAOCoinLimitOrder []byte `prefix_id:"[99]" is_state:"true" core_state:"true"` // User Association prefixes // PrefixUserAssociationByID: @@ -615,7 +616,7 @@ type DBPrefixes struct { // to find a block node given its hash was to do a full scan of // PrefixHeightHashToNodeInfo. PrefixHashToHeight []byte `prefix_id:"[98]"` - // NEXT_TAG: 99 + // NEXT_TAG: 100 } // DecodeStateKey decodes a state key into a DeSoEncoder type. This is useful for encoders which don't have a stored @@ -915,6 +916,9 @@ func StatePrefixToDeSoEncoder(prefix []byte) (_isEncoder bool, _encoder DeSoEnco } else if bytes.Equal(prefix, Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry) { // prefix_id:"[96]" return true, &BLSPublicKeyPKIDPairEntry{} + } else if bytes.Equal(prefix, Prefixes.PrefixNewDAOCoinLimitOrder) { + // prefix_id:"[99]" + return true, &DAOCoinLimitOrderEntry{} } return true, nil @@ -5887,6 +5891,82 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager }) } +// RunDAOCoinLimitOrderMigration runs a migration to update the key used in the DAOCoinLimitOrderPrefix to +// avoid reverse iteration. 
+func RunDAOCoinLimitOrderMigration( + handle *badger.DB, + snapshot *Snapshot, + eventManager *EventManager, +) error { + bestHash := DbGetBestHash(handle, snapshot, ChainTypeDeSoBlock) + blockHeight := uint64(0) + if bestHash != nil { + var err error + blockHeight, err = GetHeightForHash(handle, snapshot, bestHash) + if err != nil { + return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: Problem getting height for best hash %v", bestHash) + } + } + return handle.Update(func(txn *badger.Txn) error { + + // Get the prefix for the height hash to node index. + prefix := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...) + opts := badger.DefaultIteratorOptions + opts.Prefix = prefix + // We don't need values for this migration since the height and hash are in the key. + opts.PrefetchValues = false + nodeIterator := txn.NewIterator(opts) + defer nodeIterator.Close() + // Initialize a map to store the dao coin limit orders. + ordersMap := make(map[BlockHash]*DAOCoinLimitOrderEntry) + // Iterate over all the keys in the height hash to node index, extract the height and hash, + // and batch write every 10k entries to the hash to height index. + startTime := time.Now() + ctr := 0 + for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() { + item := nodeIterator.Item().Key() + value, err := nodeIterator.Item().ValueCopy(nil) + if err != nil { + return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: Problem getting value for key %v", item) + } + + // Parse the value to get the DAOCoinLimitOrderEntry. + order := &DAOCoinLimitOrderEntry{} + rr := bytes.NewReader(value) + if exist, err := DecodeFromBytes(order, rr); !exist || err != nil { + return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: decoding order from bytes for key %v", item) + } + + // Add to the map + ordersMap[*order.OrderID] = order + + // If we have fewer than 10K entries, continue. + if len(ordersMap) < 10000 { + continue + } + ctr++ + if ctr%10 == 0 { + glog.V(0).Infof("Time to run DAOCoinLimitOrderMigration for %v orders: %v", ctr*10000, time.Since(startTime)) + } + // If we have more than 10K entries, batch write the entries to the hash to height index + // and reset the map. + innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager) + if innerErr != nil { + return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch") + } + ordersMap = make(map[BlockHash]*DAOCoinLimitOrderEntry) + } + // If we have any entries left in the map, batch write them to the hash to height index. + if len(ordersMap) > 0 { + innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager) + if innerErr != nil { + return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch") + } + } + return nil + }) +} + // TODO: refactor to actually get the whole best chain if that's // what someone wants. It'll take a while and a lot of memory. func GetBestChain(tipNode *BlockNode, blockIndex *BlockIndex) ([]*BlockNode, error) { @@ -10076,21 +10156,38 @@ func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { // Store MaxUint256 - ScaledExchangeRateCoinsToSellPerCoinToBuy so we don't have to iterate in reverse. key = append(key, FixedWidthEncodeUint256(uint256.NewInt(0).Sub(MaxUint256, order.ScaledExchangeRateCoinsToSellPerCoinToBuy))...) key = append(key, _EncodeUint32(order.BlockHeight)...) - key = append(key, order.OrderID.ToBytes()...) 
+ // Store MaxBlockHash - OrderID so we don't have to iterate in reverse. + key = append(key, invertByteSlice(order.OrderID.ToBytes())...) return key } -//func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { -// key := DBPrefixKeyForDAOCoinLimitOrder(order) -// key = append(key, VariableEncodeUint256(order.ScaledExchangeRateCoinsToSellPerCoinToBuy)...) -// // Store MaxUint32 - block height to guarantee FIFO -// // orders as we seek in reverse order. -// key = append(key, _EncodeUint32(math.MaxUint32-order.BlockHeight)...) -// key = append(key, order.OrderID.ToBytes()...) -// return key -//} +// Invert a byte slice by subtracting each byte from 0xff. +func invertByteSlice(a []byte) []byte { + result := make([]byte, len(a)) + for i := range a { + result[i] = 0xff - a[i] + } + return result +} + +func OLDDBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { + key := DBPrefixKeyForDAOCoinLimitOrder(order) + key = append(key, VariableEncodeUint256(order.ScaledExchangeRateCoinsToSellPerCoinToBuy)...) + // Store MaxUint32 - block height to guarantee FIFO + // orders as we seek in reverse order. + key = append(key, _EncodeUint32(math.MaxUint32-order.BlockHeight)...) + key = append(key, order.OrderID.ToBytes()...) + return key +} func DBPrefixKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { + key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...) + key = append(key, order.BuyingDAOCoinCreatorPKID.ToBytes()...) + key = append(key, order.SellingDAOCoinCreatorPKID.ToBytes()...) + return key +} + +func DBPrefixKeyForOldDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte { key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...) key = append(key, order.BuyingDAOCoinCreatorPKID.ToBytes()...) key = append(key, order.SellingDAOCoinCreatorPKID.ToBytes()...) @@ -10239,7 +10336,7 @@ func DBGetMatchingDAOCoinLimitOrders( func DBGetAllDAOCoinLimitOrders(handle *badger.DB) ([]*DAOCoinLimitOrderEntry, error) { // Get all DAO Coin limit orders. - key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...) + key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...) return _DBGetAllDAOCoinLimitOrdersByPrefix(handle, key) } @@ -10249,7 +10346,7 @@ func DBGetAllDAOCoinLimitOrdersForThisDAOCoinPair( sellingDAOCoinCreatorPKID *PKID) ([]*DAOCoinLimitOrderEntry, error) { // Get all DAO coin limit orders for this DAO coin pair. - key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...) + key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...) key = append(key, buyingDAOCoinCreatorPKID.ToBytes()...) key = append(key, sellingDAOCoinCreatorPKID.ToBytes()...) 
 	return _DBGetAllDAOCoinLimitOrdersByPrefix(handle, key)
@@ -10377,6 +10474,26 @@ func DBUpsertDAOCoinLimitOrderWithTxn(
 	return nil
 }
 
+func MigrateDAOCoinLimitOrderBatch(handle *badger.DB, snapshot *Snapshot, blockHeight uint64, orders map[BlockHash]*DAOCoinLimitOrderEntry, eventManager *EventManager) error {
+	return handle.Update(func(txn *badger.Txn) error {
+		for _, order := range orders {
+			orderBytes := EncodeToBytes(blockHeight, order)
+			// Store in index: PrefixDAOCoinLimitOrderByTransactorPKID
+			key := DBKeyForDAOCoinLimitOrder(order)
+
+			if err := DBSetWithTxn(txn, snapshot, key, orderBytes, eventManager); err != nil {
+				return errors.Wrapf(err, "MigrateDAOCoinLimitOrderBatch: problem storing limit order")
+			}
+
+			oldKey := OLDDBKeyForDAOCoinLimitOrder(order)
+			if err := DBDeleteWithTxn(txn, snapshot, oldKey, eventManager, false); err != nil {
+				return errors.Wrapf(err, "MigrateDAOCoinLimitOrderBatch: problem deleting old limit order")
+			}
+		}
+		return nil
+	})
+}
+
 func DBDeleteDAOCoinLimitOrderWithTxn(txn *badger.Txn, snap *Snapshot, order *DAOCoinLimitOrderEntry, eventManager *EventManager, entryIsDeleted bool) error {
 	if order == nil {
 		return nil
diff --git a/lib/server.go b/lib/server.go
index e6f1980f0..86c0ea7e4 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -361,9 +361,19 @@ func ValidateHyperSyncFlags(isHypersync bool, syncType NodeSyncType) {
 	}
 }
 
+func RunDBMigrationsOnce(db *badger.DB, snapshot *Snapshot, params *DeSoParams) error {
+	if err := RunBlockIndexMigrationOnce(db, snapshot, params); err != nil {
+		return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running block index migration")
+	}
+	if err := RunDAOCoinLimitOrderMigrationOnce(db, snapshot); err != nil {
+		return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running DAOCoin limit order migration")
+	}
+	return nil
+}
+
 // RunBlockIndexMigrationOnce runs the block index migration once and saves a file to
 // indicate that it has been run.
-func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error { +func RunBlockIndexMigrationOnce(db *badger.DB, snapshot *Snapshot, params *DeSoParams) error { blockIndexMigrationFileName := filepath.Join(db.Opts().Dir, BlockIndexMigrationFileName) glog.V(2).Info("FileName: ", blockIndexMigrationFileName) hasRunMigration, err := ReadBoolFromFile(blockIndexMigrationFileName) @@ -372,7 +382,7 @@ func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error { return nil } glog.V(0).Info("Running block index migration") - if err = RunBlockIndexMigration(db, nil, nil, params); err != nil { + if err = RunBlockIndexMigration(db, snapshot, nil, params); err != nil { return errors.Wrapf(err, "Problem running block index migration") } if err = SaveBoolToFile(blockIndexMigrationFileName, true); err != nil { @@ -382,6 +392,25 @@ func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error { return nil } +func RunDAOCoinLimitOrderMigrationOnce(db *badger.DB, snapshot *Snapshot) error { + limitOrderMigrationFileName := filepath.Join(db.Opts().Dir, DAOCoinLimitOrderMigrationFileName) + glog.V(2).Info("FileName: ", limitOrderMigrationFileName) + hasRunMigration, err := ReadBoolFromFile(limitOrderMigrationFileName) + if err == nil && hasRunMigration { + glog.V(2).Info("DAOCoinLimitOrder index migration has already been run") + return nil + } + glog.V(0).Info("Running block index migration") + if err = RunDAOCoinLimitOrderMigration(db, snapshot, nil); err != nil { + return errors.Wrapf(err, "Problem running block index migration") + } + if err = SaveBoolToFile(limitOrderMigrationFileName, true); err != nil { + return errors.Wrapf(err, "Problem saving block index migration file") + } + glog.V(2).Info("Block index migration complete") + return nil +} + // NewServer initializes all of the internal data structures. Right now this basically // looks as follows: // - ConnectionManager starts and keeps track of peers. 
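The change above hinges on one ordering property: encoding MaxUint256 - ScaledExchangeRateCoinsToSellPerCoinToBuy as fixed-width big-endian bytes reverses the lexicographic order of the keys, so a plain forward Badger iterator over the new prefix visits the highest-priced orders first, with ascending block height (and the inverted OrderID bytes) breaking ties in the same order the old reverse scan produced. Below is a minimal standalone sketch of that property, not part of the patch; it uses the holiman/uint256 package the diff already relies on, and builds MaxUint256 locally with SetAllOne in place of lib's constant.

package main

import (
	"bytes"
	"fmt"
	"sort"

	"github.com/holiman/uint256"
)

// invertedKey returns the fixed-width big-endian encoding of
// MaxUint256 - x. Fixed-width big-endian encoding is order-preserving,
// so subtracting from MaxUint256 makes byte-wise lexicographic order
// the exact reverse of numeric order on x.
func invertedKey(x *uint256.Int) []byte {
	maxUint256 := new(uint256.Int).SetAllOne()
	b := new(uint256.Int).Sub(maxUint256, x).Bytes32()
	return b[:]
}

func main() {
	// Three exchange rates, deliberately out of order.
	rates := []*uint256.Int{uint256.NewInt(7), uint256.NewInt(1000), uint256.NewInt(42)}

	keys := make([][]byte, len(rates))
	for ii, rate := range rates {
		keys[ii] = invertedKey(rate)
	}

	// A forward iterator visits keys in ascending byte order...
	sort.Slice(keys, func(ii, jj int) bool { return bytes.Compare(keys[ii], keys[jj]) < 0 })

	// ...which recovers the rates in descending order: 1000, 42, 7.
	for _, key := range keys {
		maxUint256 := new(uint256.Int).SetAllOne()
		fmt.Println(new(uint256.Int).Sub(maxUint256, new(uint256.Int).SetBytes(key)))
	}
}

The same reasoning presumably motivates invertByteSlice: subtracting every byte from 0xff is an order-reversing involution on fixed-width OrderID bytes, so a forward scan visits OrderIDs in the same order the old reverse scan did.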
From 6ec7af9aa74bc4d1686279dfcbb46381b27beebe Mon Sep 17 00:00:00 2001
From: Lazy Nina
Date: Mon, 14 Jul 2025 18:05:52 -0400
Subject: [PATCH 3/6] don't use snapshot in db migrations

---
 lib/server.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lib/server.go b/lib/server.go
index 86c0ea7e4..ffe7e7226 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -362,10 +362,10 @@ func ValidateHyperSyncFlags(isHypersync bool, syncType NodeSyncType) {
 }
 
 func RunDBMigrationsOnce(db *badger.DB, snapshot *Snapshot, params *DeSoParams) error {
-	if err := RunBlockIndexMigrationOnce(db, snapshot, params); err != nil {
+	if err := RunBlockIndexMigrationOnce(db, nil, params); err != nil {
 		return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running block index migration")
 	}
-	if err := RunDAOCoinLimitOrderMigrationOnce(db, snapshot); err != nil {
+	if err := RunDAOCoinLimitOrderMigrationOnce(db, nil); err != nil {
 		return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running DAOCoin limit order migration")
 	}
 	return nil

From 9b3b21bd2d4b7332597afa85a459b585229a72e3 Mon Sep 17 00:00:00 2001
From: Lazy Nina
Date: Mon, 14 Jul 2025 18:06:31 -0400
Subject: [PATCH 4/6] clean up logs in dao coin limit order migration

---
 lib/server.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lib/server.go b/lib/server.go
index ffe7e7226..843f7451a 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -400,14 +400,14 @@ func RunDAOCoinLimitOrderMigrationOnce(db *badger.DB, snapshot *Snapshot) error
 		glog.V(2).Info("DAOCoinLimitOrder index migration has already been run")
 		return nil
 	}
-	glog.V(0).Info("Running block index migration")
+	glog.V(0).Info("Running dao coin limit order index migration")
 	if err = RunDAOCoinLimitOrderMigration(db, snapshot, nil); err != nil {
-		return errors.Wrapf(err, "Problem running block index migration")
+		return errors.Wrapf(err, "Problem running dao coin limit order index migration")
 	}
 	if err = SaveBoolToFile(limitOrderMigrationFileName, true); err != nil {
-		return errors.Wrapf(err, "Problem saving block index migration file")
+		return errors.Wrapf(err, "Problem saving dao coin limit order index migration file")
 	}
-	glog.V(2).Info("Block index migration complete")
+	glog.V(2).Info("dao coin limit order index migration complete")
 	return nil
 }

From 718ac88444263907a7a5d29617786e07105f630e Mon Sep 17 00:00:00 2001
From: Lazy Nina
Date: Mon, 14 Jul 2025 18:31:27 -0400
Subject: [PATCH 5/6] add logging for finished processing block for fast hot stuff consensus

---
 lib/server.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lib/server.go b/lib/server.go
index 843f7451a..3115297c7 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -2594,6 +2594,10 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
 		)))
 		blockHashesToRequest, err = srv.fastHotStuffConsensus.HandleBlock(pp, blk)
 		isOrphan = len(blockHashesToRequest) > 0
+		glog.V(0).Infof(CLog(Cyan, fmt.Sprintf(
+			"Server._handleBlock: Finished processing block %v with FastHotStuffConsensus (SyncState=%v) for peer %v",
+			blk, srv.blockchain.chainState(), pp,
+		)))
 	} else if !verifySignatures {
 		glog.V(0).Infof(CLog(Cyan, fmt.Sprintf(
 			"Server._handleBlock: Processing block %v WITHOUT signature checking because SyncState=%v for peer %v",

From 0392c3f81396c88cee98582a8f6e365b0ca26dc0 Mon Sep 17 00:00:00 2001
From: Lazy Nina
Date: Tue, 15 Jul 2025 13:29:54 -0400
Subject: [PATCH 6/6] add comments and clean up code

---
 lib/blockchain.go |  2 +-
 lib/constants.go  |  4 +++
 lib/db_utils.go   | 85 ++++++++++++++++++++++++++++++++---------------
 lib/server.go     | 41 ++++++++++++++++++-----
 4 files changed, 96 insertions(+), 36 deletions(-)

diff --git a/lib/blockchain.go b/lib/blockchain.go
index e8dddb7fe..1ed21f172 100644
--- a/lib/blockchain.go
+++ b/lib/blockchain.go
@@ -1135,7 +1135,7 @@ func NewBlockchain(
 	checkpointSyncingProviders []string,
 	blockIndexSize int,
 ) (*Blockchain, error) {
-	if err := RunDBMigrationsOnce(db, snapshot, params); err != nil {
+	if err := RunDBMigrationsOnce(db, snapshot, eventManager, params); err != nil {
 		return nil, errors.Wrapf(err, "NewBlockchain: Problem running block index migration")
 	}
 
diff --git a/lib/constants.go b/lib/constants.go
index 9a73abf3c..1cf0aa473 100644
--- a/lib/constants.go
+++ b/lib/constants.go
@@ -1959,6 +1959,10 @@ const RoutePathGetCommittedTipBlockInfo = "/api/v0/get-committed-tip-block-info"
 // that indicates whether the block index migration has been run. See RunBlockIndexMigrationOnce
 // for more information.
 const BlockIndexMigrationFileName = "block_index_migration.txt"
+
+// DAOCoinLimitOrderMigrationFileName is the name of the file that contains a boolean value
+// that indicates whether the DAOCoin limit order migration has been run. See RunDAOCoinLimitOrderMigrationOnce
+// for more information.
 const DAOCoinLimitOrderMigrationFileName = "dao_coin_limit_order_migration.txt"
 
 // BtcecPubKeyBytesLenUncompressed is a constant that was removed from newer version of Btcec
diff --git a/lib/db_utils.go b/lib/db_utils.go
index df347ca78..e5400c6e2 100644
--- a/lib/db_utils.go
+++ b/lib/db_utils.go
@@ -303,6 +303,7 @@ type DBPrefixes struct {
 	PrefixAuthorizeDerivedKey []byte `prefix_id:"[59]" is_state:"true" core_state:"true"`
 
 	// Prefixes for DAO coin limit orders
+	// UPDATE: see new prefix below.
 	// This index powers the order book.
 	// <
 	//   _PrefixDAOCoinLimitOrder
 	//   BuyingDAOCoinCreatorPKID [33]byte
 	//   SellingDAOCoinCreatorPKID [33]byte
 	//   ScaledExchangeRateCoinsToSellPerCoinToBuy [32]byte
 	//   BlockHeight [32]byte
 	//   OrderID [32]byte
-	// > ->
+	// > -> // DEPRECATED
 	//
 	// This index allows users to query for their open orders.
 	// <
@@ -328,7 +329,19 @@ type DBPrefixes struct {
 	//   _PrefixDAOCoinLimitOrderByOrderID
 	//   OrderID [32]byte
 	// > ->
+	//
+	// This index powers the order book. It allows us to
+	// iterate w/o reverse, which speeds up processing of
+	// dao coin limit orders.
+	// <
+	//   _PrefixNewDAOCoinLimitOrder
+	//   BuyingDAOCoinCreatorPKID [33]byte
+	//   SellingDAOCoinCreatorPKID [33]byte
+	//   MaxUint256-ScaledExchangeRateCoinsToSellPerCoinToBuy [32]byte
+	//   BlockHeight [32]byte
+	//   MaxBlockHash-OrderID [32]byte
+	// > ->
 	PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true"` // Deprecated
 	PrefixDAOCoinLimitOrderByTransactorPKID []byte `prefix_id:"[61]" is_state:"true"`
 	PrefixDAOCoinLimitOrderByOrderID []byte `prefix_id:"[62]" is_state:"true"`
 	PrefixNewDAOCoinLimitOrder []byte `prefix_id:"[99]" is_state:"true" core_state:"true"`
@@ -5742,6 +5755,11 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager
 // places and uint32 in others. Specifically, we don't always validate that we have a uint32 when we go to get
 // the block from the DB.
 	return handle.Update(func(txn *badger.Txn) error {
+		// We're about to flush records to the main DB, so we initiate the snapshot update.
+		// This function prepares the data structures in the snapshot.
+ if snapshot != nil { + snapshot.PrepareAncestralRecordsFlush() + } // Get the prefix for the height hash to node index. prefix := _heightHashToNodeIndexPrefix(false) opts := badger.DefaultIteratorOptions @@ -5887,12 +5905,21 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager return errors.Wrap(err, "RunBlockIndexMigration: Problem putting block node batch") } } + // We can exit early if we're not using a snapshot. + if snapshot == nil { + return nil + } + // Flush the ancestral records to the DB. + if err = snapshot.FlushAncestralRecordsWithTxn(txn); err != nil { + return err + } return nil }) } // RunDAOCoinLimitOrderMigration runs a migration to update the key used in the DAOCoinLimitOrderPrefix to -// avoid reverse iteration. +// avoid reverse iteration. This migration iterates over the old prefix and then inserts into the new prefix +// and deletes from the old prefix. func RunDAOCoinLimitOrderMigration( handle *badger.DB, snapshot *Snapshot, @@ -5908,19 +5935,22 @@ func RunDAOCoinLimitOrderMigration( } } return handle.Update(func(txn *badger.Txn) error { - - // Get the prefix for the height hash to node index. + // We're about to flush records to the main DB, so we initiate the snapshot update. + // This function prepares the data structures in the snapshot. + if snapshot != nil { + snapshot.PrepareAncestralRecordsFlush() + } + // Get the old prefix for the dao coin limit order index. prefix := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...) opts := badger.DefaultIteratorOptions opts.Prefix = prefix - // We don't need values for this migration since the height and hash are in the key. - opts.PrefetchValues = false nodeIterator := txn.NewIterator(opts) defer nodeIterator.Close() // Initialize a map to store the dao coin limit orders. ordersMap := make(map[BlockHash]*DAOCoinLimitOrderEntry) - // Iterate over all the keys in the height hash to node index, extract the height and hash, - // and batch write every 10k entries to the hash to height index. + // Iterate over all the keys in the dao coin limit order index + // and store them in the map. Every 10k entries, we will insert + // into the new prefix and delete from the old prefix. startTime := time.Now() ctr := 0 for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() { @@ -5945,24 +5975,33 @@ func RunDAOCoinLimitOrderMigration( continue } ctr++ - if ctr%10 == 0 { + if ctr%5 == 0 { glog.V(0).Infof("Time to run DAOCoinLimitOrderMigration for %v orders: %v", ctr*10000, time.Since(startTime)) } - // If we have more than 10K entries, batch write the entries to the hash to height index - // and reset the map. + // If we have more than 10K entries, batch migrate the entries to new dao coin limit order index + // and delete the old entries from the old prefix and reset the map. innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager) if innerErr != nil { return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch") } ordersMap = make(map[BlockHash]*DAOCoinLimitOrderEntry) } - // If we have any entries left in the map, batch write them to the hash to height index. 
+		// If we have any entries left in the map, batch migrate the entries to the new dao coin limit order index
+		// and delete the old entries from the old prefix.
 		if len(ordersMap) > 0 {
 			innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager)
 			if innerErr != nil {
 				return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch")
 			}
 		}
+		// We can exit early if we're not using a snapshot.
+		if snapshot == nil {
+			return nil
+		}
+		// Flush the ancestral records to the DB.
+		if innerErr := snapshot.FlushAncestralRecordsWithTxn(txn); innerErr != nil {
+			return innerErr
+		}
 		return nil
 	})
 }
@@ -10187,13 +10226,6 @@ func DBPrefixKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
 	return key
 }
 
-func DBPrefixKeyForOldDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
-	key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
-	key = append(key, order.BuyingDAOCoinCreatorPKID.ToBytes()...)
-	key = append(key, order.SellingDAOCoinCreatorPKID.ToBytes()...)
-	return key
-}
-
 func DBKeyForDAOCoinLimitOrderByTransactorPKID(order *DAOCoinLimitOrderEntry) []byte {
 	key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrderByTransactorPKID...)
 	key = append(key, order.TransactorPKID.ToBytes()...)
@@ -10258,9 +10290,9 @@ func DBGetMatchingDAOCoinLimitOrders(
 	// Convert the input BID order to the ASK order to query for.
-	// Note that we seek in reverse for the best matching orders.
+	// Note that we now seek forward for the best matching orders, since the sort fields are inverted in the key.
 	// * Swap BuyingDAOCoinCreatorPKID and SellingDAOCoinCreatorPKID.
-	// * Set ScaledExchangeRateCoinsToSellPerCoinToBuy to MaxUint256.
-	// * Set BlockHeight to 0 as this becomes math.MaxUint32 in the key.
-	// * Set OrderID to MaxBlockHash.
+	// * Set ScaledExchangeRateCoinsToSellPerCoinToBuy to MaxUint256. This will be 0 in the key.
+	// * Set BlockHeight to 0.
+	// * Set OrderID to MaxBlockHash. This will be the zero block hash in the key.
 	queryOrder.BuyingDAOCoinCreatorPKID = inputOrder.SellingDAOCoinCreatorPKID
 	queryOrder.SellingDAOCoinCreatorPKID = inputOrder.BuyingDAOCoinCreatorPKID
 	queryOrder.ScaledExchangeRateCoinsToSellPerCoinToBuy = MaxUint256.Clone()
@@ -10277,7 +10309,7 @@ func DBGetMatchingDAOCoinLimitOrders(
 		key = startKey
 	}
 
-	// Go in reverse order to find the highest prices first.
+	// Go in forward order to find the highest prices first, since the prices are inverted in this index.
 	// We break once we hit the input order's inverted scaled
 	// price or the input order's quantity is fulfilled.
 	opts := badger.DefaultIteratorOptions
@@ -10474,11 +10506,12 @@ func DBUpsertDAOCoinLimitOrderWithTxn(
 	return nil
 }
 
+// MigrateDAOCoinLimitOrderBatch migrates a batch of DAOCoinLimitOrderEntries to the new prefix
+// and deletes from the old prefix.
func MigrateDAOCoinLimitOrderBatch(handle *badger.DB, snapshot *Snapshot, blockHeight uint64, orders map[BlockHash]*DAOCoinLimitOrderEntry, eventManager *EventManager) error { return handle.Update(func(txn *badger.Txn) error { for _, order := range orders { orderBytes := EncodeToBytes(blockHeight, order) - // Store in index: PrefixDAOCoinLimitOrderByTransactorPKID key := DBKeyForDAOCoinLimitOrder(order) if err := DBSetWithTxn(txn, snapshot, key, orderBytes, eventManager); err != nil { @@ -10499,7 +10532,7 @@ func DBDeleteDAOCoinLimitOrderWithTxn(txn *badger.Txn, snap *Snapshot, order *DA return nil } - // Delete from index: PrefixDAOCoinLimitOrder + // Delete from index: PrefixNewDAOCoinLimitOrder key := DBKeyForDAOCoinLimitOrder(order) if err := DBDeleteWithTxn(txn, snap, key, eventManager, entryIsDeleted); err != nil { return errors.Wrapf(err, "DBDeleteDAOCoinLimitOrderWithTxn: problem deleting limit order") diff --git a/lib/server.go b/lib/server.go index 3115297c7..391cdddbd 100644 --- a/lib/server.go +++ b/lib/server.go @@ -361,19 +361,35 @@ func ValidateHyperSyncFlags(isHypersync bool, syncType NodeSyncType) { } } -func RunDBMigrationsOnce(db *badger.DB, snapshot *Snapshot, params *DeSoParams) error { - if err := RunBlockIndexMigrationOnce(db, nil, params); err != nil { - return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running block index migration") +// RunDBMigrationsOnce runs all the database migrations that need to be run +// once. It checks for the existence of files that indicate whether the migrations +// have already been run, and if not, runs them and saves the files to indicate +// that they have been run. This is useful for ensuring that the database schema +// is up to date with the current version of the code. This is only used for +// migrations that DO NOT result in a fork. +func RunDBMigrationsOnce(db *badger.DB, snapshot *Snapshot, eventManager *EventManager, params *DeSoParams) error { + + // List of migrations: + migrations := []func(*badger.DB, *Snapshot, *EventManager, *DeSoParams) error{ + RunBlockIndexMigrationOnce, + RunDAOCoinLimitOrderMigrationOnce, } - if err := RunDAOCoinLimitOrderMigrationOnce(db, nil); err != nil { - return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running DAOCoin limit order migration") + for ii, migration := range migrations { + if err := migration(db, snapshot, eventManager, params); err != nil { + return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running migration %v", ii) + } } return nil } // RunBlockIndexMigrationOnce runs the block index migration once and saves a file to // indicate that it has been run. 
-func RunBlockIndexMigrationOnce(db *badger.DB, snapshot *Snapshot, params *DeSoParams) error { +func RunBlockIndexMigrationOnce( + db *badger.DB, + snapshot *Snapshot, + eventManager *EventManager, + params *DeSoParams, +) error { blockIndexMigrationFileName := filepath.Join(db.Opts().Dir, BlockIndexMigrationFileName) glog.V(2).Info("FileName: ", blockIndexMigrationFileName) hasRunMigration, err := ReadBoolFromFile(blockIndexMigrationFileName) @@ -382,7 +398,7 @@ func RunBlockIndexMigrationOnce(db *badger.DB, snapshot *Snapshot, params *DeSoP return nil } glog.V(0).Info("Running block index migration") - if err = RunBlockIndexMigration(db, snapshot, nil, params); err != nil { + if err = RunBlockIndexMigration(db, snapshot, eventManager, params); err != nil { return errors.Wrapf(err, "Problem running block index migration") } if err = SaveBoolToFile(blockIndexMigrationFileName, true); err != nil { @@ -392,7 +408,14 @@ func RunBlockIndexMigrationOnce(db *badger.DB, snapshot *Snapshot, params *DeSoP return nil } -func RunDAOCoinLimitOrderMigrationOnce(db *badger.DB, snapshot *Snapshot) error { +// RunDAOCoinLimitOrderMigrationOnce runs the DAOCoin limit order migration once and saves a file to +// indicate that it has been run. +func RunDAOCoinLimitOrderMigrationOnce( + db *badger.DB, + snapshot *Snapshot, + eventManager *EventManager, + _ *DeSoParams, +) error { limitOrderMigrationFileName := filepath.Join(db.Opts().Dir, DAOCoinLimitOrderMigrationFileName) glog.V(2).Info("FileName: ", limitOrderMigrationFileName) hasRunMigration, err := ReadBoolFromFile(limitOrderMigrationFileName) @@ -401,7 +424,7 @@ func RunDAOCoinLimitOrderMigrationOnce(db *badger.DB, snapshot *Snapshot) error return nil } glog.V(0).Info("Running dao coin limit order index migration") - if err = RunDAOCoinLimitOrderMigration(db, snapshot, nil); err != nil { + if err = RunDAOCoinLimitOrderMigration(db, snapshot, eventManager); err != nil { return errors.Wrapf(err, "Problem running dao coin limit order index migration") } if err = SaveBoolToFile(limitOrderMigrationFileName, true); err != nil {
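For context on the gating used throughout this series: each migration is guarded by a marker file in the DB directory, so it runs at most once per data directory and is skipped on every subsequent startup. Below is a minimal standalone sketch of the same pattern; the runMigrationOnce helper is hypothetical, and plain os primitives stand in for the ReadBoolFromFile/SaveBoolToFile helpers the patch relies on.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// runMigrationOnce gates migrate() behind a marker file so it runs at most
// once per data directory. The marker is written only after migrate()
// succeeds, so a crash mid-migration triggers a retry on the next start,
// which is safe as long as the migration is idempotent (re-inserting into
// the new prefix and re-deleting from the old one is).
func runMigrationOnce(dbDir, markerName string, migrate func() error) error {
	marker := filepath.Join(dbDir, markerName)
	if _, err := os.Stat(marker); err == nil {
		return nil // Marker exists: migration already ran.
	}
	if err := migrate(); err != nil {
		return fmt.Errorf("runMigrationOnce: migration failed: %w", err)
	}
	return os.WriteFile(marker, []byte("true"), 0o644)
}

func main() {
	err := runMigrationOnce(os.TempDir(), "dao_coin_limit_order_migration.txt", func() error {
		fmt.Println("migrating old prefix -> new prefix")
		return nil
	})
	if err != nil {
		fmt.Println("migration error:", err)
	}
}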