From 24d836b13b44b6b3562480a08d540fe567ced687 Mon Sep 17 00:00:00 2001 From: pasta Date: Thu, 9 Oct 2025 19:10:05 -0500 Subject: [PATCH 01/14] perf: reduce timing intervals in CSigSharesManager for improved responsiveness Adjusted the timing intervals in the WorkThreadMain function of CSigSharesManager to 10 milliseconds for both message sending and work interruption checks. This change enhances the responsiveness of the signing shares manager by allowing more frequent processing of pending signature shares and message sending. This results in ~33% latency improvements in a contrived local latency functional test / benchmark from ~500ms to ~333ms --- src/llmq/signing_shares.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index ca80f5d45c39..1072d462c530 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -1585,7 +1585,7 @@ void CSigSharesManager::WorkThreadMain() bool fMoreWork = ProcessPendingSigShares(); SignPendingSigShares(); - if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 100) { + if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 10) { SendMessages(); lastSendTime = TicksSinceEpoch(SystemClock::now()); } @@ -1593,7 +1593,7 @@ void CSigSharesManager::WorkThreadMain() Cleanup(); // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { + if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(10))) { return; } } From 1df9ed588b998a84ce48e5569020572405acb893 Mon Sep 17 00:00:00 2001 From: pasta Date: Wed, 15 Oct 2025 14:20:17 -0500 Subject: [PATCH 02/14] feat: enhance worker thread responsiveness in CInstantSendManager and CSigSharesManager Added a NotifyWorker function to both CInstantSendManager and CSigSharesManager to signal the worker thread when new work is available. This change improves the responsiveness of the worker threads by allowing them to wake up promptly when there are pending tasks, thus enhancing overall performance and reducing latency in processing instant send locks and signature shares. --- src/instantsend/instantsend.cpp | 21 +++++++-- src/instantsend/instantsend.h | 10 ++++- src/llmq/signing.cpp | 47 ++++++++++++++++++-- src/llmq/signing.h | 10 ++++- src/llmq/signing_shares.cpp | 78 +++++++++++++++++++++++++++++++-- src/llmq/signing_shares.h | 10 ++++- 6 files changed, 159 insertions(+), 17 deletions(-) diff --git a/src/instantsend/instantsend.cpp b/src/instantsend/instantsend.cpp index 2ddb6e80083e..fc3af2221849 100644 --- a/src/instantsend/instantsend.cpp +++ b/src/instantsend/instantsend.cpp @@ -169,6 +169,7 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st LOCK(cs_pendingLocks); pendingInstantSendLocks.emplace(hash, std::make_pair(from, islock)); + NotifyWorker(); return ret; } @@ -464,6 +465,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) } else { RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock); } + NotifyWorker(); } void CInstantSendManager::TransactionRemovedFromMempool(const CTransactionRef& tx) @@ -481,6 +483,7 @@ void CInstantSendManager::TransactionRemovedFromMempool(const CTransactionRef& t LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- transaction %s was removed from mempool\n", __func__, tx->GetHash().ToString()); RemoveConflictingLock(::SerializeHash(*islock), *islock); + NotifyWorker(); } void CInstantSendManager::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) @@ -511,12 +514,14 @@ void CInstantSendManager::BlockConnected(const std::shared_ptr& pb } db.WriteBlockInstantSendLocks(pblock, pindex); + NotifyWorker(); } void CInstantSendManager::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { db.RemoveBlockInstantSendLocks(pblock, pindexDisconnected); + NotifyWorker(); } void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined) @@ -559,6 +564,7 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__, tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : ""); + NotifyWorker(); } void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChildren) @@ -599,6 +605,7 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, retryChildren=%d, retryChildrenCount=%d\n", __func__, txid.ToString(), retryChildren, retryChildrenCount); + NotifyWorker(); } void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx) @@ -607,6 +614,7 @@ void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx) if (auto signer = m_signer.load(std::memory_order_acquire); signer) { signer->ClearInputsFromQueue(GetIdsFromLockable(tx.vin)); } + NotifyWorker(); } void CInstantSendManager::TruncateRecoveredSigsForInputs(const instantsend::InstantSendLock& islock) @@ -628,11 +636,13 @@ void CInstantSendManager::TryEmplacePendingLock(const uint256& hash, const NodeI if (!pendingInstantSendLocks.count(hash)) { pendingInstantSendLocks.emplace(hash, std::make_pair(id, islock)); } + NotifyWorker(); } void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindexChainLock) { HandleFullyConfirmedBlock(pindexChainLock); + NotifyWorker(); } void CInstantSendManager::UpdatedBlockTip(const CBlockIndex* pindexNew) @@ -650,6 +660,7 @@ void CInstantSendManager::UpdatedBlockTip(const CBlockIndex* pindexNew) if (pindex) { HandleFullyConfirmedBlock(pindex); } + NotifyWorker(); } void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex) @@ -926,6 +937,7 @@ size_t CInstantSendManager::GetInstantSendLockCount() const void CInstantSendManager::WorkThreadMain(PeerManager& peerman) { while (!workInterrupt) { + uint64_t startEpoch = workEpoch.load(std::memory_order_acquire); bool fMoreWork = [&]() -> bool { if (!IsInstantSendEnabled()) return false; auto [more_work, peer_activity] = ProcessPendingInstantSendLocks(); @@ -953,10 +965,11 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman) signer->ProcessPendingRetryLockTxs(txns); return more_work; }(); - - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } + if (fMoreWork) continue; + std::unique_lock l(workMutex); + workCv.wait(l, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); } } diff --git a/src/instantsend/instantsend.h b/src/instantsend/instantsend.h index d9343e9cf418..f7dea1f5895e 100644 --- a/src/instantsend/instantsend.h +++ b/src/instantsend/instantsend.h @@ -66,6 +66,10 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent std::thread workThread; CThreadInterrupt workInterrupt; + // Event to wake the worker thread when new work arrives + Mutex workMutex; + std::condition_variable_any workCv; + std::atomic workEpoch{0}; mutable Mutex cs_pendingLocks; // Incoming and not verified yet @@ -110,7 +114,7 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent void Start(PeerManager& peerman); void Stop(); - void InterruptWorkerThread() { workInterrupt(); }; + void InterruptWorkerThread() { workInterrupt(); workCv.notify_all(); }; private: instantsend::PendingState ProcessPendingInstantSendLocks() @@ -139,6 +143,10 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); + void NotifyWorker() { + workEpoch.fetch_add(1, std::memory_order_acq_rel); + workCv.notify_one(); + } void HandleFullyConfirmedBlock(const CBlockIndex* pindex) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingRetry); diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 2511c1026a79..76b028deb2e5 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -432,6 +432,8 @@ MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string } pendingRecoveredSigs[from].emplace_back(recoveredSig); + workEpoch.fetch_add(1, std::memory_order_acq_rel); + NotifyWorker(); return ret; } @@ -522,6 +524,13 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) const size_t nMaxBatchSize{32}; CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); if (recSigsByNode.empty()) { + // Check if reconstructed queue has work pending so the caller can keep looping + { + LOCK(cs_pending); + if (!pendingReconstructedRecoveredSigs.empty() || !pendingRecoveredSigs.empty()) { + return true; + } + } return false; } @@ -574,7 +583,13 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) } } - return recSigsByNode.size() >= nMaxBatchSize; + // If we still have pending items in queues, report more work to avoid sleeping + bool more_in_queues = false; + { + LOCK(cs_pending); + more_in_queues = !pendingReconstructedRecoveredSigs.empty() || !pendingRecoveredSigs.empty(); + } + return more_in_queues || recSigsByNode.size() >= nMaxBatchSize; } // signature must be verified already @@ -622,12 +637,16 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash().ToString()); + workEpoch.fetch_add(1, std::memory_order_acq_rel); + NotifyWorker(); } void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr& recoveredSig) { LOCK(cs_pending); pendingReconstructedRecoveredSigs.emplace(std::piecewise_construct, std::forward_as_tuple(recoveredSig->GetHash()), std::forward_as_tuple(recoveredSig)); + workEpoch.fetch_add(1, std::memory_order_acq_rel); + NotifyWorker(); } void CSigningManager::TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id) @@ -732,18 +751,38 @@ void CSigningManager::StopWorkerThread() void CSigningManager::InterruptWorkerThread() { workInterrupt(); + workCv.notify_all(); } void CSigningManager::WorkThreadMain(PeerManager& peerman) { while (!workInterrupt) { + uint64_t startEpoch = workEpoch.load(std::memory_order_acquire); bool fMoreWork = ProcessPendingRecoveredSigs(peerman); Cleanup(); - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; + if (fMoreWork) continue; + std::unique_lock l(workMutex); + // Compute next cleanup deadline (~5s cadence) and wait event-driven until either + // new work arrives or the deadline is reached. + auto next_deadline = std::chrono::steady_clock::time_point::max(); + { + int64_t now_ms = TicksSinceEpoch(SystemClock::now()); + int64_t target_ms = lastCleanupTime + 5000; + if (target_ms > now_ms) { + auto delta_ms = target_ms - now_ms; + next_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(delta_ms); + } + } + if (next_deadline == std::chrono::steady_clock::time_point::max()) { + workCv.wait(l, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); + } else { + workCv.wait_until(l, next_deadline, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); } } } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 2950a3d60dc4..7496d329ed15 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -24,6 +24,8 @@ #include #include #include +#include +#include class CChainState; class CDataStream; @@ -241,13 +243,17 @@ class CSigningManager private: std::thread workThread; CThreadInterrupt workInterrupt; - void Cleanup(); // called from the worker thread of CSigSharesManager - void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); + // Event to wake the worker thread when new work arrives + Mutex workMutex; + std::condition_variable_any workCv; + std::atomic workEpoch{0}; + void WorkThreadMain(PeerManager& peerman); public: void StartWorkerThread(PeerManager& peerman); void StopWorkerThread(); void InterruptWorkerThread(); + void NotifyWorker() { workEpoch.fetch_add(1, std::memory_order_acq_rel); workCv.notify_one(); } }; template diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 1072d462c530..9f5410f395db 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -231,6 +231,8 @@ void CSigSharesManager::UnregisterAsRecoveredSigsListener() void CSigSharesManager::InterruptWorkerThread() { workInterrupt(); + // Wake the worker to allow prompt shutdown + workCv.notify_all(); } void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) @@ -310,6 +312,9 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms return; } } + + // New inbound messages can create work (requests, shares, announcements) + NotifyWorker(); } bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann) @@ -738,6 +743,7 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum return; } + bool queued_announce = false; { LOCK(cs); @@ -746,6 +752,7 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } if (!isAllMembersConnectedEnabled) { sigSharesQueuedToAnnounce.Add(sigShare.GetKey(), true); + queued_announce = true; } // Update the time we've seen the last sigShare @@ -767,6 +774,10 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } } + if (queued_announce) { + NotifyWorker(); + } + if (canTryRecovery) { TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); } @@ -1573,6 +1584,8 @@ void CSigSharesManager::BanNode(NodeId nodeId) }); nodeState.requestedSigShares.Clear(); nodeState.banned = true; + // Banning affects request routing and can create immediate work + NotifyWorker(); } void CSigSharesManager::WorkThreadMain() @@ -1580,22 +1593,64 @@ void CSigSharesManager::WorkThreadMain() int64_t lastSendTime = 0; while (!workInterrupt) { + // capture epoch at loop start to detect intervening notifications + uint64_t startEpoch = workEpoch.load(std::memory_order_acquire); RemoveBannedNodeStates(); bool fMoreWork = ProcessPendingSigShares(); SignPendingSigShares(); + bool didSend = false; if (TicksSinceEpoch(SystemClock::now()) - lastSendTime > 10) { - SendMessages(); + didSend = SendMessages(); lastSendTime = TicksSinceEpoch(SystemClock::now()); } Cleanup(); - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(10))) { - return; + // If there is more work or we just sent something, iterate again without waiting + if (fMoreWork || didSend) { + continue; } + + // Compute next wake-up deadline for periodic tasks (recovery attempts and cleanup cadence) + auto next_deadline = std::chrono::steady_clock::time_point::max(); + // Respect cleanup cadence (~5s) even when idle + { + auto now_tp = std::chrono::steady_clock::now(); + int64_t now_s = GetTime().count(); + int64_t target_s = lastCleanupTime + 5; + if (target_s > now_s) { + auto delta_ms = (target_s - now_s) * 1000; + auto cand = now_tp + std::chrono::milliseconds(delta_ms); + if (cand < next_deadline) next_deadline = cand; + } + } + { + // Consider next recovery attempt times for signed sessions to avoid polling + LOCK(cs); + int64_t cur_ms = TicksSinceEpoch(SystemClock::now()); + for (const auto& [_, s] : signedSessions) { + if (s.nextAttemptTime > cur_ms) { + auto d = s.nextAttemptTime - cur_ms; + auto cand = std::chrono::steady_clock::now() + std::chrono::milliseconds(d); + if (cand < next_deadline) next_deadline = cand; + } + } + } + + // Wait event-driven until notified or deadline reached, or interrupted + std::unique_lock l(workMutex); + if (next_deadline == std::chrono::steady_clock::time_point::max()) { + workCv.wait_for(l, std::chrono::milliseconds(10), [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); + } else { + workCv.wait_until(l, next_deadline, [this, startEpoch]{ + return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch; + }); + } + // If epoch changed while we were waiting, loop will process immediately } } @@ -1603,6 +1658,8 @@ void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const u { LOCK(cs_pendingSigns); pendingSigns.emplace_back(std::move(quorum), id, msgHash); + // Wake worker to handle new pending sign immediately + NotifyWorker(); } void CSigSharesManager::SignPendingSigShares() @@ -1627,6 +1684,8 @@ void CSigSharesManager::SignPendingSigShares() } } } + // New sig shares or recovery attempts may be available + NotifyWorker(); } std::optional CSigSharesManager::CreateSigShare(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const @@ -1724,6 +1783,8 @@ void CSigSharesManager::ForceReAnnouncement(const CQuorum& quorum, Consensus::LL // we need to use a new session id as we don't know if the other node has run into a timeout already session->sendSessionId = UNINITIALIZED_SESSION_ID; } + // Wake worker so announcements are sent promptly + NotifyWorker(); } MessageProcessingResult CSigSharesManager::HandleNewRecoveredSig(const llmq::CRecoveredSig& recoveredSig) @@ -1731,6 +1792,15 @@ MessageProcessingResult CSigSharesManager::HandleNewRecoveredSig(const llmq::CRe auto signHash = recoveredSig.buildSignHash().Get(); LOCK(cs); RemoveSigSharesForSession(signHash); + // Cleaning up a session can free resources; wake worker to proceed + NotifyWorker(); return {}; } + +void CSigSharesManager::NotifyWorker() +{ + // Avoid spurious wake-ups causing contention; simple notify is fine + workEpoch.fetch_add(1, std::memory_order_acq_rel); + workCv.notify_one(); +} } // namespace llmq diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 14e20edce459..a6db48f50356 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -26,6 +26,7 @@ #include #include #include +#include class CActiveMasternodeManager; class CNode; @@ -380,6 +381,10 @@ class CSigSharesManager : public CRecoveredSigsListener std::thread workThread; CThreadInterrupt workInterrupt; + // Event to wake the worker thread when new work arrives + Mutex workMutex; + std::condition_variable_any workCv; + std::atomic workEpoch{0}; SigShareMap sigShares GUARDED_BY(cs); Uint256HashMap signedSessions GUARDED_BY(cs); @@ -489,8 +494,9 @@ class CSigSharesManager : public CRecoveredSigsListener void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); - void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); + void SignPendingSigShares(); + void WorkThreadMain(); + void NotifyWorker(); }; } // namespace llmq From f67cd9abaa1358fb36f65912874a3ad63dbca3a2 Mon Sep 17 00:00:00 2001 From: pasta Date: Thu, 16 Oct 2025 09:00:39 -0500 Subject: [PATCH 03/14] refactor: clean up NotifyWorker function formatting in CInstantSendManager Adjusted the formatting of the NotifyWorker function in CInstantSendManager for improved readability. This change includes consistent spacing and indentation, enhancing code maintainability without altering functionality. --- src/instantsend/instantsend.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/instantsend/instantsend.h b/src/instantsend/instantsend.h index f7dea1f5895e..d2484e4038f9 100644 --- a/src/instantsend/instantsend.h +++ b/src/instantsend/instantsend.h @@ -143,9 +143,9 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - void NotifyWorker() { + void NotifyWorker() { workEpoch.fetch_add(1, std::memory_order_acq_rel); - workCv.notify_one(); + workCv.notify_one(); } void HandleFullyConfirmedBlock(const CBlockIndex* pindex) From bb9c1e94489302230d418586f9da23bd7929ebdb Mon Sep 17 00:00:00 2001 From: pasta Date: Thu, 16 Oct 2025 09:01:23 -0500 Subject: [PATCH 04/14] refactor: update locking requirements for WorkThreadMain and SignPendingSigShares Modified the WorkThreadMain function in signing.h and SignPendingSigShares in signing_shares.h to specify exclusive lock requirements. This change enhances thread safety by ensuring proper locking mechanisms are in place, preventing potential race conditions during concurrent operations. --- src/llmq/signing.h | 2 +- src/llmq/signing_shares.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 7496d329ed15..2a8a7417b053 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -247,7 +247,7 @@ class CSigningManager Mutex workMutex; std::condition_variable_any workCv; std::atomic workEpoch{0}; - void WorkThreadMain(PeerManager& peerman); + void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); public: void StartWorkerThread(PeerManager& peerman); diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index a6db48f50356..90c856494a03 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -494,7 +494,7 @@ class CSigSharesManager : public CRecoveredSigsListener void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - void SignPendingSigShares(); + void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); void WorkThreadMain(); void NotifyWorker(); }; From 8364978f62c26cabada461177807b635599c34b1 Mon Sep 17 00:00:00 2001 From: pasta Date: Thu, 16 Oct 2025 13:43:08 -0500 Subject: [PATCH 05/14] refactor: update locking requirements for WorkThreadMain in signing_shares.h Added exclusive lock requirements to the WorkThreadMain function in signing_shares.h to enhance thread safety. This change ensures that the function operates correctly under concurrent conditions, preventing potential race conditions. --- src/llmq/signing_shares.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 90c856494a03..ee863eade28d 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -495,7 +495,7 @@ class CSigSharesManager : public CRecoveredSigsListener void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); - void WorkThreadMain(); + void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); void NotifyWorker(); }; } // namespace llmq From 5fcc040bff67d5164214d5936f04fdd4677e447f Mon Sep 17 00:00:00 2001 From: pasta Date: Fri, 17 Oct 2025 17:40:54 -0500 Subject: [PATCH 06/14] refactor: replace pair with structured PendingISLockFromPeer in CInstantSendManager Updated the CInstantSendManager to use a new struct, PendingISLockFromPeer, for better clarity and type safety. This change replaces the use of std::pair for storing node ID and InstantSendLockPtr, enhancing code readability and maintainability across multiple functions handling instant send locks. --- src/instantsend/instantsend.cpp | 28 ++++++++++++++-------------- src/instantsend/instantsend.h | 11 ++++++++--- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/instantsend/instantsend.cpp b/src/instantsend/instantsend.cpp index fc3af2221849..9ec873d28821 100644 --- a/src/instantsend/instantsend.cpp +++ b/src/instantsend/instantsend.cpp @@ -168,7 +168,7 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st } LOCK(cs_pendingLocks); - pendingInstantSendLocks.emplace(hash, std::make_pair(from, islock)); + pendingInstantSendLocks.emplace(hash, instantsend::PendingISLockFromPeer{from, islock}); NotifyWorker(); return ret; } @@ -240,7 +240,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap>& pend, + const Uint256HashMap& pend, std::vector>& peer_activity) { CBLSBatchVerifier batchVerifier(false, true, 8); @@ -250,8 +250,8 @@ Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( size_t alreadyVerified = 0; for (const auto& p : pend) { const auto& hash = p.first; - auto nodeId = p.second.first; - const auto& islock = p.second.second; + auto nodeId = p.second.node_id; + const auto& islock = p.second.islock; if (batchVerifier.badSources.count(nodeId)) { continue; @@ -322,8 +322,8 @@ Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( } for (const auto& p : pend) { const auto& hash = p.first; - auto nodeId = p.second.first; - const auto& islock = p.second.second; + auto nodeId = p.second.node_id; + const auto& islock = p.second.islock; if (batchVerifier.badMessages.count(hash)) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: invalid sig in islock, peer=%d\n", @@ -400,7 +400,7 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, } else { // put it in a separate pending map and try again later LOCK(cs_pendingLocks); - pendingNoTxInstantSendLocks.try_emplace(hash, std::make_pair(from, islock)); + pendingNoTxInstantSendLocks.try_emplace(hash, instantsend::PendingISLockFromPeer{from, islock}); } // This will also add children TXs to pendingRetryTxs @@ -443,11 +443,11 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) LOCK(cs_pendingLocks); auto it = pendingNoTxInstantSendLocks.begin(); while (it != pendingNoTxInstantSendLocks.end()) { - if (it->second.second->txid == tx->GetHash()) { + if (it->second.islock->txid == tx->GetHash()) { // we received an islock earlier LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, tx->GetHash().ToString(), it->first.ToString()); - islock = it->second.second; + islock = it->second.islock; pendingInstantSendLocks.try_emplace(it->first, it->second); pendingNoTxInstantSendLocks.erase(it); break; @@ -544,7 +544,7 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock LOCK(cs_pendingLocks); auto it = pendingNoTxInstantSendLocks.begin(); while (it != pendingNoTxInstantSendLocks.end()) { - if (it->second.second->txid == tx->GetHash()) { + if (it->second.islock->txid == tx->GetHash()) { // we received an islock earlier, let's put it back into pending and verify/lock LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, tx->GetHash().ToString(), it->first.ToString()); @@ -634,7 +634,7 @@ void CInstantSendManager::TryEmplacePendingLock(const uint256& hash, const NodeI if (db.KnownInstantSendLock(hash)) return; LOCK(cs_pendingLocks); if (!pendingInstantSendLocks.count(hash)) { - pendingInstantSendLocks.emplace(hash, std::make_pair(id, islock)); + pendingInstantSendLocks.emplace(hash, instantsend::PendingISLockFromPeer{id, islock}); } NotifyWorker(); } @@ -859,11 +859,11 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, instants LOCK(cs_pendingLocks); auto it = pendingInstantSendLocks.find(hash); if (it != pendingInstantSendLocks.end()) { - islock = it->second.second; + islock = it->second.islock; } else { auto itNoTx = pendingNoTxInstantSendLocks.find(hash); if (itNoTx != pendingNoTxInstantSendLocks.end()) { - islock = itNoTx->second.second; + islock = itNoTx->second.islock; } else { return false; } @@ -900,7 +900,7 @@ bool CInstantSendManager::IsWaitingForTx(const uint256& txHash) const LOCK(cs_pendingLocks); auto it = pendingNoTxInstantSendLocks.begin(); while (it != pendingNoTxInstantSendLocks.end()) { - if (it->second.second->txid == txHash) { + if (it->second.islock->txid == txHash) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, txHash.ToString(), it->first.ToString()); return true; diff --git a/src/instantsend/instantsend.h b/src/instantsend/instantsend.h index d2484e4038f9..a7dbd0b0acdc 100644 --- a/src/instantsend/instantsend.h +++ b/src/instantsend/instantsend.h @@ -38,6 +38,11 @@ struct DbWrapperParams; namespace instantsend { class InstantSendSigner; +struct PendingISLockFromPeer { + NodeId node_id; + InstantSendLockPtr islock; +}; + struct PendingState { bool m_pending_work{false}; std::vector> m_peer_activity{}; @@ -73,9 +78,9 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent mutable Mutex cs_pendingLocks; // Incoming and not verified yet - Uint256HashMap> pendingInstantSendLocks GUARDED_BY(cs_pendingLocks); + Uint256HashMap pendingInstantSendLocks GUARDED_BY(cs_pendingLocks); // Tried to verify but there is no tx yet - Uint256HashMap> pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks); + Uint256HashMap pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks); // TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS // locking of child TXs @@ -121,7 +126,7 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); Uint256HashSet ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap>& pend, + const Uint256HashMap& pend, std::vector>& peer_activity) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash, From 6437404a67f46f7e3657e4a7568d0c838d6385de Mon Sep 17 00:00:00 2001 From: pasta Date: Fri, 17 Oct 2025 17:54:17 -0500 Subject: [PATCH 07/14] perf: use vector instead of hash map for ProcessPendingInstantSendLocks The 'pend' local variable in ProcessPendingInstantSendLocks was previously using the same data structure as pendingInstantSendLocks (a hash map). However, once we're in the processing step, we only iterate sequentially through the locks - there are no hash-based lookups. This commit changes 'pend' to use std::vector for better performance: - Improved cache locality with contiguous memory layout - Better CPU prefetching during iteration (3x through the data) - Eliminates hash map overhead (bucket allocation, pointer chasing) - Filtering step uses build-new-vector approach to maintain O(n) The typical case processes up to 32 locks, making the vector's sequential access pattern ideal for modern CPU cache hierarchies. --- src/instantsend/instantsend.cpp | 22 ++++++++++++---------- src/instantsend/instantsend.h | 3 ++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/instantsend/instantsend.cpp b/src/instantsend/instantsend.cpp index 9ec873d28821..b0558392e863 100644 --- a/src/instantsend/instantsend.cpp +++ b/src/instantsend/instantsend.cpp @@ -175,7 +175,7 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() { - decltype(pendingInstantSendLocks) pend; + std::vector> pend; instantsend::PendingState ret; if (!IsInstantSendEnabled()) { @@ -190,6 +190,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() // The keys of the removed values are temporaily stored here to avoid invalidating an iterator std::vector removed; removed.reserve(maxCount); + pend.reserve(maxCount); for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) { // Check if we've reached max count @@ -197,7 +198,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() ret.m_pending_work = true; break; } - pend.emplace(islockHash, std::move(nodeid_islptr_pair)); + pend.emplace_back(islockHash, std::move(nodeid_islptr_pair)); removed.emplace_back(islockHash); } @@ -223,16 +224,17 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() if (!badISLocks.empty()) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__); - // filter out valid IS locks from "pend" - for (auto it = pend.begin(); it != pend.end();) { - if (!badISLocks.count(it->first)) { - it = pend.erase(it); - } else { - ++it; + // filter out valid IS locks from "pend" - keep only bad ones + std::vector> filteredPend; + filteredPend.reserve(badISLocks.size()); + for (auto& p : pend) { + if (badISLocks.contains(p.first)) { + filteredPend.push_back(std::move(p)); } } + // Now check against the previous active set and perform banning if this fails - ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, ret.m_peer_activity); + ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, filteredPend, ret.m_peer_activity); } return ret; @@ -240,7 +242,7 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks() Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks( const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap& pend, + const std::vector>& pend, std::vector>& peer_activity) { CBLSBatchVerifier batchVerifier(false, true, 8); diff --git a/src/instantsend/instantsend.h b/src/instantsend/instantsend.h index a7dbd0b0acdc..47de7c51cb63 100644 --- a/src/instantsend/instantsend.h +++ b/src/instantsend/instantsend.h @@ -20,6 +20,7 @@ #include #include #include +#include class CBlockIndex; class CChainState; @@ -126,7 +127,7 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); Uint256HashSet ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, bool ban, - const Uint256HashMap& pend, + const std::vector>& pend, std::vector>& peer_activity) EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash, From a955485ce4b491a438038d1af8104d979b89f79d Mon Sep 17 00:00:00 2001 From: pasta Date: Fri, 17 Oct 2025 18:09:56 -0500 Subject: [PATCH 08/14] fix: separate mocked time from steady_clock in worker threads Fixes time-mixing bugs where mocked time (controllable in tests) was being used to compute steady_clock deadlines. Since mocked time and system time move independently, this caused incorrect wait behavior in tests. Changes: - Use steady_clock::time_point for all wait deadlines (nextAttemptTime, lastCleanupTimeSteady) - Keep mocked time (GetTime<>()) for business logic only (timeouts, session tracking) - Remove redundant workEpoch increments (NotifyWorker already does this) - Move NotifyWorker() calls to individual message handlers for better control This ensures that: 1. In production: steady_clock provides monotonic, reliable timing 2. In tests: mocked time controls business logic while steady_clock handles waits 3. No double-incrementing of workEpoch that could cause busy-wait issues --- src/llmq/signing.cpp | 37 +++++++++++------------------ src/llmq/signing.h | 1 + src/llmq/signing_shares.cpp | 46 +++++++++++++++++-------------------- src/llmq/signing_shares.h | 3 ++- 4 files changed, 38 insertions(+), 49 deletions(-) diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 76b028deb2e5..ae957f86616b 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -432,7 +432,6 @@ MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string } pendingRecoveredSigs[from].emplace_back(recoveredSig); - workEpoch.fetch_add(1, std::memory_order_acq_rel); NotifyWorker(); return ret; } @@ -524,13 +523,9 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) const size_t nMaxBatchSize{32}; CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); if (recSigsByNode.empty()) { - // Check if reconstructed queue has work pending so the caller can keep looping - { - LOCK(cs_pending); - if (!pendingReconstructedRecoveredSigs.empty() || !pendingRecoveredSigs.empty()) { - return true; - } - } + // No work in this batch. Don't proactively check queues for work that may have been + // added by listeners during processing, as this causes busy-wait when combined with + // epoch changes. External threads will call NotifyWorker() to wake us if needed. return false; } @@ -583,13 +578,10 @@ bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) } } - // If we still have pending items in queues, report more work to avoid sleeping - bool more_in_queues = false; - { - LOCK(cs_pending); - more_in_queues = !pendingReconstructedRecoveredSigs.empty() || !pendingRecoveredSigs.empty(); - } - return more_in_queues || recSigsByNode.size() >= nMaxBatchSize; + // Only report more work if we processed a full batch, indicating there's likely more + // work from the original collection. Don't check queues for work added by listeners + // during processing, as that would cause busy-wait with epoch-based wake conditions. + return recSigsByNode.size() >= nMaxBatchSize; } // signature must be verified already @@ -637,15 +629,14 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash().ToString()); - workEpoch.fetch_add(1, std::memory_order_acq_rel); - NotifyWorker(); + // Note: Don't call NotifyWorker() here as this function is called by the worker thread itself + // NotifyWorker() is only needed when external threads add work } void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr& recoveredSig) { LOCK(cs_pending); pendingReconstructedRecoveredSigs.emplace(std::piecewise_construct, std::forward_as_tuple(recoveredSig->GetHash()), std::forward_as_tuple(recoveredSig)); - workEpoch.fetch_add(1, std::memory_order_acq_rel); NotifyWorker(); } @@ -667,6 +658,7 @@ void CSigningManager::Cleanup() db.CleanupOldVotes(maxAge); lastCleanupTime = TicksSinceEpoch(SystemClock::now()); + lastCleanupTimeSteady = std::chrono::steady_clock::now(); } void CSigningManager::RegisterRecoveredSigsListener(CRecoveredSigsListener* l) @@ -768,11 +760,10 @@ void CSigningManager::WorkThreadMain(PeerManager& peerman) // new work arrives or the deadline is reached. auto next_deadline = std::chrono::steady_clock::time_point::max(); { - int64_t now_ms = TicksSinceEpoch(SystemClock::now()); - int64_t target_ms = lastCleanupTime + 5000; - if (target_ms > now_ms) { - auto delta_ms = target_ms - now_ms; - next_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(delta_ms); + auto now_steady = std::chrono::steady_clock::now(); + auto next_cleanup = lastCleanupTimeSteady + std::chrono::milliseconds(5000); + if (next_cleanup > now_steady) { + next_deadline = next_cleanup; } } if (next_deadline == std::chrono::steady_clock::time_point::max()) { diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 2a8a7417b053..2e75add8d62e 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -180,6 +180,7 @@ class CSigningManager FastRandomContext rnd GUARDED_BY(cs_pending); int64_t lastCleanupTime{0}; + std::chrono::steady_clock::time_point lastCleanupTimeSteady{}; mutable Mutex cs_listeners; std::vector recoveredSigsListeners GUARDED_BY(cs_listeners); diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 9f5410f395db..79e7bb3e16e6 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -253,6 +253,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms for (const auto& sigShare : receivedSigShares) { ProcessMessageSigShare(pfrom.GetId(), sigShare); } + NotifyWorker(); } if (msg_type == NetMsgType::QSIGSESANN) { @@ -268,6 +269,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } else if (msg_type == NetMsgType::QSIGSHARESINV) { std::vector msgs; vRecv >> msgs; @@ -281,6 +283,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } else if (msg_type == NetMsgType::QGETSIGSHARES) { std::vector msgs; vRecv >> msgs; @@ -294,6 +297,7 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } else if (msg_type == NetMsgType::QBSIGSHARES) { std::vector msgs; vRecv >> msgs; @@ -311,10 +315,8 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& ms BanNode(pfrom.GetId()); return; } + NotifyWorker(); } - - // New inbound messages can create work (requests, shares, announcements) - NotifyWorker(); } bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann) @@ -743,7 +745,6 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum return; } - bool queued_announce = false; { LOCK(cs); @@ -752,7 +753,6 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } if (!isAllMembersConnectedEnabled) { sigSharesQueuedToAnnounce.Add(sigShare.GetKey(), true); - queued_announce = true; } // Update the time we've seen the last sigShare @@ -774,9 +774,9 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } } - if (queued_announce) { - NotifyWorker(); - } + // Note: Don't call NotifyWorker() here even when queued_announce is true + // When called from worker thread: SendMessages() will handle announcements in same iteration + // When called from external thread: ProcessMessage() already calls NotifyWorker() (line 303) if (canTryRecovery) { TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); @@ -1124,7 +1124,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map().count(); + auto curTime = std::chrono::steady_clock::now(); for (auto& [_, signedSession] : signedSessions) { if (!IsAllMembersConnectedEnabled(signedSession.quorum->params.type, m_sporkman)) { @@ -1138,7 +1138,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map= signedSession.nextAttemptTime) { int64_t waitTime = exp2(signedSession.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT; waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime); - signedSession.nextAttemptTime = curTime + waitTime; + signedSession.nextAttemptTime = curTime + std::chrono::milliseconds(waitTime); auto dmn = SelectMemberForRecovery(*signedSession.quorum, signedSession.sigShare.getId(), signedSession.attempt); signedSession.attempt++; @@ -1522,6 +1522,7 @@ void CSigSharesManager::Cleanup() } lastCleanupTime = GetTime().count(); + lastCleanupTimeSteady = std::chrono::steady_clock::now(); } void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) @@ -1617,24 +1618,19 @@ void CSigSharesManager::WorkThreadMain() auto next_deadline = std::chrono::steady_clock::time_point::max(); // Respect cleanup cadence (~5s) even when idle { - auto now_tp = std::chrono::steady_clock::now(); - int64_t now_s = GetTime().count(); - int64_t target_s = lastCleanupTime + 5; - if (target_s > now_s) { - auto delta_ms = (target_s - now_s) * 1000; - auto cand = now_tp + std::chrono::milliseconds(delta_ms); - if (cand < next_deadline) next_deadline = cand; + auto now_steady = std::chrono::steady_clock::now(); + auto next_cleanup = lastCleanupTimeSteady + std::chrono::seconds(5); + if (next_cleanup > now_steady) { + if (next_cleanup < next_deadline) next_deadline = next_cleanup; } } { // Consider next recovery attempt times for signed sessions to avoid polling LOCK(cs); - int64_t cur_ms = TicksSinceEpoch(SystemClock::now()); + auto now_steady = std::chrono::steady_clock::now(); for (const auto& [_, s] : signedSessions) { - if (s.nextAttemptTime > cur_ms) { - auto d = s.nextAttemptTime - cur_ms; - auto cand = std::chrono::steady_clock::now() + std::chrono::milliseconds(d); - if (cand < next_deadline) next_deadline = cand; + if (s.nextAttemptTime > now_steady) { + if (s.nextAttemptTime < next_deadline) next_deadline = s.nextAttemptTime; } } } @@ -1679,13 +1675,13 @@ void CSigSharesManager::SignPendingSigShares() auto& session = signedSessions[sigShare.GetSignHash()]; session.sigShare = sigShare; session.quorum = pQuorum; - session.nextAttemptTime = 0; + session.nextAttemptTime = std::chrono::steady_clock::time_point{}; session.attempt = 0; } } } - // New sig shares or recovery attempts may be available - NotifyWorker(); + // Note: Don't call NotifyWorker() here as this function is called by the worker thread itself + // NotifyWorker() is only needed when external threads add work } std::optional CSigSharesManager::CreateSigShare(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index ee863eade28d..ad9af6f0e352 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -356,7 +356,7 @@ class CSignedSession CSigShare sigShare; CQuorumCPtr quorum; - int64_t nextAttemptTime{0}; + std::chrono::steady_clock::time_point nextAttemptTime{}; int attempt{0}; }; @@ -418,6 +418,7 @@ class CSigSharesManager : public CRecoveredSigsListener const CSporkManager& m_sporkman; int64_t lastCleanupTime{0}; + std::chrono::steady_clock::time_point lastCleanupTimeSteady{}; std::atomic recoveredSigsCounter{0}; public: From 07360f697eed77f05c2156b4984444c5fef803f9 Mon Sep 17 00:00:00 2001 From: pasta Date: Fri, 17 Oct 2025 18:50:15 -0500 Subject: [PATCH 09/14] refactor: optimize GetOrAdd method in signing_shares.h Replaced the previous implementation of the GetOrAdd method with a more efficient approach using emplace. This change simplifies the logic by directly attempting to insert a new entry into the internal map, improving performance and readability while maintaining the same functionality. --- src/llmq/signing_shares.h | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index ad9af6f0e352..8b6827d30afe 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -211,12 +211,9 @@ class SigShareMap T& GetOrAdd(const SigShareKey& k) { - T* v = Get(k); - if (!v) { - Add(k, T()); - v = Get(k); - } - return *v; + auto& m = internalMap[k.first]; // Get or create outer map entry + auto result = m.emplace(k.second, T()); // Try to insert, returns pair + return result.first->second; // Return reference to the value (new or existing) } const T* GetFirst() const From 79a0c423a0cc690f41ad731b01dd4176cc6ab33e Mon Sep 17 00:00:00 2001 From: pasta Date: Sat, 18 Oct 2025 11:22:15 -0500 Subject: [PATCH 10/14] refactor: simplify loop variable unpacking in bls_batchverifier.h --- src/bls/bls_batchverifier.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/bls/bls_batchverifier.h b/src/bls/bls_batchverifier.h index a455b30d8d5f..e22bbf8d8240 100644 --- a/src/bls/bls_batchverifier.h +++ b/src/bls/bls_batchverifier.h @@ -82,27 +82,27 @@ class CBLSBatchVerifier } // revert to per-source verification - for (const auto& p : messagesBySource) { + for (const auto& [from, message_map] : messagesBySource) { bool batchValid = false; // no need to verify it again if there was just one source if (messagesBySource.size() != 1) { byMessageHash.clear(); - for (auto it = p.second.begin(); it != p.second.end(); ++it) { + for (auto it = message_map.begin(); it != message_map.end(); ++it) { byMessageHash[(*it)->second.msgHash].emplace_back(*it); } batchValid = VerifyBatch(byMessageHash); } if (!batchValid) { - badSources.emplace(p.first); + badSources.emplace(from); if (perMessageFallback) { // revert to per-message verification - if (p.second.size() == 1) { + if (message_map.size() == 1) { // no need to re-verify a single message - badMessages.emplace(p.second[0]->second.msgId); + badMessages.emplace(message_map[0]->second.msgId); } else { - for (const auto& msgIt : p.second) { + for (const auto& msgIt : message_map) { if (badMessages.count(msgIt->first)) { // same message might be invalid from different source, so no need to re-verify it continue; From b2a79b501a7f1488ab443280a87a2f0d3e8edb56 Mon Sep 17 00:00:00 2001 From: pasta Date: Sat, 18 Oct 2025 11:23:28 -0500 Subject: [PATCH 11/14] perf: avoid redundant signature validation in signing shares processing --- src/llmq/signing_shares.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 79e7bb3e16e6..5b188030bb5a 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -665,9 +665,12 @@ bool CSigSharesManager::ProcessPendingSigShares() continue; } + // Materialize the signature once. Get() internally validates, so if it returns an invalid signature, + // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage). + CBLSSignature sig = sigShare.sigShare.Get(); // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive // deserialization in the message thread - if (!sigShare.sigShare.Get().IsValid()) { + if (!sig.IsValid()) { BanNode(nodeId); // don't process any additional shares from this node break; @@ -683,7 +686,7 @@ bool CSigSharesManager::ProcessPendingSigShares() assert(false); } - batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sigShare.sigShare.Get(), pubKeyShare); + batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare); verifyCount++; } } From 615ed9fd450f0ab22f4c79af2cfc9e6e159b437c Mon Sep 17 00:00:00 2001 From: pasta Date: Sat, 18 Oct 2025 11:24:09 -0500 Subject: [PATCH 12/14] perf: improve signing latency by collecting lazy signatures under lock --- src/llmq/signing_shares.cpp | 53 +++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 5b188030bb5a..afd5fefb9866 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -792,8 +792,11 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, return; } - std::vector sigSharesForRecovery; + // Collect lazy signatures (cheap copy) under lock, then materialize outside lock + std::vector lazySignatures; std::vector idsForRecovery; + bool isSingleNode = false; + { LOCK(cs); @@ -812,28 +815,44 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, return; } const auto& sigShare = sigSharesForSignHash->begin()->second; - CBLSSignature recoveredSig = sigShare.sigShare.Get(); + // Copy the lazy signature (cheap), materialize later outside lock + lazySignatures.emplace_back(sigShare.sigShare); + isSingleNode = true; LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- recover single-node signature. id=%s, msgHash=%s\n", __func__, id.ToString(), msgHash.ToString()); + } else { + // Collect lazy signatures and IDs under lock (cheap operations) + lazySignatures.reserve((size_t) quorum.params.threshold); + idsForRecovery.reserve((size_t) quorum.params.threshold); + for (auto it = sigSharesForSignHash->begin(); + it != sigSharesForSignHash->end() && lazySignatures.size() < size_t(quorum.params.threshold); + ++it) { + const auto& sigShare = it->second; + lazySignatures.emplace_back(sigShare.sigShare); // Cheap copy of lazy wrapper + idsForRecovery.emplace_back(quorum.members[sigShare.getQuorumMember()]->proTxHash); + } - auto rs = std::make_shared(quorum.params.type, quorum.qc->quorumHash, id, msgHash, - recoveredSig); - sigman.ProcessRecoveredSig(rs, m_peerman); - return; // end of single-quorum processing + // check if we can recover the final signature + if (lazySignatures.size() < size_t(quorum.params.threshold)) { + return; + } } + } // Release lock before expensive materialization - sigSharesForRecovery.reserve((size_t) quorum.params.threshold); - idsForRecovery.reserve((size_t) quorum.params.threshold); - for (auto it = sigSharesForSignHash->begin(); it != sigSharesForSignHash->end() && sigSharesForRecovery.size() < size_t(quorum.params.threshold); ++it) { - const auto& sigShare = it->second; - sigSharesForRecovery.emplace_back(sigShare.sigShare.Get()); - idsForRecovery.emplace_back(quorum.members[sigShare.getQuorumMember()]->proTxHash); - } + // Materialize signatures outside the critical section (expensive BLS operations) + if (isSingleNode) { + CBLSSignature recoveredSig = lazySignatures[0].Get(); + auto rs = std::make_shared(quorum.params.type, quorum.qc->quorumHash, id, msgHash, + recoveredSig); + sigman.ProcessRecoveredSig(rs, m_peerman); + return; // end of single-quorum processing + } - // check if we can recover the final signature - if (sigSharesForRecovery.size() < size_t(quorum.params.threshold)) { - return; - } + // Multi-node case: materialize all signatures + std::vector sigSharesForRecovery; + sigSharesForRecovery.reserve(lazySignatures.size()); + for (const auto& lazySig : lazySignatures) { + sigSharesForRecovery.emplace_back(lazySig.Get()); // Expensive, but outside lock } // now recover it From 7155376546376d68c6e531fcba87b41b8a9bf9aa Mon Sep 17 00:00:00 2001 From: pasta Date: Mon, 20 Oct 2025 11:23:17 -0500 Subject: [PATCH 13/14] perf: cache block tip pointer to reduce cs_main lock contention in TrySignChainTip --- src/chainlock/chainlock.cpp | 6 ++++++ src/chainlock/signing.cpp | 10 +++++++++- src/chainlock/signing.h | 6 ++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/chainlock/chainlock.cpp b/src/chainlock/chainlock.cpp index 39154d30d2d2..5ef9f9d344fc 100644 --- a/src/chainlock/chainlock.cpp +++ b/src/chainlock/chainlock.cpp @@ -217,6 +217,12 @@ void CChainLocksHandler::AcceptedBlockHeader(gsl::not_null p void CChainLocksHandler::UpdatedBlockTip(const llmq::CInstantSendManager& isman) { + // Update the cached tip in the signer before scheduling + const CBlockIndex* pindexNew = WITH_LOCK(::cs_main, return m_chainstate.m_chain.Tip()); + if (auto signer = m_signer.load(std::memory_order_acquire); signer && pindexNew) { + signer->UpdatedBlockTip(pindexNew); + } + // don't call TrySignChainTip directly but instead let the scheduler call it. This way we ensure that cs_main is // never locked and TrySignChainTip is not called twice in parallel. Also avoids recursive calls due to // EnforceBestChainLock switching chains. diff --git a/src/chainlock/signing.cpp b/src/chainlock/signing.cpp index 9b574d5cc6ee..f93477ac04be 100644 --- a/src/chainlock/signing.cpp +++ b/src/chainlock/signing.cpp @@ -26,6 +26,9 @@ ChainLockSigner::ChainLockSigner(CChainState& chainstate, ChainLockSignerParent& m_sporkman{sporkman}, m_mn_sync{mn_sync} { + // Initialize cached tip pointer + LOCK(::cs_main); + m_cached_tip.store(m_chainstate.m_chain.Tip(), std::memory_order_release); } ChainLockSigner::~ChainLockSigner() = default; @@ -40,6 +43,11 @@ void ChainLockSigner::Stop() m_sigman.UnregisterRecoveredSigsListener(this); } +void ChainLockSigner::UpdatedBlockTip(const CBlockIndex* pindexNew) +{ + m_cached_tip.store(pindexNew, std::memory_order_release); +} + void ChainLockSigner::TrySignChainTip(const llmq::CInstantSendManager& isman) { if (!m_mn_sync.IsBlockchainSynced()) { @@ -55,7 +63,7 @@ void ChainLockSigner::TrySignChainTip(const llmq::CInstantSendManager& isman) return; } - const CBlockIndex* pindex = WITH_LOCK(::cs_main, return m_chainstate.m_chain.Tip()); + const CBlockIndex* pindex = m_cached_tip.load(std::memory_order_acquire); if (!pindex || !pindex->pprev) { return; diff --git a/src/chainlock/signing.h b/src/chainlock/signing.h index 357b3295f757..78952b8bdea8 100644 --- a/src/chainlock/signing.h +++ b/src/chainlock/signing.h @@ -8,6 +8,7 @@ #include #include +class CBlockIndex; class CMasternodeSync; class CSporkManager; struct MessageProcessingResult; @@ -62,6 +63,9 @@ class ChainLockSigner final : public llmq::CRecoveredSigsListener uint256 lastSignedRequestId GUARDED_BY(cs_signer); uint256 lastSignedMsgHash GUARDED_BY(cs_signer); + // Cached tip pointer to avoid cs_main acquisition in TrySignChainTip + std::atomic m_cached_tip{nullptr}; + public: ChainLockSigner() = delete; ChainLockSigner(const ChainLockSigner&) = delete; @@ -73,6 +77,8 @@ class ChainLockSigner final : public llmq::CRecoveredSigsListener void Start(); void Stop(); + void UpdatedBlockTip(const CBlockIndex* pindexNew); + void EraseFromBlockHashTxidMap(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_signer); void UpdateBlockHashTxidMap(const uint256& hash, const std::vector& vtx) From 650241ea2e0f3e147b4e6a9689670f9882ff93a6 Mon Sep 17 00:00:00 2001 From: pasta Date: Tue, 4 Nov 2025 13:46:13 -0600 Subject: [PATCH 14/14] fix: restore Cleanup() declaration in CSigningManager header The Cleanup() method declaration was accidentally removed during rebase conflict resolution but the method is still defined and used in signing.cpp. --- src/llmq/signing.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 2e75add8d62e..18b95a796aed 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -248,6 +248,7 @@ class CSigningManager Mutex workMutex; std::condition_variable_any workCv; std::atomic workEpoch{0}; + void Cleanup(); // called from the worker thread void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); public: