diff --git a/lib/block_view_dao_coin_limit_order_test.go b/lib/block_view_dao_coin_limit_order_test.go
index 07d214725..cd87c4d38 100644
--- a/lib/block_view_dao_coin_limit_order_test.go
+++ b/lib/block_view_dao_coin_limit_order_test.go
@@ -1152,7 +1152,7 @@ func TestDAOCoinLimitOrder(t *testing.T) {
 		orderEntries, err = dbAdapter.GetAllDAOCoinLimitOrders()
 		require.NoError(err)
 		require.Equal(len(orderEntries), 2)
-		require.True(orderEntries[1].Eq(metadataM1.ToEntry(m1PKID.PKID, savedHeight, toPKID)))
+		require.True(orderEntries[0].Eq(metadataM1.ToEntry(m1PKID.PKID, savedHeight, toPKID)))
 
 		// m1 submits order buying 5 $DESO nanos @ 12 DAO coin / $DESO.
 		_doDAOCoinLimitOrderTxnWithTestMeta(testMeta, feeRateNanosPerKb, m1Pub, m1Priv, metadataM1)
diff --git a/lib/blockchain.go b/lib/blockchain.go
index 51b832550..1ed21f172 100644
--- a/lib/blockchain.go
+++ b/lib/blockchain.go
@@ -1135,7 +1135,7 @@ func NewBlockchain(
 	checkpointSyncingProviders []string,
 	blockIndexSize int,
 ) (*Blockchain, error) {
-	if err := RunBlockIndexMigrationOnce(db, params); err != nil {
+	if err := RunDBMigrationsOnce(db, snapshot, eventManager, params); err != nil {
-		return nil, errors.Wrapf(err, "NewBlockchain: Problem running block index migration")
+		return nil, errors.Wrapf(err, "NewBlockchain: Problem running DB migrations")
 	}
diff --git a/lib/constants.go b/lib/constants.go
index 2d8828eeb..1cf0aa473 100644
--- a/lib/constants.go
+++ b/lib/constants.go
@@ -1960,6 +1960,11 @@ const RoutePathGetCommittedTipBlockInfo = "/api/v0/get-committed-tip-block-info"
 // for more information.
 const BlockIndexMigrationFileName = "block_index_migration.txt"
 
+// DAOCoinLimitOrderMigrationFileName is the name of the file that contains a boolean value
+// that indicates whether the DAO coin limit order migration has been run. See RunDAOCoinLimitOrderMigrationOnce
+// for more information.
+const DAOCoinLimitOrderMigrationFileName = "dao_coin_limit_order_migration.txt"
+
 // BtcecPubKeyBytesLenUncompressed is a constant that was removed from newer versions of Btcec
 const BtcecPubKeyBytesLenUncompressed = 65
diff --git a/lib/db_utils.go b/lib/db_utils.go
index 5ca9ef24d..e5400c6e2 100644
--- a/lib/db_utils.go
+++ b/lib/db_utils.go
@@ -303,6 +303,7 @@ type DBPrefixes struct {
 	PrefixAuthorizeDerivedKey []byte `prefix_id:"[59]" is_state:"true" core_state:"true"`
 
 	// Prefixes for DAO coin limit orders
+	// UPDATE: see the new prefix below.
 	// This index powers the order book.
 	// <
 	//   _PrefixDAOCoinLimitOrder
@@ -311,7 +312,7 @@ type DBPrefixes struct {
 	//   ScaledExchangeRateCoinsToSellPerCoinToBuy [32]byte
 	//   BlockHeight [32]byte
 	//   OrderID [32]byte
-	// > ->
+	// > -> // DEPRECATED
 	//
 	// This index allows users to query for their open orders.
 	// <
@@ -328,9 +329,22 @@ type DBPrefixes struct {
 	//   _PrefixDAOCoinLimitOrderByOrderID
 	//   OrderID [32]byte
 	// > ->
-	PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true" core_state:"true"`
+	//
+	// This index powers the order book. It allows us to
+	// iterate without reversing, which speeds up the
+	// processing of DAO coin limit orders.
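+	// Badger iterates keys in ascending byte order, so storing the
+	// inverted price and OrderID puts the best matches first under
+	// a plain forward scan.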
+	// <
+	//   _PrefixNewDAOCoinLimitOrder
+	//   BuyingDAOCoinCreatorPKID [33]byte
+	//   SellingDAOCoinCreatorPKID [33]byte
+	//   MaxUint256-ScaledExchangeRateCoinsToSellPerCoinToBuy [32]byte
+	//   BlockHeight [32]byte
+	//   MaxBlockHash-OrderID [32]byte
+	// > ->
+	PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true"` // Deprecated
 	PrefixDAOCoinLimitOrderByTransactorPKID []byte `prefix_id:"[61]" is_state:"true"`
 	PrefixDAOCoinLimitOrderByOrderID []byte `prefix_id:"[62]" is_state:"true"`
+	PrefixNewDAOCoinLimitOrder []byte `prefix_id:"[99]" is_state:"true" core_state:"true"`
 
 	// User Association prefixes
 	// PrefixUserAssociationByID:
@@ -615,7 +629,7 @@ type DBPrefixes struct {
 	// to find a block node given its hash was to do a full scan of
 	// PrefixHeightHashToNodeInfo.
 	PrefixHashToHeight []byte `prefix_id:"[98]"`
-	// NEXT_TAG: 99
+	// NEXT_TAG: 100
 }
 
 // DecodeStateKey decodes a state key into a DeSoEncoder type. This is useful for encoders which don't have a stored
@@ -915,6 +929,9 @@ func StatePrefixToDeSoEncoder(prefix []byte) (_isEncoder bool, _encoder DeSoEnco
 	} else if bytes.Equal(prefix, Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry) {
 		// prefix_id:"[96]"
 		return true, &BLSPublicKeyPKIDPairEntry{}
+	} else if bytes.Equal(prefix, Prefixes.PrefixNewDAOCoinLimitOrder) {
+		// prefix_id:"[99]"
+		return true, &DAOCoinLimitOrderEntry{}
 	}
 
 	return true, nil
@@ -5738,6 +5755,11 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager
 	// places and uint32 in others. Specifically, we don't always validate that we have a uint32 when we go to get
 	// the block from the DB.
 	return handle.Update(func(txn *badger.Txn) error {
+		// We're about to flush records to the main DB, so we initiate the snapshot update.
+		// This function prepares the data structures in the snapshot.
+		if snapshot != nil {
+			snapshot.PrepareAncestralRecordsFlush()
+		}
 		// Get the prefix for the height hash to node index.
 		prefix := _heightHashToNodeIndexPrefix(false)
 		opts := badger.DefaultIteratorOptions
@@ -5883,6 +5905,103 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager
 			return errors.Wrap(err, "RunBlockIndexMigration: Problem putting block node batch")
 		}
 	}
+	// We can exit early if we're not using a snapshot.
+	if snapshot == nil {
+		return nil
+	}
+	// Flush the ancestral records to the DB.
+	if err = snapshot.FlushAncestralRecordsWithTxn(txn); err != nil {
+		return err
+	}
 	return nil
 	})
 }
+
+// RunDAOCoinLimitOrderMigration rewrites the keys in the old PrefixDAOCoinLimitOrder index so that
+// order matching no longer requires reverse iteration. It iterates over the old prefix, inserts each
+// entry under the new prefix, and deletes the entry from the old prefix.
+func RunDAOCoinLimitOrderMigration(
+	handle *badger.DB,
+	snapshot *Snapshot,
+	eventManager *EventManager,
+) error {
+	bestHash := DbGetBestHash(handle, snapshot, ChainTypeDeSoBlock)
+	blockHeight := uint64(0)
+	if bestHash != nil {
+		var err error
+		blockHeight, err = GetHeightForHash(handle, snapshot, bestHash)
+		if err != nil {
+			return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: Problem getting height for best hash %v", bestHash)
+		}
+	}
+	return handle.Update(func(txn *badger.Txn) error {
+		// We're about to flush records to the main DB, so we initiate the snapshot update.
+		// This function prepares the data structures in the snapshot.
+		if snapshot != nil {
+			snapshot.PrepareAncestralRecordsFlush()
+		}
+		// Get the old prefix for the DAO coin limit order index.
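+		// Copy the prefix bytes so we never alias the shared Prefixes value.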
+		prefix := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
+		opts := badger.DefaultIteratorOptions
+		opts.Prefix = prefix
+		nodeIterator := txn.NewIterator(opts)
+		defer nodeIterator.Close()
+		// Initialize a map to store the DAO coin limit orders.
+		ordersMap := make(map[BlockHash]*DAOCoinLimitOrderEntry)
+		// Iterate over all the keys in the DAO coin limit order index
+		// and store the entries in the map. Every 10K entries, we insert
+		// the batch into the new prefix and delete it from the old prefix.
+		startTime := time.Now()
+		ctr := 0
+		for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() {
+			item := nodeIterator.Item().Key()
+			value, err := nodeIterator.Item().ValueCopy(nil)
+			if err != nil {
+				return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: Problem getting value for key %v", item)
+			}
+
+			// Parse the value to get the DAOCoinLimitOrderEntry.
+			order := &DAOCoinLimitOrderEntry{}
+			rr := bytes.NewReader(value)
+			if exist, err := DecodeFromBytes(order, rr); err != nil {
+				return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: decoding order from bytes for key %v", item)
+			} else if !exist {
+				return errors.Errorf("RunDAOCoinLimitOrderMigration: no order found when decoding bytes for key %v", item)
+			}
+
+			// Add the order to the map.
+			ordersMap[*order.OrderID] = order
+
+			// If we have fewer than 10K entries, keep accumulating.
+			if len(ordersMap) < 10000 {
+				continue
+			}
+			ctr++
+			if ctr%5 == 0 {
+				glog.V(0).Infof("Time to run DAOCoinLimitOrderMigration for %v orders: %v", ctr*10000, time.Since(startTime))
+			}
+			// We have a full batch: migrate the entries to the new DAO coin limit order index,
+			// delete the old entries from the old prefix, and reset the map.
+			innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager)
+			if innerErr != nil {
+				return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch")
+			}
+			ordersMap = make(map[BlockHash]*DAOCoinLimitOrderEntry)
+		}
+		// If we have any entries left in the map, migrate them to the new DAO coin limit order
+		// index and delete the old entries from the old prefix.
+		if len(ordersMap) > 0 {
+			innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager)
+			if innerErr != nil {
+				return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch")
+			}
+		}
+		// We can exit early if we're not using a snapshot.
+		if snapshot == nil {
+			return nil
+		}
+		// Flush the ancestral records to the DB.
+		if innerErr := snapshot.FlushAncestralRecordsWithTxn(txn); innerErr != nil {
+			return innerErr
+		}
+		return nil
+	})
+}
@@ -10072,6 +10191,25 @@ func DBGetPaginatedProfilesByDeSoLocked(
 // ---------------------------------------------
 
 func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
+	key := DBPrefixKeyForDAOCoinLimitOrder(order)
+	// Store MaxUint256 - ScaledExchangeRateCoinsToSellPerCoinToBuy so we don't have to iterate in reverse.
+	key = append(key, FixedWidthEncodeUint256(uint256.NewInt(0).Sub(MaxUint256, order.ScaledExchangeRateCoinsToSellPerCoinToBuy))...)
+	key = append(key, _EncodeUint32(order.BlockHeight)...)
+	// Store MaxBlockHash - OrderID so we don't have to iterate in reverse.
+	key = append(key, invertByteSlice(order.OrderID.ToBytes())...)
+	return key
+}
+
+// Invert a byte slice by subtracting each byte from 0xff.
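+// Since every byte of MaxBlockHash is 0xff, the per-byte subtraction never
+// borrows, so the result equals MaxBlockHash - OrderID and sorts in the
+// opposite order of the original bytes.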
+func invertByteSlice(a []byte) []byte {
+	result := make([]byte, len(a))
+	for i := range a {
+		result[i] = 0xff - a[i]
+	}
+	return result
+}
+
+func OLDDBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
 	key := DBPrefixKeyForDAOCoinLimitOrder(order)
 	key = append(key, VariableEncodeUint256(order.ScaledExchangeRateCoinsToSellPerCoinToBuy)...)
 	// Store MaxUint32 - block height to guarantee FIFO
@@ -10082,7 +10220,7 @@ func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
 }
 
 func DBPrefixKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
-	key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
+	key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...)
 	key = append(key, order.BuyingDAOCoinCreatorPKID.ToBytes()...)
 	key = append(key, order.SellingDAOCoinCreatorPKID.ToBytes()...)
 	return key
@@ -10152,9 +10290,9 @@ func DBGetMatchingDAOCoinLimitOrders(
 	// Convert the input BID order to the ASK order to query for.
-	// Note that we seek in reverse for the best matching orders.
+	// Note that we scan forward for the best matching orders, since prices are stored inverted.
 	// * Swap BuyingDAOCoinCreatorPKID and SellingDAOCoinCreatorPKID.
-	// * Set ScaledExchangeRateCoinsToSellPerCoinToBuy to MaxUint256.
-	// * Set BlockHeight to 0 as this becomes math.MaxUint32 in the key.
-	// * Set OrderID to MaxBlockHash.
+	// * Set ScaledExchangeRateCoinsToSellPerCoinToBuy to MaxUint256. This will be 0 in the key.
+	// * Set BlockHeight to 0.
+	// * Set OrderID to MaxBlockHash. This will be the zero block hash in the key.
 	queryOrder.BuyingDAOCoinCreatorPKID = inputOrder.SellingDAOCoinCreatorPKID
 	queryOrder.SellingDAOCoinCreatorPKID = inputOrder.BuyingDAOCoinCreatorPKID
 	queryOrder.ScaledExchangeRateCoinsToSellPerCoinToBuy = MaxUint256.Clone()
@@ -10171,11 +10309,11 @@ func DBGetMatchingDAOCoinLimitOrders(
 		key = startKey
 	}
 
-	// Go in reverse order to find the highest prices first.
+	// Iterate forward to find the highest prices first, since the prices are inverted in this index.
 	// We break once we hit the input order's inverted scaled
 	// price or the input order's quantity is fulfilled.
 	opts := badger.DefaultIteratorOptions
-	opts.Reverse = true
+	// opts.Reverse = true // No longer needed: the key encoding makes forward iteration correct.
 	opts.Prefix = prefixKey
 	opts.PrefetchValues = false
 	iterator := txn.NewIterator(opts)
@@ -10230,7 +10368,7 @@
 func DBGetAllDAOCoinLimitOrders(handle *badger.DB) ([]*DAOCoinLimitOrderEntry, error) {
 	// Get all DAO Coin limit orders.
-	key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
+	key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...)
 	return _DBGetAllDAOCoinLimitOrdersByPrefix(handle, key)
 }
 
@@ -10240,7 +10378,7 @@ func DBGetAllDAOCoinLimitOrdersForThisDAOCoinPair(
 	sellingDAOCoinCreatorPKID *PKID) ([]*DAOCoinLimitOrderEntry, error) {
 
 	// Get all DAO coin limit orders for this DAO coin pair.
-	key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
+	key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...)
 	key = append(key, buyingDAOCoinCreatorPKID.ToBytes()...)
 	key = append(key, sellingDAOCoinCreatorPKID.ToBytes()...)
 	return _DBGetAllDAOCoinLimitOrdersByPrefix(handle, key)
@@ -10368,12 +10506,33 @@ func DBUpsertDAOCoinLimitOrderWithTxn(
 	return nil
 }
 
+// MigrateDAOCoinLimitOrderBatch migrates a batch of DAOCoinLimitOrderEntries to the new prefix
+// and deletes the corresponding entries from the old prefix.
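+// Each call commits in its own badger transaction, which helps keep any single
+// transaction under badger's size limit during large migrations.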
+func MigrateDAOCoinLimitOrderBatch(handle *badger.DB, snapshot *Snapshot, blockHeight uint64, orders map[BlockHash]*DAOCoinLimitOrderEntry, eventManager *EventManager) error {
+	return handle.Update(func(txn *badger.Txn) error {
+		for _, order := range orders {
+			orderBytes := EncodeToBytes(blockHeight, order)
+			key := DBKeyForDAOCoinLimitOrder(order)
+
+			if err := DBSetWithTxn(txn, snapshot, key, orderBytes, eventManager); err != nil {
+				return errors.Wrapf(err, "MigrateDAOCoinLimitOrderBatch: problem storing limit order")
+			}
+
+			oldKey := OLDDBKeyForDAOCoinLimitOrder(order)
+			if err := DBDeleteWithTxn(txn, snapshot, oldKey, eventManager, false); err != nil {
+				return errors.Wrapf(err, "MigrateDAOCoinLimitOrderBatch: problem deleting old limit order")
+			}
+		}
+		return nil
+	})
+}
+
 func DBDeleteDAOCoinLimitOrderWithTxn(txn *badger.Txn, snap *Snapshot, order *DAOCoinLimitOrderEntry, eventManager *EventManager, entryIsDeleted bool) error {
 	if order == nil {
 		return nil
 	}
 
-	// Delete from index: PrefixDAOCoinLimitOrder
+	// Delete from index: PrefixNewDAOCoinLimitOrder
 	key := DBKeyForDAOCoinLimitOrder(order)
 	if err := DBDeleteWithTxn(txn, snap, key, eventManager, entryIsDeleted); err != nil {
 		return errors.Wrapf(err, "DBDeleteDAOCoinLimitOrderWithTxn: problem deleting limit order")
diff --git a/lib/server.go b/lib/server.go
index e6f1980f0..391cdddbd 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -361,9 +361,35 @@ func ValidateHyperSyncFlags(isHypersync bool, syncType NodeSyncType) {
 	}
 }
 
+// RunDBMigrationsOnce runs all the database migrations that need to be run exactly once.
+// For each migration, it checks for a sentinel file indicating that the migration has
+// already been run; if the file is absent, it runs the migration and then writes the file.
+// This keeps the database format in step with the current version of the code. It is only
+// used for migrations that DO NOT result in a fork.
+func RunDBMigrationsOnce(db *badger.DB, snapshot *Snapshot, eventManager *EventManager, params *DeSoParams) error {
+	// List of migrations:
+	migrations := []func(*badger.DB, *Snapshot, *EventManager, *DeSoParams) error{
+		RunBlockIndexMigrationOnce,
+		RunDAOCoinLimitOrderMigrationOnce,
+	}
+	for ii, migration := range migrations {
+		if err := migration(db, snapshot, eventManager, params); err != nil {
+			return errors.Wrapf(err, "RunDBMigrationsOnce: Problem running migration %v", ii)
+		}
+	}
+	return nil
+}
+
 // RunBlockIndexMigrationOnce runs the block index migration once and saves a file to
 // indicate that it has been run.
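+// It is safe to call this on every startup: once the migration has succeeded,
+// the sentinel file short-circuits all later runs.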
-func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error {
+func RunBlockIndexMigrationOnce(
+	db *badger.DB,
+	snapshot *Snapshot,
+	eventManager *EventManager,
+	params *DeSoParams,
+) error {
 	blockIndexMigrationFileName := filepath.Join(db.Opts().Dir, BlockIndexMigrationFileName)
 	glog.V(2).Info("FileName: ", blockIndexMigrationFileName)
 	hasRunMigration, err := ReadBoolFromFile(blockIndexMigrationFileName)
@@ -372,7 +398,7 @@ func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error {
 		return nil
 	}
 	glog.V(0).Info("Running block index migration")
-	if err = RunBlockIndexMigration(db, nil, nil, params); err != nil {
+	if err = RunBlockIndexMigration(db, snapshot, eventManager, params); err != nil {
 		return errors.Wrapf(err, "Problem running block index migration")
 	}
 	if err = SaveBoolToFile(blockIndexMigrationFileName, true); err != nil {
@@ -382,6 +408,32 @@ func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error {
 	return nil
 }
 
+// RunDAOCoinLimitOrderMigrationOnce runs the DAO coin limit order migration once and saves a file to
+// indicate that it has been run.
+func RunDAOCoinLimitOrderMigrationOnce(
+	db *badger.DB,
+	snapshot *Snapshot,
+	eventManager *EventManager,
+	_ *DeSoParams,
+) error {
+	limitOrderMigrationFileName := filepath.Join(db.Opts().Dir, DAOCoinLimitOrderMigrationFileName)
+	glog.V(2).Info("FileName: ", limitOrderMigrationFileName)
+	hasRunMigration, err := ReadBoolFromFile(limitOrderMigrationFileName)
+	if err == nil && hasRunMigration {
+		glog.V(2).Info("DAOCoinLimitOrder index migration has already been run")
+		return nil
+	}
+	glog.V(0).Info("Running DAO coin limit order index migration")
+	if err = RunDAOCoinLimitOrderMigration(db, snapshot, eventManager); err != nil {
+		return errors.Wrapf(err, "Problem running DAO coin limit order index migration")
+	}
+	if err = SaveBoolToFile(limitOrderMigrationFileName, true); err != nil {
+		return errors.Wrapf(err, "Problem saving DAO coin limit order index migration file")
+	}
+	glog.V(2).Info("DAO coin limit order index migration complete")
+	return nil
+}
+
 // NewServer initializes all of the internal data structures. Right now this basically
 // looks as follows:
 // - ConnectionManager starts and keeps track of peers.
@@ -2565,6 +2617,10 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
 		)))
 		blockHashesToRequest, err = srv.fastHotStuffConsensus.HandleBlock(pp, blk)
 		isOrphan = len(blockHashesToRequest) > 0
+		glog.V(0).Infof(CLog(Cyan, fmt.Sprintf(
+			"Server._handleBlock: Finished processing block %v with FastHotStuffConsensus, SyncState=%v, for peer %v",
+			blk, srv.blockchain.chainState(), pp,
+		)))
 	} else if !verifySignatures {
 		glog.V(0).Infof(CLog(Cyan, fmt.Sprintf(
 			"Server._handleBlock: Processing block %v WITHOUT signature checking because SyncState=%v for peer %v",
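
Reviewer note (illustrative, not part of the diff): the forward-iteration trick relies on one property, namely that per-byte inversion (0xff - b) reverses lexicographic order. The following self-contained Go sketch demonstrates this; invert mirrors the diff's invertByteSlice helper, and the sample "prices" are hypothetical values, not real order data.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// invert mirrors invertByteSlice from the diff: 0xff - b for each byte.
func invert(a []byte) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = 0xff - a[i]
	}
	return out
}

func main() {
	// Hypothetical big-endian prices, ascending: 0x01 < 0x7f < 0xff.
	prices := [][]byte{{0x01}, {0x7f}, {0xff}}
	stored := make([][]byte, len(prices))
	for i, p := range prices {
		stored[i] = invert(p)
	}
	// An ascending scan over the stored keys, like badger's default iterator...
	sort.Slice(stored, func(i, j int) bool {
		return bytes.Compare(stored[i], stored[j]) < 0
	})
	// ...visits the original prices highest-first: ff, 7f, 01.
	for _, s := range stored {
		fmt.Printf("stored=%x price=%x\n", s, invert(s))
	}
}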
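Reviewer note (illustrative, not part of the diff): the new key also switches the price component from VariableEncodeUint256 to FixedWidthEncodeUint256. Byte-wise key comparison only matches numeric order when every value is encoded at the same width. The sketch below shows the general pitfall using uint64 stand-ins; fixedWidth and variableWidth are hypothetical helpers, not the actual DeSo encoders.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// fixedWidth encodes n as 8 big-endian bytes. Lexicographic order of the
// encodings matches numeric order of the inputs.
func fixedWidth(n uint64) []byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], n)
	return b[:]
}

// variableWidth drops leading zero bytes to save space. Lexicographic order
// of the encodings no longer matches numeric order.
func variableWidth(n uint64) []byte {
	return bytes.TrimLeft(fixedWidth(n), "\x00")
}

func main() {
	// 2 < 256 numerically, but variableWidth(2) = [02] sorts AFTER
	// variableWidth(256) = [01 00], because 0x02 > 0x01 in the first byte.
	fmt.Println(bytes.Compare(variableWidth(2), variableWidth(256))) // 1: wrong order
	fmt.Println(bytes.Compare(fixedWidth(2), fixedWidth(256)))       // -1: correct order
}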