diff --git a/src/bls/bls_batchverifier.h b/src/bls/bls_batchverifier.h index a455b30d8d5f..e22bbf8d8240 100644 --- a/src/bls/bls_batchverifier.h +++ b/src/bls/bls_batchverifier.h @@ -82,27 +82,27 @@ class CBLSBatchVerifier } // revert to per-source verification - for (const auto& p : messagesBySource) { + for (const auto& [from, message_map] : messagesBySource) { bool batchValid = false; // no need to verify it again if there was just one source if (messagesBySource.size() != 1) { byMessageHash.clear(); - for (auto it = p.second.begin(); it != p.second.end(); ++it) { + for (auto it = message_map.begin(); it != message_map.end(); ++it) { byMessageHash[(*it)->second.msgHash].emplace_back(*it); } batchValid = VerifyBatch(byMessageHash); } if (!batchValid) { - badSources.emplace(p.first); + badSources.emplace(from); if (perMessageFallback) { // revert to per-message verification - if (p.second.size() == 1) { + if (message_map.size() == 1) { // no need to re-verify a single message - badMessages.emplace(p.second[0]->second.msgId); + badMessages.emplace(message_map[0]->second.msgId); } else { - for (const auto& msgIt : p.second) { + for (const auto& msgIt : message_map) { if (badMessages.count(msgIt->first)) { // same message might be invalid from different source, so no need to re-verify it continue; diff --git a/src/chainlock/chainlock.cpp b/src/chainlock/chainlock.cpp index 39154d30d2d2..5ef9f9d344fc 100644 --- a/src/chainlock/chainlock.cpp +++ b/src/chainlock/chainlock.cpp @@ -217,6 +217,12 @@ void CChainLocksHandler::AcceptedBlockHeader(gsl::not_null p void CChainLocksHandler::UpdatedBlockTip(const llmq::CInstantSendManager& isman) { + // Update the cached tip in the signer before scheduling + const CBlockIndex* pindexNew = WITH_LOCK(::cs_main, return m_chainstate.m_chain.Tip()); + if (auto signer = m_signer.load(std::memory_order_acquire); signer && pindexNew) { + signer->UpdatedBlockTip(pindexNew); + } + // don't call TrySignChainTip directly but instead let the scheduler call it. This way we ensure that cs_main is // never locked and TrySignChainTip is not called twice in parallel. Also avoids recursive calls due to // EnforceBestChainLock switching chains. diff --git a/src/chainlock/signing.cpp b/src/chainlock/signing.cpp index 9b574d5cc6ee..f93477ac04be 100644 --- a/src/chainlock/signing.cpp +++ b/src/chainlock/signing.cpp @@ -26,6 +26,9 @@ ChainLockSigner::ChainLockSigner(CChainState& chainstate, ChainLockSignerParent& m_sporkman{sporkman}, m_mn_sync{mn_sync} { + // Initialize cached tip pointer + LOCK(::cs_main); + m_cached_tip.store(m_chainstate.m_chain.Tip(), std::memory_order_release); } ChainLockSigner::~ChainLockSigner() = default; @@ -40,6 +43,11 @@ void ChainLockSigner::Stop() m_sigman.UnregisterRecoveredSigsListener(this); } +void ChainLockSigner::UpdatedBlockTip(const CBlockIndex* pindexNew) +{ + m_cached_tip.store(pindexNew, std::memory_order_release); +} + void ChainLockSigner::TrySignChainTip(const llmq::CInstantSendManager& isman) { if (!m_mn_sync.IsBlockchainSynced()) { @@ -55,7 +63,7 @@ void ChainLockSigner::TrySignChainTip(const llmq::CInstantSendManager& isman) return; } - const CBlockIndex* pindex = WITH_LOCK(::cs_main, return m_chainstate.m_chain.Tip()); + const CBlockIndex* pindex = m_cached_tip.load(std::memory_order_acquire); if (!pindex || !pindex->pprev) { return; diff --git a/src/chainlock/signing.h b/src/chainlock/signing.h index 357b3295f757..78952b8bdea8 100644 --- a/src/chainlock/signing.h +++ b/src/chainlock/signing.h @@ -8,6 +8,7 @@ #include #include +class CBlockIndex; class CMasternodeSync; class CSporkManager; struct MessageProcessingResult; @@ -62,6 +63,9 @@ class ChainLockSigner final : public llmq::CRecoveredSigsListener uint256 lastSignedRequestId GUARDED_BY(cs_signer); uint256 lastSignedMsgHash GUARDED_BY(cs_signer); + // Cached tip pointer to avoid cs_main acquisition in TrySignChainTip + std::atomic m_cached_tip{nullptr}; + public: ChainLockSigner() = delete; ChainLockSigner(const ChainLockSigner&) = delete; @@ -73,6 +77,8 @@ class ChainLockSigner final : public llmq::CRecoveredSigsListener void Start(); void Stop(); + void UpdatedBlockTip(const CBlockIndex* pindexNew); + void EraseFromBlockHashTxidMap(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_signer); void UpdateBlockHashTxidMap(const uint256& hash, const std::vector& vtx) diff --git a/src/instantsend/instantsend.cpp b/src/instantsend/instantsend.cpp index 2ddb6e80083e..b0558392e863 100644 --- a/src/instantsend/instantsend.cpp +++ b/src/instantsend/instantsend.cpp @@ -168,13 +168,14 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st } LOCK(cs_pendingLocks); - pendingInstantSendLocks.emplace(hash, std::make_pair(from, islock)); + pendingInstantSendLocks.emplace(hash, instantsend::PendingISLockFromPeer{from, islock}); + NotifyWorker(); return ret; } instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() { - decltype(pendingInstantSendLocks) pend; + std::vector> pend; instantsend::PendingState ret; if (!IsInstantSendEnabled()) { @@ -189,6 +190,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() // The keys of the removed values are temporaily stored here to avoid invalidating an iterator std::vector removed; removed.reserve(maxCount); + pend.reserve(maxCount); for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) { // Check if we've reached max count @@ -196,7 +198,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() ret.m_pending_work = true; break; } - pend.emplace(islockHash, std::move(nodeid_islptr_pair)); + pend.emplace_back(islockHash, std::move(nodeid_islptr_pair)); removed.emplace_back(islockHash); } @@ -222,16 +224,17 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() if (!badISLocks.empty()) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__); - // filter out valid IS locks from "pend" - for (auto it = pend.begin(); it != pend.end();) { - if (!badISLocks.count(it->first)) { - it = pend.erase(it); - } else { - ++it; + // filter out valid IS locks from "pend" - keep only bad ones + std::vector> filteredPend; + filteredPend.reserve(badISLocks.size()); + for (auto& p : pend) { + if (badISLocks.contains(p.first)) { + filteredPend.push_back(std::move(p)); } } + // Now check against the previous active set and perform banning if this fails - ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, ret.m_peer_activity); + ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, filteredPend, ret.m_peer_activity); } return ret; @@ -239,7 +242,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap>& pend, + const std::vector>& pend, std::vector>& peer_activity) { CBLSBatchVerifier batchVerifier(false, true, 8); @@ -249,8 +252,8 @@ Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( size_t alreadyVerified = 0; for (const auto& p : pend) { const auto& hash = p.first; - auto nodeId = p.second.first; - const auto& islock = p.second.second; + auto nodeId = p.second.node_id; + const auto& islock = p.second.islock; if (batchVerifier.badSources.count(nodeId)) { continue; @@ -321,8 +324,8 @@ Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( } for (const auto& p : pend) { const auto& hash = p.first; - auto nodeId = p.second.first; - const auto& islock = p.second.second; + auto nodeId = p.second.node_id; + const auto& islock = p.second.islock; if (batchVerifier.badMessages.count(hash)) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: invalid sig in islock, peer=%d\n", @@ -399,7 +402,7 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, } else { // put it in a separate pending map and try again later LOCK(cs_pendingLocks); - pendingNoTxInstantSendLocks.try_emplace(hash, std::make_pair(from, islock)); + pendingNoTxInstantSendLocks.try_emplace(hash, instantsend::PendingISLockFromPeer{from, islock}); } // This will also add children TXs to pendingRetryTxs @@ -442,11 +445,11 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) LOCK(cs_pendingLocks); auto it = pendingNoTxInstantSendLocks.begin(); while (it != pendingNoTxInstantSendLocks.end()) { - if (it->second.second->txid == tx->GetHash()) { + if (it->second.islock->txid == tx->GetHash()) { // we received an islock earlier LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, tx->GetHash().ToString(), it->first.ToString()); - islock = it->second.second; + islock = it->second.islock; pendingInstantSendLocks.try_emplace(it->first, it->second); pendingNoTxInstantSendLocks.erase(it); break; @@ -464,6 +467,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) } else { RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock); } + NotifyWorker(); } void CInstantSendManager::TransactionRemovedFromMempool(const CTransactionRef& tx) @@ -481,6 +485,7 @@ void CInstantSendManager::TransactionRemovedFromMempool(const CTransactionRef& t LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- transaction %s was removed from mempool\n", __func__, tx->GetHash().ToString()); RemoveConflictingLock(::SerializeHash(*islock), *islock); + NotifyWorker(); } void CInstantSendManager::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) @@ -511,12 +516,14 @@ void CInstantSendManager::BlockConnected(const std::shared_ptr& pb } db.WriteBlockInstantSendLocks(pblock, pindex); + NotifyWorker(); } void CInstantSendManager::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { db.RemoveBlockInstantSendLocks(pblock, pindexDisconnected); + NotifyWorker(); } void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined) @@ -539,7 +546,7 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock LOCK(cs_pendingLocks); auto it = pendingNoTxInstantSendLocks.begin(); while (it != pendingNoTxInstantSendLocks.end()) { - if (it->second.second->txid == tx->GetHash()) { + if (it->second.islock->txid == tx->GetHash()) { // we received an islock earlier, let's put it back into pending and verify/lock LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, tx->GetHash().ToString(), it->first.ToString()); @@ -559,6 +566,7 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__, tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : ""); + NotifyWorker(); } void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChildren) @@ -599,6 +607,7 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, retryChildren=%d, retryChildrenCount=%d\n", __func__, txid.ToString(), retryChildren, retryChildrenCount); + NotifyWorker(); } void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx) @@ -607,6 +616,7 @@ void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx) if (auto signer = m_signer.load(std::memory_order_acquire); signer) { signer->ClearInputsFromQueue(GetIdsFromLockable(tx.vin)); } + NotifyWorker(); } void CInstantSendManager::TruncateRecoveredSigsForInputs(const instantsend::InstantSendLock& islock) @@ -626,13 +636,15 @@ void CInstantSendManager::TryEmplacePendingLock(const uint256& hash, const NodeI if (db.KnownInstantSendLock(hash)) return; LOCK(cs_pendingLocks); if (!pendingInstantSendLocks.count(hash)) { - pendingInstantSendLocks.emplace(hash, std::make_pair(id, islock)); + pendingInstantSendLocks.emplace(hash, instantsend::PendingISLockFromPeer{id, islock}); } + NotifyWorker(); } void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindexChainLock) { HandleFullyConfirmedBlock(pindexChainLock); + NotifyWorker(); } void CInstantSendManager::UpdatedBlockTip(const CBlockIndex* pindexNew) @@ -650,6 +662,7 @@ void CInstantSendManager::UpdatedBlockTip(const CBlockIndex* pindexNew) if (pindex) { HandleFullyConfirmedBlock(pindex); } + NotifyWorker(); } void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex) @@ -848,11 +861,11 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, instants LOCK(cs_pendingLocks); auto it = pendingInstantSendLocks.find(hash); if (it != pendingInstantSendLocks.end()) { - islock = it->second.second; + islock = it->second.islock; } else { auto itNoTx = pendingNoTxInstantSendLocks.find(hash); if (itNoTx != pendingNoTxInstantSendLocks.end()) { - islock = itNoTx->second.second; + islock = itNoTx->second.islock; } else { return false; } @@ -889,7 +902,7 @@ bool CInstantSendManager::IsWaitingForTx(const uint256& txHash) const LOCK(cs_pendingLocks); auto it = pendingNoTxInstantSendLocks.begin(); while (it != pendingNoTxInstantSendLocks.end()) { - if (it->second.second->txid == txHash) { + if (it->second.islock->txid == txHash) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, txHash.ToString(), it->first.ToString()); return true; @@ -926,6 +939,7 @@ size_t CInstantSendManager::GetInstantSendLockCount() const void CInstantSendManager::WorkThreadMain(PeerManager& peerman) { while (!workInterrupt) { + uint64_t startEpoch = workEpoch.load(std::memory_order_acquire); bool fMoreWork = [&]() -> bool { if (!IsInstantSendEnabled()) return false; auto [more_work, peer_activity] = ProcessPendingInstantSendLocks(); @@ -953,10 +967,11 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman) signer->ProcessPendingRetryLockTxs(txns); return more_work; }(); - - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } + if (fMoreWork) continue; + std::unique_lock l(workMutex); + workCv.wait(l, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); } } diff --git a/src/instantsend/instantsend.h b/src/instantsend/instantsend.h index d9343e9cf418..47de7c51cb63 100644 --- a/src/instantsend/instantsend.h +++ b/src/instantsend/instantsend.h @@ -20,6 +20,7 @@ #include #include #include +#include class CBlockIndex; class CChainState; @@ -38,6 +39,11 @@ struct DbWrapperParams; namespace instantsend { class InstantSendSigner; +struct PendingISLockFromPeer { + NodeId node_id; + InstantSendLockPtr islock; +}; + struct PendingState { bool m_pending_work{false}; std::vector> m_peer_activity{}; @@ -66,12 +72,16 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent std::thread workThread; CThreadInterrupt workInterrupt; + // Event to wake the worker thread when new work arrives + Mutex workMutex; + std::condition_variable_any workCv; + std::atomic workEpoch{0}; mutable Mutex cs_pendingLocks; // Incoming and not verified yet - Uint256HashMap> pendingInstantSendLocks GUARDED_BY(cs_pendingLocks); + Uint256HashMap pendingInstantSendLocks GUARDED_BY(cs_pendingLocks); // Tried to verify but there is no tx yet - Uint256HashMap> pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks); + Uint256HashMap pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks); // TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS // locking of child TXs @@ -110,14 +120,14 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent void Start(PeerManager& peerman); void Stop(); - void InterruptWorkerThread() { workInterrupt(); }; + void InterruptWorkerThread() { workInterrupt(); workCv.notify_all(); }; private: instantsend::PendingState ProcessPendingInstantSendLocks() EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); Uint256HashSet ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap>& pend, + const std::vector>& pend, std::vector>& peer_activity) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash, @@ -139,6 +149,10 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); + void NotifyWorker() { + workEpoch.fetch_add(1, std::memory_order_acq_rel); + workCv.notify_one(); + } void HandleFullyConfirmedBlock(const CBlockIndex* pindex) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingRetry); diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 2511c1026a79..ae957f86616b 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -432,6 +432,7 @@ MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string } pendingRecoveredSigs[from].emplace_back(recoveredSig); + NotifyWorker(); return ret; } @@ -522,6 +523,9 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) const size_t nMaxBatchSize{32}; CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); if (recSigsByNode.empty()) { + // No work in this batch. Don't proactively check queues for work that may have been + // added by listeners during processing, as this causes busy-wait when combined with + // epoch changes. External threads will call NotifyWorker() to wake us if needed. return false; } @@ -574,6 +578,9 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) } } + // Only report more work if we processed a full batch, indicating there's likely more + // work from the original collection. Don't check queues for work added by listeners + // during processing, as that would cause busy-wait with epoch-based wake conditions. return recSigsByNode.size() >= nMaxBatchSize; } @@ -622,12 +629,15 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash().ToString()); + // Note: Don't call NotifyWorker() here as this function is called by the worker thread itself + // NotifyWorker() is only needed when external threads add work } void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr& recoveredSig) { LOCK(cs_pending); pendingReconstructedRecoveredSigs.emplace(std::piecewise_construct, std::forward_as_tuple(recoveredSig->GetHash()), std::forward_as_tuple(recoveredSig)); + NotifyWorker(); } void CSigningManager::TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id) @@ -648,6 +658,7 @@ void CSigningManager::Cleanup() db.CleanupOldVotes(maxAge); lastCleanupTime = TicksSinceEpoch(SystemClock::now()); + lastCleanupTimeSteady = std::chrono::steady_clock::now(); } void CSigningManager::RegisterRecoveredSigsListener(CRecoveredSigsListener* l) @@ -732,18 +743,37 @@ void CSigningManager::StopWorkerThread() void CSigningManager::InterruptWorkerThread() { workInterrupt(); + workCv.notify_all(); } void CSigningManager::WorkThreadMain(PeerManager& peerman) { while (!workInterrupt) { + uint64_t startEpoch = workEpoch.load(std::memory_order_acquire); bool fMoreWork = ProcessPendingRecoveredSigs(peerman); Cleanup(); - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; + if (fMoreWork) continue; + std::unique_lock l(workMutex); + // Compute next cleanup deadline (~5s cadence) and wait event-driven until either + // new work arrives or the deadline is reached. + auto next_deadline = std::chrono::steady_clock::time_point::max(); + { + auto now_steady = std::chrono::steady_clock::now(); + auto next_cleanup = lastCleanupTimeSteady + std::chrono::milliseconds(5000); + if (next_cleanup > now_steady) { + next_deadline = next_cleanup; + } + } + if (next_deadline == std::chrono::steady_clock::time_point::max()) { + workCv.wait(l, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); + } else { + workCv.wait_until(l, next_deadline, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); } } } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 2950a3d60dc4..18b95a796aed 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -24,6 +24,8 @@ #include #include #include +#include +#include class CChainState; class CDataStream; @@ -178,6 +180,7 @@ class CSigningManager FastRandomContext rnd GUARDED_BY(cs_pending); int64_t lastCleanupTime{0}; + std::chrono::steady_clock::time_point lastCleanupTimeSteady{}; mutable Mutex cs_listeners; std::vector recoveredSigsListeners GUARDED_BY(cs_listeners); @@ -241,13 +244,18 @@ class CSigningManager private: std::thread workThread; CThreadInterrupt workInterrupt; - void Cleanup(); // called from the worker thread of CSigSharesManager + // Event to wake the worker thread when new work arrives + Mutex workMutex; + std::condition_variable_any workCv; + std::atomic workEpoch{0}; + void Cleanup(); // called from the worker thread void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); public: void StartWorkerThread(PeerManager& peerman); void StopWorkerThread(); void InterruptWorkerThread(); + void NotifyWorker() { workEpoch.fetch_add(1, std::memory_order_acq_rel); workCv.notify_one(); } }; template diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index ca80f5d45c39..afd5fefb9866 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -231,6 +231,8 @@ void CSigSharesManager::UnregisterAsRecoveredSigsListener() void CSigSharesManager::InterruptWorkerThread() { workInterrupt(); + // Wake the worker to allow prompt shutdown + workCv.notify_all(); } void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) @@ -251,6 +253,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms for (const auto& sigShare : receivedSigShares) { ProcessMessageSigShare(pfrom.GetId(), sigShare); } + NotifyWorker(); } if (msg_type == NetMsgType::QSIGSESANN) { @@ -266,6 +269,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } else if (msg_type == NetMsgType::QSIGSHARESINV) { std::vector msgs; vRecv >> msgs; @@ -279,6 +283,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } else if (msg_type == NetMsgType::QGETSIGSHARES) { std::vector msgs; vRecv >> msgs; @@ -292,6 +297,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } else if (msg_type == NetMsgType::QBSIGSHARES) { std::vector msgs; vRecv >> msgs; @@ -309,6 +315,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } } @@ -658,9 +665,12 @@ bool CSigSharesManager::ProcessPendingSigShares() continue; } + // Materialize the signature once. Get() internally validates, so if it returns an invalid signature, + // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage). + CBLSSignature sig = sigShare.sigShare.Get(); // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive // deserialization in the message thread - if (!sigShare.sigShare.Get().IsValid()) { + if (!sig.IsValid()) { BanNode(nodeId); // don't process any additional shares from this node break; @@ -676,7 +686,7 @@ bool CSigSharesManager::ProcessPendingSigShares() assert(false); } - batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sigShare.sigShare.Get(), pubKeyShare); + batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare); verifyCount++; } } @@ -767,6 +777,10 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } } + // Note: Don't call NotifyWorker() here even when queued_announce is true + // When called from worker thread: SendMessages() will handle announcements in same iteration + // When called from external thread: ProcessMessage() already calls NotifyWorker() (line 303) + if (canTryRecovery) { TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); } @@ -778,8 +792,11 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, return; } - std::vector sigSharesForRecovery; + // Collect lazy signatures (cheap copy) under lock, then materialize outside lock + std::vector lazySignatures; std::vector idsForRecovery; + bool isSingleNode = false; + { LOCK(cs); @@ -798,28 +815,44 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, return; } const auto& sigShare = sigSharesForSignHash->begin()->second; - CBLSSignature recoveredSig = sigShare.sigShare.Get(); + // Copy the lazy signature (cheap), materialize later outside lock + lazySignatures.emplace_back(sigShare.sigShare); + isSingleNode = true; LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- recover single-node signature. id=%s, msgHash=%s\n", __func__, id.ToString(), msgHash.ToString()); + } else { + // Collect lazy signatures and IDs under lock (cheap operations) + lazySignatures.reserve((size_t) quorum.params.threshold); + idsForRecovery.reserve((size_t) quorum.params.threshold); + for (auto it = sigSharesForSignHash->begin(); + it != sigSharesForSignHash->end() && lazySignatures.size() < size_t(quorum.params.threshold); + ++it) { + const auto& sigShare = it->second; + lazySignatures.emplace_back(sigShare.sigShare); // Cheap copy of lazy wrapper + idsForRecovery.emplace_back(quorum.members[sigShare.getQuorumMember()]->proTxHash); + } - auto rs = std::make_shared(quorum.params.type, quorum.qc->quorumHash, id, msgHash, - recoveredSig); - sigman.ProcessRecoveredSig(rs, m_peerman); - return; // end of single-quorum processing + // check if we can recover the final signature + if (lazySignatures.size() < size_t(quorum.params.threshold)) { + return; + } } + } // Release lock before expensive materialization - sigSharesForRecovery.reserve((size_t) quorum.params.threshold); - idsForRecovery.reserve((size_t) quorum.params.threshold); - for (auto it = sigSharesForSignHash->begin(); it != sigSharesForSignHash->end() && sigSharesForRecovery.size() < size_t(quorum.params.threshold); ++it) { - const auto& sigShare = it->second; - sigSharesForRecovery.emplace_back(sigShare.sigShare.Get()); - idsForRecovery.emplace_back(quorum.members[sigShare.getQuorumMember()]->proTxHash); - } + // Materialize signatures outside the critical section (expensive BLS operations) + if (isSingleNode) { + CBLSSignature recoveredSig = lazySignatures[0].Get(); + auto rs = std::make_shared(quorum.params.type, quorum.qc->quorumHash, id, msgHash, + recoveredSig); + sigman.ProcessRecoveredSig(rs, m_peerman); + return; // end of single-quorum processing + } - // check if we can recover the final signature - if (sigSharesForRecovery.size() < size_t(quorum.params.threshold)) { - return; - } + // Multi-node case: materialize all signatures + std::vector sigSharesForRecovery; + sigSharesForRecovery.reserve(lazySignatures.size()); + for (const auto& lazySig : lazySignatures) { + sigSharesForRecovery.emplace_back(lazySig.Get()); // Expensive, but outside lock } // now recover it @@ -1113,7 +1146,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map().count(); + auto curTime = std::chrono::steady_clock::now(); for (auto& [_, signedSession] : signedSessions) { if (!IsAllMembersConnectedEnabled(signedSession.quorum->params.type, m_sporkman)) { @@ -1127,7 +1160,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map= signedSession.nextAttemptTime) { int64_t waitTime = exp2(signedSession.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT; waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime); - signedSession.nextAttemptTime = curTime + waitTime; + signedSession.nextAttemptTime = curTime + std::chrono::milliseconds(waitTime); auto dmn = SelectMemberForRecovery(*signedSession.quorum, signedSession.sigShare.getId(), signedSession.attempt); signedSession.attempt++; @@ -1511,6 +1544,7 @@ void CSigSharesManager::Cleanup() } lastCleanupTime = GetTime().count(); + lastCleanupTimeSteady = std::chrono::steady_clock::now(); } void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) @@ -1573,6 +1607,8 @@ void CSigSharesManager::BanNode(NodeId nodeId) }); nodeState.requestedSigShares.Clear(); nodeState.banned = true; + // Banning affects request routing and can create immediate work + NotifyWorker(); } void CSigSharesManager::WorkThreadMain() @@ -1580,22 +1616,59 @@ void CSigSharesManager::WorkThreadMain() int64_t lastSendTime = 0; while (!workInterrupt) { + // capture epoch at loop start to detect intervening notifications + uint64_t startEpoch = workEpoch.load(std::memory_order_acquire); RemoveBannedNodeStates(); bool fMoreWork = ProcessPendingSigShares(); SignPendingSigShares(); - if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 100) { - SendMessages(); + bool didSend = false; + if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 10) { + didSend = SendMessages(); lastSendTime = TicksSinceEpoch(SystemClock::now()); } Cleanup(); - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; + // If there is more work or we just sent something, iterate again without waiting + if (fMoreWork || didSend) { + continue; } + + // Compute next wake-up deadline for periodic tasks (recovery attempts and cleanup cadence) + auto next_deadline = std::chrono::steady_clock::time_point::max(); + // Respect cleanup cadence (~5s) even when idle + { + auto now_steady = std::chrono::steady_clock::now(); + auto next_cleanup = lastCleanupTimeSteady + std::chrono::seconds(5); + if (next_cleanup > now_steady) { + if (next_cleanup < next_deadline) next_deadline = next_cleanup; + } + } + { + // Consider next recovery attempt times for signed sessions to avoid polling + LOCK(cs); + auto now_steady = std::chrono::steady_clock::now(); + for (const auto& [_, s] : signedSessions) { + if (s.nextAttemptTime > now_steady) { + if (s.nextAttemptTime < next_deadline) next_deadline = s.nextAttemptTime; + } + } + } + + // Wait event-driven until notified or deadline reached, or interrupted + std::unique_lock l(workMutex); + if (next_deadline == std::chrono::steady_clock::time_point::max()) { + workCv.wait_for(l, std::chrono::milliseconds(10), [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); + } else { + workCv.wait_until(l, next_deadline, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); + } + // If epoch changed while we were waiting, loop will process immediately } } @@ -1603,6 +1676,8 @@ void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const u { LOCK(cs_pendingSigns); pendingSigns.emplace_back(std::move(quorum), id, msgHash); + // Wake worker to handle new pending sign immediately + NotifyWorker(); } void CSigSharesManager::SignPendingSigShares() @@ -1622,11 +1697,13 @@ void CSigSharesManager::SignPendingSigShares() auto& session = signedSessions[sigShare.GetSignHash()]; session.sigShare = sigShare; session.quorum = pQuorum; - session.nextAttemptTime = 0; + session.nextAttemptTime = std::chrono::steady_clock::time_point{}; session.attempt = 0; } } } + // Note: Don't call NotifyWorker() here as this function is called by the worker thread itself + // NotifyWorker() is only needed when external threads add work } std::optional CSigSharesManager::CreateSigShare(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const @@ -1724,6 +1801,8 @@ void CSigSharesManager::ForceReAnnouncement(const CQuorum& quorum, Consensus::LL // we need to use a new session id as we don't know if the other node has run into a timeout already session->sendSessionId = UNINITIALIZED_SESSION_ID; } + // Wake worker so announcements are sent promptly + NotifyWorker(); } MessageProcessingResult CSigSharesManager::HandleNewRecoveredSig(const llmq::CRecoveredSig& recoveredSig) @@ -1731,6 +1810,15 @@ MessageProcessingResult CSigSharesManager::HandleNewRecoveredSig(const llmq::CRe auto signHash = recoveredSig.buildSignHash().Get(); LOCK(cs); RemoveSigSharesForSession(signHash); + // Cleaning up a session can free resources; wake worker to proceed + NotifyWorker(); return {}; } + +void CSigSharesManager::NotifyWorker() +{ + // Avoid spurious wake-ups causing contention; simple notify is fine + workEpoch.fetch_add(1, std::memory_order_acq_rel); + workCv.notify_one(); +} } // namespace llmq diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 14e20edce459..8b6827d30afe 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -26,6 +26,7 @@ #include #include #include +#include class CActiveMasternodeManager; class CNode; @@ -210,12 +211,9 @@ class SigShareMap T& GetOrAdd(const SigShareKey& k) { - T* v = Get(k); - if (!v) { - Add(k, T()); - v = Get(k); - } - return *v; + auto& m = internalMap[k.first]; // Get or create outer map entry + auto result = m.emplace(k.second, T()); // Try to insert, returns pair + return result.first->second; // Return reference to the value (new or existing) } const T* GetFirst() const @@ -355,7 +353,7 @@ class CSignedSession CSigShare sigShare; CQuorumCPtr quorum; - int64_t nextAttemptTime{0}; + std::chrono::steady_clock::time_point nextAttemptTime{}; int attempt{0}; }; @@ -380,6 +378,10 @@ class CSigSharesManager : public CRecoveredSigsListener std::thread workThread; CThreadInterrupt workInterrupt; + // Event to wake the worker thread when new work arrives + Mutex workMutex; + std::condition_variable_any workCv; + std::atomic workEpoch{0}; SigShareMap sigShares GUARDED_BY(cs); Uint256HashMap signedSessions GUARDED_BY(cs); @@ -413,6 +415,7 @@ class CSigSharesManager : public CRecoveredSigsListener const CSporkManager& m_sporkman; int64_t lastCleanupTime{0}; + std::chrono::steady_clock::time_point lastCleanupTimeSteady{}; std::atomic recoveredSigsCounter{0}; public: @@ -491,6 +494,7 @@ class CSigSharesManager : public CRecoveredSigsListener EXCLUSIVE_LOCKS_REQUIRED(cs); void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); + void NotifyWorker(); }; } // namespace llmq