diff --git a/src/xrpld/app/misc/TxQ.h b/src/xrpld/app/misc/TxQ.h index 4f02d6d6170..6a490791237 100644 --- a/src/xrpld/app/misc/TxQ.h +++ b/src/xrpld/app/misc/TxQ.h @@ -714,7 +714,8 @@ class TxQ std::optional removeFromByFee( std::optional const& replacedTxIter, - std::shared_ptr const& tx); + std::shared_ptr const& tx, + std::lock_guard const&); using FeeHook = boost::intrusive:: member_hook, &MaybeTx::byFeeListHook>; @@ -769,7 +770,7 @@ class TxQ /// Is the queue at least `fillPercentage` full? template bool - isFull() const; + isFull(std::lock_guard const&) const; /** Checks if the indicated transaction fits the conditions for being stored in the queue. @@ -782,21 +783,24 @@ class TxQ std::shared_ptr const& sleAccount, AccountMap::iterator const&, std::optional const&, - std::lock_guard const& lock); + std::lock_guard const&); /// Erase and return the next entry in byFee_ (lower fee level) - FeeMultiSet::iterator_type erase(FeeMultiSet::const_iterator_type); + FeeMultiSet::iterator_type + erase(FeeMultiSet::const_iterator_type, std::lock_guard const&); /** Erase and return the next entry for the account (if fee level is higher), or next entry in byFee_ (lower fee level). Used to get the next "applicable" MaybeTx for accept(). */ - FeeMultiSet::iterator_type eraseAndAdvance(FeeMultiSet::const_iterator_type); + FeeMultiSet::iterator_type + eraseAndAdvance(FeeMultiSet::const_iterator_type, std::lock_guard const&); /// Erase a range of items, based on TxQAccount::TxMap iterators TxQAccount::TxMap::iterator erase( TxQAccount& txQAccount, TxQAccount::TxMap::const_iterator begin, - TxQAccount::TxMap::const_iterator end); + TxQAccount::TxMap::const_iterator end, + std::lock_guard const&); /** All-or-nothing attempt to try to apply the queued txs for @@ -815,6 +819,7 @@ class TxQ std::size_t const txExtraCount, ApplyFlags flags, FeeMetrics::Snapshot const& metricsSnapshot, + std::lock_guard const&, beast::Journal j); }; diff --git a/src/xrpld/app/misc/detail/TxQ.cpp b/src/xrpld/app/misc/detail/TxQ.cpp index 718b468ff04..3e48e7b40f7 100644 --- a/src/xrpld/app/misc/detail/TxQ.cpp +++ b/src/xrpld/app/misc/detail/TxQ.cpp @@ -336,7 +336,7 @@ TxQ::~TxQ() template bool -TxQ::isFull() const +TxQ::isFull(std::lock_guard const&) const { static_assert(fillPercentage > 0 && fillPercentage <= 100, "Invalid fill percentage"); return maxSize_ && byFee_.size() >= (*maxSize_ * fillPercentage / 100); @@ -408,7 +408,8 @@ TxQ::canBeHeld( } auto -TxQ::erase(TxQ::FeeMultiSet::const_iterator_type candidateIter) -> FeeMultiSet::iterator_type +TxQ::erase(TxQ::FeeMultiSet::const_iterator_type candidateIter, std::lock_guard const&) + -> FeeMultiSet::iterator_type { auto& txQAccount = byAccount_.at(candidateIter->account); auto const seqProx = candidateIter->seqProxy; @@ -423,8 +424,9 @@ TxQ::erase(TxQ::FeeMultiSet::const_iterator_type candidateIter) -> FeeMultiSet:: } auto -TxQ::eraseAndAdvance(TxQ::FeeMultiSet::const_iterator_type candidateIter) - -> FeeMultiSet::iterator_type +TxQ::eraseAndAdvance( + TxQ::FeeMultiSet::const_iterator_type candidateIter, + std::lock_guard const&) -> FeeMultiSet::iterator_type { auto& txQAccount = byAccount_.at(candidateIter->account); auto const accountIter = txQAccount.transactions.find(candidateIter->seqProxy); @@ -459,7 +461,8 @@ auto TxQ::erase( TxQ::TxQAccount& txQAccount, TxQ::TxQAccount::TxMap::const_iterator begin, - TxQ::TxQAccount::TxMap::const_iterator end) -> TxQAccount::TxMap::iterator + TxQ::TxQAccount::TxMap::const_iterator end, + std::lock_guard const&) -> TxQAccount::TxMap::iterator { for (auto it = begin; it != end; ++it) { @@ -480,6 +483,7 @@ TxQ::tryClearAccountQueueUpThruTx( std::size_t const txExtraCount, ApplyFlags flags, FeeMetrics::Snapshot const& metricsSnapshot, + std::lock_guard const& lock, beast::Journal j) { SeqProxy const tSeqProx{tx.getSeqProxy()}; @@ -554,10 +558,10 @@ TxQ::tryClearAccountQueueUpThruTx( { // All of the queued transactions applied, so remove them from the // queue. - endTxIter = erase(accountIter->second, beginTxIter, endTxIter); + endTxIter = erase(accountIter->second, beginTxIter, endTxIter, lock); // If `tx` is replacing a queued tx, delete that one, too. if (endTxIter != accountIter->second.transactions.end() && endTxIter->first == tSeqProx) - erase(accountIter->second, endTxIter, std::next(endTxIter)); + erase(accountIter->second, endTxIter, std::next(endTxIter), lock); } return txResult; @@ -1132,6 +1136,7 @@ TxQ::apply( view.txCount(), flags, metricsSnapshot, + lock, j); if (result.applied) { @@ -1158,7 +1163,7 @@ TxQ::apply( // If the queue is full, decide whether to drop the current // transaction or the last transaction for the account with // the lowest fee. - if (!replacedTxIter && isFull()) + if (!replacedTxIter && isFull(lock)) { auto lastRIter = byFee_.rbegin(); while (lastRIter != byFee_.rend() && lastRIter->account == account) @@ -1213,7 +1218,7 @@ TxQ::apply( JLOG(j_.info()) << "Removing last item of account " << lastRIter->account << " from queue with average fee of " << endEffectiveFeeLevel << " in favor of " << transactionID << " with fee of " << feeLevelPaid; - erase(byFee_.iterator_to(dropRIter->second)); + erase(byFee_.iterator_to(dropRIter->second), lock); } else { @@ -1226,7 +1231,7 @@ TxQ::apply( // Hold the transaction in the queue. if (replacedTxIter) { - replacedTxIter = removeFromByFee(replacedTxIter, tx); + replacedTxIter = removeFromByFee(replacedTxIter, tx, lock); } if (!accountIsInQueue) @@ -1288,7 +1293,7 @@ TxQ::processClosedLedger(Application& app, ReadView const& view, bool timeLeap) if (candidateIter->lastValid && *candidateIter->lastValid <= ledgerSeq) { byAccount_.at(candidateIter->account).dropPenalty = true; - candidateIter = erase(candidateIter); + candidateIter = erase(candidateIter, lock); } else { @@ -1385,7 +1390,7 @@ TxQ::accept(Application& app, OpenView& view) << " applied successfully with " << transToken(txnResult) << ". Remove from queue."; - candidateIter = eraseAndAdvance(candidateIter); + candidateIter = eraseAndAdvance(candidateIter, lock); ledgerChanged = true; } else if ( @@ -1398,7 +1403,7 @@ TxQ::accept(Application& app, OpenView& view) account.dropPenalty = true; JLOG(j_.debug()) << "Queued transaction " << candidateIter->txID << " failed with " << transToken(txnResult) << ". Remove from queue."; - candidateIter = eraseAndAdvance(candidateIter); + candidateIter = eraseAndAdvance(candidateIter, lock); } else { @@ -1410,7 +1415,7 @@ TxQ::accept(Application& app, OpenView& view) else --candidateIter->retriesRemaining; candidateIter->lastResult = txnResult; - if (account.dropPenalty && account.transactions.size() > 1 && isFull<95>()) + if (account.dropPenalty && account.transactions.size() > 1 && isFull<95>(lock)) { // The queue is close to full, this account has multiple // txs queued, and this account has had a transaction @@ -1423,7 +1428,7 @@ TxQ::accept(Application& app, OpenView& view) << "Queue is nearly full, and transaction " << candidateIter->txID << " failed with " << transToken(txnResult) << ". Removing ticketed tx from account " << account.account; - candidateIter = eraseAndAdvance(candidateIter); + candidateIter = eraseAndAdvance(candidateIter, lock); } else { @@ -1442,7 +1447,7 @@ TxQ::accept(Application& app, OpenView& view) << ". Removing last item from account " << account.account; auto endIter = byFee_.iterator_to(dropRIter->second); if (endIter != candidateIter) - erase(endIter); + erase(endIter, lock); ++candidateIter; } } @@ -1585,8 +1590,8 @@ TxQ::tryDirectApply( if (txSeqProx.isSeq() && txSeqProx != acctSeqProx) return {}; - FeeLevel64 const requiredFeeLevel = [this, &view, flags]() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); + FeeLevel64 const requiredFeeLevel = [this, &view, flags, &lock]() { return getRequiredFeeLevel(view, flags, feeMetrics_.getSnapshot(), lock); }(); @@ -1610,7 +1615,6 @@ TxQ::tryDirectApply( { // If the applied transaction replaced a transaction in the // queue then remove the replaced transaction. - std::lock_guard lock(mutex_); AccountMap::iterator accountIter = byAccount_.find(account); if (accountIter != byAccount_.end()) @@ -1619,7 +1623,7 @@ TxQ::tryDirectApply( if (auto const existingIter = txQAcct.transactions.find(txSeqProx); existingIter != txQAcct.transactions.end()) { - removeFromByFee(existingIter, tx); + removeFromByFee(existingIter, tx, lock); } } } @@ -1631,7 +1635,8 @@ TxQ::tryDirectApply( std::optional TxQ::removeFromByFee( std::optional const& replacedTxIter, - std::shared_ptr const& tx) + std::shared_ptr const& tx, + std::lock_guard const& lock) { if (replacedTxIter && tx) { @@ -1649,7 +1654,7 @@ TxQ::removeFromByFee( deleteIter->account == (*tx)[sfAccount], "xrpl::TxQ::removeFromByFee : matching account"); - erase(deleteIter); + erase(deleteIter, lock); } return std::nullopt; } @@ -1668,7 +1673,8 @@ TxQ::getMetrics(OpenView const& view) const result.txInLedger = view.txCount(); result.txPerLedger = snapshot.txnsExpected; result.referenceFeeLevel = baseLevel; - result.minProcessingFeeLevel = isFull() ? byFee_.rbegin()->feeLevel + FeeLevel64{1} : baseLevel; + result.minProcessingFeeLevel = + isFull(lock) ? byFee_.rbegin()->feeLevel + FeeLevel64{1} : baseLevel; result.medFeeLevel = snapshot.escalationMultiplier; result.openLedgerFeeLevel = FeeMetrics::scaleFeeLevel(snapshot, view);