2 changes: 1 addition & 1 deletion lib/block_view_dao_coin_limit_order_test.go
@@ -1152,7 +1152,7 @@ func TestDAOCoinLimitOrder(t *testing.T) {
orderEntries, err = dbAdapter.GetAllDAOCoinLimitOrders()
require.NoError(err)
require.Equal(len(orderEntries), 2)
require.True(orderEntries[1].Eq(metadataM1.ToEntry(m1PKID.PKID, savedHeight, toPKID)))
require.True(orderEntries[0].Eq(metadataM1.ToEntry(m1PKID.PKID, savedHeight, toPKID)))

// m1 submits order buying 5 $DESO nanos @ 12 DAO coin / $DESO.
_doDAOCoinLimitOrderTxnWithTestMeta(testMeta, feeRateNanosPerKb, m1Pub, m1Priv, metadataM1)
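Note: the expected position flips from orderEntries[1] to orderEntries[0] because the new index key stores the inverted exchange rate, which reverses the order in which GetAllDAOCoinLimitOrders returns entries, so the same open order now comes back in a different slot. The illustration after invertByteSlice in lib/db_utils.go below shows why inversion flips the sort.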
2 changes: 1 addition & 1 deletion lib/blockchain.go
@@ -1135,7 +1135,7 @@ func NewBlockchain(
checkpointSyncingProviders []string,
blockIndexSize int,
) (*Blockchain, error) {
if err := RunBlockIndexMigrationOnce(db, params); err != nil {
if err := RunDBMigrationsOnce(db, snapshot, eventManager, params); err != nil {
return nil, errors.Wrapf(err, "NewBlockchain: Problem running db migrations")
}
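The new RunDBMigrationsOnce entry point evidently bundles the existing block index migration with the new DAO coin limit order migration. A minimal sketch of such a wrapper, assuming RunDAOCoinLimitOrderMigrationOnce mirrors the existing *Once helpers; this composition is an illustration, not the repository's confirmed implementation:

```go
// Hypothetical sketch: run every pending one-time migration in sequence.
func RunDBMigrationsOnce(db *badger.DB, snapshot *Snapshot,
	eventManager *EventManager, params *DeSoParams) error {
	// Guarded by BlockIndexMigrationFileName (see lib/constants.go).
	if err := RunBlockIndexMigrationOnce(db, params); err != nil {
		return errors.Wrap(err, "RunDBMigrationsOnce: block index migration")
	}
	// Guarded by DAOCoinLimitOrderMigrationFileName (see lib/constants.go).
	if err := RunDAOCoinLimitOrderMigrationOnce(db, snapshot, eventManager, params); err != nil {
		return errors.Wrap(err, "RunDBMigrationsOnce: DAO coin limit order migration")
	}
	return nil
}
```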

5 changes: 5 additions & 0 deletions lib/constants.go
@@ -1960,6 +1960,11 @@ const RoutePathGetCommittedTipBlockInfo = "/api/v0/get-committed-tip-block-info"
// for more information.
const BlockIndexMigrationFileName = "block_index_migration.txt"

// DAOCoinLimitOrderMigrationFileName is the name of the file that contains a boolean value
// that indicates whether the DAOCoin limit order migration has been run. See RunDAOCoinLimitOrderMigrationOnce
// for more information.
const DAOCoinLimitOrderMigrationFileName = "dao_coin_limit_order_migration.txt"

// BtcecPubKeyBytesLenUncompressed is a constant that was removed from newer versions of Btcec
const BtcecPubKeyBytesLenUncompressed = 65

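For reference, the run-once guard implied by these file-name constants is simple: skip the migration if the marker file already exists, otherwise run it and write the marker. A minimal sketch, assuming the marker lives inside the badger data directory and that the *Once wrapper takes the same arguments it forwards (imports assumed: os, path/filepath, github.com/pkg/errors):

```go
// Hypothetical run-once guard around RunDAOCoinLimitOrderMigration.
func RunDAOCoinLimitOrderMigrationOnce(db *badger.DB, snapshot *Snapshot,
	eventManager *EventManager, params *DeSoParams) error {
	// params is unused here; kept for symmetry with the other *Once helpers.
	// Assumption: the marker file sits inside the badger data directory.
	markerPath := filepath.Join(db.Opts().Dir, DAOCoinLimitOrderMigrationFileName)
	if _, err := os.Stat(markerPath); err == nil {
		// Marker present: the migration already ran on this node.
		return nil
	}
	if err := RunDAOCoinLimitOrderMigration(db, snapshot, eventManager); err != nil {
		return errors.Wrap(err, "RunDAOCoinLimitOrderMigrationOnce")
	}
	// Record completion so subsequent startups skip this work.
	return os.WriteFile(markerPath, []byte("true"), 0644)
}
```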
183 changes: 171 additions & 12 deletions lib/db_utils.go
@@ -303,6 +303,7 @@ type DBPrefixes struct {
PrefixAuthorizeDerivedKey []byte `prefix_id:"[59]" is_state:"true" core_state:"true"`

// Prefixes for DAO coin limit orders
// UPDATE: see new prefix below.
// This index powers the order book.
// <
// _PrefixDAOCoinLimitOrder
@@ -311,7 +312,7 @@ type DBPrefixes struct {
// ScaledExchangeRateCoinsToSellPerCoinToBuy [32]byte
// BlockHeight [32]byte
// OrderID [32]byte
// > -> <DAOCoinLimitOrderEntry>
// > -> <DAOCoinLimitOrderEntry> // DEPRECATED
//
// This index allows users to query for their open orders.
// <
@@ -328,9 +329,22 @@ type DBPrefixes struct {
// _PrefixDAOCoinLimitOrderByOrderID
// OrderID [32]byte
// > -> <DAOCoinLimitOrderEntry>
PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true" core_state:"true"`
//
// This index powers the order book. It allows us to
// iterate w/o reverse which speeds up processing of
// dao coin limit orders.
// <
// _PrefixNewDAOCoinLimitOrder
// BuyingDAOCoinCreatorPKID [33]byte
// SellingDAOCoinCreatorPKID [33]byte
// MaxUint256-ScaledExchangeRateCoinsToSellPerCoinToBuy [32]byte
// BlockHeight [32]byte
// MaxBlockHash-OrderID [32]byte
// > -> <DAOCoinLimitOrderEntry>
PrefixDAOCoinLimitOrder []byte `prefix_id:"[60]" is_state:"true"` // Deprecated
PrefixDAOCoinLimitOrderByTransactorPKID []byte `prefix_id:"[61]" is_state:"true"`
PrefixDAOCoinLimitOrderByOrderID []byte `prefix_id:"[62]" is_state:"true"`
PrefixNewDAOCoinLimitOrder []byte `prefix_id:"[99]" is_state:"true" core_state:"true"`

// User Association prefixes
// PrefixUserAssociationByID:
@@ -615,7 +629,7 @@ type DBPrefixes struct {
// to find a block node given its hash was to do a full scan of
// PrefixHeightHashToNodeInfo.
PrefixHashToHeight []byte `prefix_id:"[98]"`
// NEXT_TAG: 99
// NEXT_TAG: 100
}

// DecodeStateKey decodes a state key into a DeSoEncoder type. This is useful for encoders which don't have a stored
@@ -915,6 +929,9 @@ func StatePrefixToDeSoEncoder(prefix []byte) (_isEncoder bool, _encoder DeSoEncoder) {
} else if bytes.Equal(prefix, Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry) {
// prefix_id:"[96]"
return true, &BLSPublicKeyPKIDPairEntry{}
} else if bytes.Equal(prefix, Prefixes.PrefixNewDAOCoinLimitOrder) {
// prefix_id:"[99]"
return true, &DAOCoinLimitOrderEntry{}
}

return true, nil
@@ -5738,6 +5755,11 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager
// places and uint32 in others. Specifically, we don't always validate that we have a uint32 when we go to get
// the block from the DB.
return handle.Update(func(txn *badger.Txn) error {
// We're about to flush records to the main DB, so we initiate the snapshot update.
// This function prepares the data structures in the snapshot.
if snapshot != nil {
snapshot.PrepareAncestralRecordsFlush()
}
// Get the prefix for the height hash to node index.
prefix := _heightHashToNodeIndexPrefix(false)
opts := badger.DefaultIteratorOptions
@@ -5883,6 +5905,103 @@ func RunBlockIndexMigration(handle *badger.DB, snapshot *Snapshot, eventManager
return errors.Wrap(err, "RunBlockIndexMigration: Problem putting block node batch")
}
}
// We can exit early if we're not using a snapshot.
if snapshot == nil {
return nil
}
// Flush the ancestral records to the DB.
if err = snapshot.FlushAncestralRecordsWithTxn(txn); err != nil {
return err
}
return nil
})
}
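Both migrations bracket their writes the same way: PrepareAncestralRecordsFlush before any mutation and FlushAncestralRecordsWithTxn at the end, all inside one handle.Update transaction so the state change and its ancestral records commit atomically. Distilled into a helper (hypothetical, not part of this diff), the pattern is:

```go
// Hypothetical helper capturing the snapshot-bracketing pattern used by
// RunBlockIndexMigration and RunDAOCoinLimitOrderMigration.
func updateWithAncestralRecordsFlush(handle *badger.DB, snapshot *Snapshot,
	mutate func(txn *badger.Txn) error) error {
	return handle.Update(func(txn *badger.Txn) error {
		// Prepare the snapshot's data structures before flushing any records.
		if snapshot != nil {
			snapshot.PrepareAncestralRecordsFlush()
		}
		if err := mutate(txn); err != nil {
			return err
		}
		// Without a snapshot there are no ancestral records to flush.
		if snapshot == nil {
			return nil
		}
		return snapshot.FlushAncestralRecordsWithTxn(txn)
	})
}
```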

// RunDAOCoinLimitOrderMigration moves DAO coin limit orders from the deprecated
// PrefixDAOCoinLimitOrder index to PrefixNewDAOCoinLimitOrder, whose key encoding avoids reverse
// iteration. It iterates over the old prefix, re-inserting each entry under the new prefix
// and deleting it from the old one.
func RunDAOCoinLimitOrderMigration(
handle *badger.DB,
snapshot *Snapshot,
eventManager *EventManager,
) error {
bestHash := DbGetBestHash(handle, snapshot, ChainTypeDeSoBlock)
blockHeight := uint64(0)
if bestHash != nil {
var err error
blockHeight, err = GetHeightForHash(handle, snapshot, bestHash)
if err != nil {
return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: Problem getting height for best hash %v", bestHash)
}
}
return handle.Update(func(txn *badger.Txn) error {
// We're about to flush records to the main DB, so we initiate the snapshot update.
// This function prepares the data structures in the snapshot.
if snapshot != nil {
snapshot.PrepareAncestralRecordsFlush()
}
// Get the old prefix for the dao coin limit order index.
prefix := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
opts := badger.DefaultIteratorOptions
opts.Prefix = prefix
nodeIterator := txn.NewIterator(opts)
defer nodeIterator.Close()
// Initialize a map to store the dao coin limit orders.
ordersMap := make(map[BlockHash]*DAOCoinLimitOrderEntry)
// Iterate over all the keys in the dao coin limit order index
// and store them in the map. Every 10k entries, we will insert
// into the new prefix and delete from the old prefix.
startTime := time.Now()
ctr := 0
for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() {
item := nodeIterator.Item().Key()
value, err := nodeIterator.Item().ValueCopy(nil)
if err != nil {
return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: Problem getting value for key %v", item)
}

// Parse the value to get the DAOCoinLimitOrderEntry.
order := &DAOCoinLimitOrderEntry{}
rr := bytes.NewReader(value)
if exist, err := DecodeFromBytes(order, rr); err != nil {
return errors.Wrapf(err, "RunDAOCoinLimitOrderMigration: decoding order from bytes for key %v", item)
} else if !exist {
// errors.Wrapf(nil, ...) returns nil, so the !exist case needs its own error.
return fmt.Errorf("RunDAOCoinLimitOrderMigration: no order entry decoded for key %v", item)
}

// Add to the map
ordersMap[*order.OrderID] = order

// If we have fewer than 10K entries, continue.
if len(ordersMap) < 10000 {
continue
}
ctr++
if ctr%5 == 0 {
glog.V(0).Infof("Time to run DAOCoinLimitOrderMigration for %v orders: %v", ctr*10000, time.Since(startTime))
}
// If we have more than 10K entries, batch migrate the entries to new dao coin limit order index
// and delete the old entries from the old prefix and reset the map.
innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager)
if innerErr != nil {
return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch")
}
ordersMap = make(map[BlockHash]*DAOCoinLimitOrderEntry)
}
// If we have any entries left in the map, batch migrate the entries to new dao coin limit order index
// and delete the old entries from the old prefix
if len(ordersMap) > 0 {
innerErr := MigrateDAOCoinLimitOrderBatch(handle, snapshot, blockHeight, ordersMap, eventManager)
if innerErr != nil {
return errors.Wrap(innerErr, "RunDAOCoinLimitOrderMigration: Problem migrating DAO coin limit order batch")
}
}
// We can exit early if we're not using a snapshot.
if snapshot == nil {
return nil
}
// Flush the ancestral records to the DB.
if innerErr := snapshot.FlushAncestralRecordsWithTxn(txn); innerErr != nil {
return innerErr
}
return nil
})
}
@@ -10072,6 +10191,25 @@ func DBGetPaginatedProfilesByDeSoLocked(
// ---------------------------------------------

func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
key := DBPrefixKeyForDAOCoinLimitOrder(order)
// Store MaxUint256 - ScaledExchangeRateCoinsToSellPerCoinToBuy so we don't have to iterate in reverse.
key = append(key, FixedWidthEncodeUint256(uint256.NewInt(0).Sub(MaxUint256, order.ScaledExchangeRateCoinsToSellPerCoinToBuy))...)
key = append(key, _EncodeUint32(order.BlockHeight)...)
// Store MaxBlockHash - OrderID so we don't have to iterate in reverse.
key = append(key, invertByteSlice(order.OrderID.ToBytes())...)
return key
}

// Invert a byte slice by subtracting each byte from 0xff.
func invertByteSlice(a []byte) []byte {
result := make([]byte, len(a))
for i := range a {
result[i] = 0xff - a[i]
}
return result
}
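Badger orders keys with bytes.Compare, i.e. lexicographically over big-endian bytes, so mapping each byte b to 0xff - b (or each value x to Max - x) exactly reverses the sort. A self-contained illustration, independent of the DeSo codebase:

```go
package main

import (
	"bytes"
	"fmt"
)

// invert mirrors invertByteSlice above: subtract each byte from 0xff.
func invert(a []byte) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = 0xff - a[i]
	}
	return out
}

func main() {
	low := []byte{0x00, 0x01}  // encodes a lower (worse) price
	high := []byte{0x00, 0x09} // encodes a higher (better) price

	// Raw keys: the lower price sorts first, so the best match is
	// only reachable by iterating in reverse.
	fmt.Println(bytes.Compare(low, high) < 0) // true

	// Inverted keys: the higher price now sorts first, so a plain
	// forward scan visits the best-priced orders before worse ones.
	fmt.Println(bytes.Compare(invert(high), invert(low)) < 0) // true
}
```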

func OLDDBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
key := DBPrefixKeyForDAOCoinLimitOrder(order)
key = append(key, VariableEncodeUint256(order.ScaledExchangeRateCoinsToSellPerCoinToBuy)...)
// Store MaxUint32 - block height to guarantee FIFO
@@ -10082,7 +10220,7 @@ func DBKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
}

func DBPrefixKeyForDAOCoinLimitOrder(order *DAOCoinLimitOrderEntry) []byte {
key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...)
key = append(key, order.BuyingDAOCoinCreatorPKID.ToBytes()...)
key = append(key, order.SellingDAOCoinCreatorPKID.ToBytes()...)
return key
@@ -10152,9 +10290,9 @@ func DBGetMatchingDAOCoinLimitOrders(
// Convert the input BID order to the ASK order to query for.
// Note that we seek in reverse for the best matching orders.
// * Swap BuyingDAOCoinCreatorPKID and SellingDAOCoinCreatorPKID.
// * Set ScaledExchangeRateCoinsToSellPerCoinToBuy to MaxUint256.
// * Set BlockHeight to 0 as this becomes math.MaxUint32 in the key.
// * Set OrderID to MaxBlockHash.
// * Set ScaledExchangeRateCoinsToSellPerCoinToBuy to MaxUint256. This will be 0 in the key.
// * Set BlockHeight to 0.
// * Set OrderID to MaxBlockHash. This will be the zero block hash in the key.
queryOrder.BuyingDAOCoinCreatorPKID = inputOrder.SellingDAOCoinCreatorPKID
queryOrder.SellingDAOCoinCreatorPKID = inputOrder.BuyingDAOCoinCreatorPKID
queryOrder.ScaledExchangeRateCoinsToSellPerCoinToBuy = MaxUint256.Clone()
@@ -10171,11 +10309,11 @@
key = startKey
}

// Go in reverse order to find the highest prices first.
// Go in order to find the highest prices first, since the prices are inverted in this index.
// We break once we hit the input order's inverted scaled
// price or the input order's quantity is fulfilled.
opts := badger.DefaultIteratorOptions
opts.Reverse = true
//opts.Reverse = true
opts.Prefix = prefixKey
opts.PrefetchValues = false
iterator := txn.NewIterator(opts)
@@ -10230,7 +10368,7 @@

func DBGetAllDAOCoinLimitOrders(handle *badger.DB) ([]*DAOCoinLimitOrderEntry, error) {
// Get all DAO Coin limit orders.
key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...)
return _DBGetAllDAOCoinLimitOrdersByPrefix(handle, key)
}

@@ -10240,7 +10378,7 @@ func DBGetAllDAOCoinLimitOrdersForThisDAOCoinPair(
sellingDAOCoinCreatorPKID *PKID) ([]*DAOCoinLimitOrderEntry, error) {

// Get all DAO coin limit orders for this DAO coin pair.
key := append([]byte{}, Prefixes.PrefixDAOCoinLimitOrder...)
key := append([]byte{}, Prefixes.PrefixNewDAOCoinLimitOrder...)
key = append(key, buyingDAOCoinCreatorPKID.ToBytes()...)
key = append(key, sellingDAOCoinCreatorPKID.ToBytes()...)
return _DBGetAllDAOCoinLimitOrdersByPrefix(handle, key)
@@ -10368,12 +10506,33 @@ func DBUpsertDAOCoinLimitOrderWithTxn(
return nil
}

// MigrateDAOCoinLimitOrderBatch migrates a batch of DAOCoinLimitOrderEntries to the new prefix
// and deletes from the old prefix.
func MigrateDAOCoinLimitOrderBatch(handle *badger.DB, snapshot *Snapshot, blockHeight uint64, orders map[BlockHash]*DAOCoinLimitOrderEntry, eventManager *EventManager) error {
return handle.Update(func(txn *badger.Txn) error {
for _, order := range orders {
orderBytes := EncodeToBytes(blockHeight, order)
key := DBKeyForDAOCoinLimitOrder(order)

if err := DBSetWithTxn(txn, snapshot, key, orderBytes, eventManager); err != nil {
return errors.Wrapf(err, "PutDAOCoinLimitOrderBatch: problem storing limit order")
}

oldKey := OLDDBKeyForDAOCoinLimitOrder(order)
if err := DBDeleteWithTxn(txn, snapshot, oldKey, eventManager, false); err != nil {
return errors.Wrapf(err, "PutDAOCoinLimitOrderBatch: problem deleting old limit order")
}
}
return nil
})
}

func DBDeleteDAOCoinLimitOrderWithTxn(txn *badger.Txn, snap *Snapshot, order *DAOCoinLimitOrderEntry, eventManager *EventManager, entryIsDeleted bool) error {
if order == nil {
return nil
}

// Delete from index: PrefixDAOCoinLimitOrder
// Delete from index: PrefixNewDAOCoinLimitOrder
key := DBKeyForDAOCoinLimitOrder(order)
if err := DBDeleteWithTxn(txn, snap, key, eventManager, entryIsDeleted); err != nil {
return errors.Wrapf(err, "DBDeleteDAOCoinLimitOrderWithTxn: problem deleting limit order")