diff --git a/src/active/context.cpp b/src/active/context.cpp index d025c61b6149..d9782f5d0c7f 100644 --- a/src/active/context.cpp +++ b/src/active/context.cpp @@ -29,7 +29,7 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman CTxMemPool& mempool, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman, llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, - PeerManager& peerman, const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, + const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, const llmq::QvvecSyncModeMap& sync_map, const util::DbWrapperParams& db_params, bool quorums_recovery, bool quorums_watch) : m_clhandler{clhandler}, @@ -38,8 +38,7 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman nodeman{std::make_unique(connman, dmnman, operator_sk)}, dkgdbgman{std::make_unique()}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, quorums_watch)}, - shareman{std::make_unique(connman, chainman.ActiveChainstate(), sigman, peerman, *nodeman, - qman, sporkman)}, + shareman{std::make_unique(connman, chainman, sigman, *nodeman, qman, sporkman)}, gov_signer{std::make_unique(connman, dmnman, govman, *nodeman, chainman, mn_sync)}, ehf_sighandler{std::make_unique(chainman, mnhfman, sigman, *shareman, qman)}, qman_handler{std::make_unique(bls_worker, connman, dmnman, qman, qsnapman, *nodeman, chainman, @@ -67,27 +66,18 @@ ActiveContext::~ActiveContext() m_clhandler.DisconnectSigner(); } -void ActiveContext::Interrupt() -{ - shareman->InterruptWorkerThread(); -} - void ActiveContext::Start(CConnman& connman, PeerManager& peerman) { qman_handler->Start(); qdkgsman->StartThreads(connman, peerman); - shareman->Start(); cl_signer->RegisterRecoveryInterface(); is_signer->RegisterRecoveryInterface(); - shareman->RegisterRecoveryInterface(); } void ActiveContext::Stop() { - shareman->UnregisterRecoveryInterface(); is_signer->UnregisterRecoveryInterface(); cl_signer->UnregisterRecoveryInterface(); - shareman->Stop(); qdkgsman->StopThreads(); qman_handler->Stop(); } @@ -115,8 +105,3 @@ void ActiveContext::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIn qdkgsman->UpdatedBlockTip(pindexNew, fInitialDownload); qman_handler->UpdatedBlockTip(pindexNew, fInitialDownload); } - -void ActiveContext::NotifyRecoveredSig(const std::shared_ptr& sig, bool proactive_relay) -{ - shareman->NotifyRecoveredSig(sig, proactive_relay); -} diff --git a/src/active/context.h b/src/active/context.h index 9b3d5896ec07..bf4b79c6644a 100644 --- a/src/active/context.h +++ b/src/active/context.h @@ -66,13 +66,12 @@ struct ActiveContext final : public CValidationInterface { CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman, llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, - llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, PeerManager& peerman, + llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, const llmq::QvvecSyncModeMap& sync_map, const util::DbWrapperParams& db_params, bool quorums_recovery, bool quorums_watch); ~ActiveContext(); - void Interrupt(); void Start(CConnman& connman, PeerManager& peerman); void Stop(); @@ -81,7 +80,6 @@ struct ActiveContext final : public CValidationInterface { protected: // CValidationInterface - void NotifyRecoveredSig(const std::shared_ptr& sig, bool proactive_relay) override; void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) override; public: diff --git a/src/init.cpp b/src/init.cpp index f97d98f8a766..f55a7dafcd3b 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -252,9 +252,6 @@ void Interrupt(NodeContext& node) InterruptRPC(); InterruptREST(); InterruptTorControl(); - if (node.active_ctx) { - node.active_ctx->Interrupt(); - } if (node.peerman) { node.peerman->InterruptHandlers(); } @@ -2194,7 +2191,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.active_ctx = std::make_unique(*node.llmq_ctx->bls_worker, chainman, *node.connman, *node.dmnman, *node.govman, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, *node.mempool, *node.llmq_ctx->clhandler, *node.llmq_ctx->isman, *node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, *node.llmq_ctx->sigman, - *node.peerman, *node.mn_sync, operator_sk, sync_map, dash_db_params, quorums_recovery, quorums_watch); + *node.mn_sync, operator_sk, sync_map, dash_db_params, quorums_recovery, quorums_watch); RegisterValidationInterface(node.active_ctx.get()); } else if (quorums_watch) { node.observer_ctx = std::make_unique(*node.llmq_ctx->bls_worker, *node.connman, *node.dmnman, *node.mn_metaman, *node.mn_sync, @@ -2206,7 +2203,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // ********************************************************* Step 7d: Setup other Dash services node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, *node.llmq_ctx->qman, chainman.ActiveChainstate())); - node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman)); + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman, node.active_ctx ? node.active_ctx->shareman.get() : nullptr, *node.sporkman)); if (node.active_ctx) { auto cj_server = std::make_unique(node.peerman.get(), chainman, *node.connman, *node.dmnman, *node.dstxman, *node.mn_metaman, diff --git a/src/llmq/ehf_signals.cpp b/src/llmq/ehf_signals.cpp index 804276b98452..f082aa26862b 100644 --- a/src/llmq/ehf_signals.cpp +++ b/src/llmq/ehf_signals.cpp @@ -92,7 +92,6 @@ MessageProcessingResult CEHFSignalsHandler::HandleNewRecoveredSig(const CRecover return {}; } - MessageProcessingResult ret; const auto ehfSignals = mnhfman.GetSignalsStage(WITH_LOCK(::cs_main, return m_chainman.ActiveTip())); MNHFTxPayload mnhfPayload; for (const auto& deployment : Params().GetConsensus().vDeployments) { @@ -112,19 +111,20 @@ MessageProcessingResult CEHFSignalsHandler::HandleNewRecoveredSig(const CRecover CMutableTransaction tx = mnhfPayload.PrepareTx(); - { - CTransactionRef tx_to_sent = MakeTransactionRef(std::move(tx)); - LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig Special EHF TX is created hash=%s\n", tx_to_sent->GetHash().ToString()); - LOCK(::cs_main); - const MempoolAcceptResult result = m_chainman.ProcessTransaction(tx_to_sent); - if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { - ret.m_transactions.push_back(tx_to_sent->GetHash()); - } else { - LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig -- AcceptToMemoryPool failed: %s\n", result.m_state.ToString()); - } + CTransactionRef tx_to_sent = MakeTransactionRef(std::move(tx)); + LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig Special EHF TX is created hash=%s\n", + tx_to_sent->GetHash().ToString()); + LOCK(::cs_main); + const MempoolAcceptResult result = m_chainman.ProcessTransaction(tx_to_sent); + if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { + MessageProcessingResult ret; + ret.m_transactions.push_back(tx_to_sent->GetHash()); + return ret; } - break; + LogPrintf("CEHFSignalsHandler::HandleNewRecoveredSig -- AcceptToMemoryPool failed: %s\n", + result.m_state.ToString()); + return {}; } - return ret; + return {}; } } // namespace llmq diff --git a/src/llmq/net_signing.cpp b/src/llmq/net_signing.cpp index 575dc8ca3a8a..23f5c986a564 100644 --- a/src/llmq/net_signing.cpp +++ b/src/llmq/net_signing.cpp @@ -4,44 +4,130 @@ #include +#include #include #include +#include #include #include #include +#include #include #include #include #include +namespace llmq { void NetSigning::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) { - if (msg_type != NetMsgType::QSIGREC) return; + if (msg_type == NetMsgType::QSIGREC) { + auto recoveredSig = std::make_shared(); + vRecv >> *recoveredSig; - auto recoveredSig = std::make_shared(); - vRecv >> *recoveredSig; + WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), CInv{MSG_QUORUM_RECOVERED_SIG, + recoveredSig->GetHash()})); - WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), - CInv{MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()})); + if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } - if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) { - m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); - return; + m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig)); } - m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig)); + if (m_shares_manager == nullptr) return; + + if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) { + std::vector receivedSigShares; + vRecv >> receivedSigShares; + + if (receivedSigShares.size() > CSigSharesManager::MAX_MSGS_SIG_SHARES) { + LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", + __func__, receivedSigShares.size(), CSigSharesManager::MAX_MSGS_SIG_SHARES, pfrom.GetId()); + BanNode(pfrom.GetId()); + return; + } + + for (const auto& sigShare : receivedSigShares) { + if (!m_shares_manager->ProcessMessageSigShare(pfrom.GetId(), sigShare)) { + BanNode(pfrom.GetId()); + } + } + } + + if (msg_type == NetMsgType::QSIGSESANN) { + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN) { + LogPrint(BCLog::LLMQ_SIGS, /* Continued */ + "NetSigning::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", + __func__, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId()); + BanNode(pfrom.GetId()); + return; + } + if (!ranges::all_of(msgs, [this, &pfrom](const auto& ann) { + return m_shares_manager->ProcessMessageSigSesAnn(pfrom, ann); + })) { + BanNode(pfrom.GetId()); + return; + } + } else if (msg_type == NetMsgType::QSIGSHARESINV || msg_type == NetMsgType::QGETSIGSHARES) { + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES) { + LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many invs in %s message. cnt=%d, max=%d, node=%d\n", + __func__, msg_type, msgs.size(), CSigSharesManager::MAX_MSGS_CNT_QSIGSHARES, pfrom.GetId()); + BanNode(pfrom.GetId()); + return; + } + if (!ranges::all_of(msgs, [this, &pfrom, &msg_type](const auto& inv) { + return m_shares_manager->ProcessMessageSigShares(pfrom, inv, msg_type); + })) { + BanNode(pfrom.GetId()); + return; + } + } else if (msg_type == NetMsgType::QBSIGSHARES) { + std::vector msgs; + vRecv >> msgs; + size_t totalSigsCount = 0; + for (const auto& bs : msgs) { + totalSigsCount += bs.sigShares.size(); + } + if (totalSigsCount > CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS) { + LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", + __func__, msgs.size(), CSigSharesManager::MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId()); + BanNode(pfrom.GetId()); + return; + } + if (!ranges::all_of(msgs, [this, &pfrom](const auto& bs) { + return m_shares_manager->ProcessMessageBatchedSigShares(pfrom, bs); + })) { + BanNode(pfrom.GetId()); + return; + } + } } void NetSigning::Start() { // can't start new thread if we have one running already - if (workThread.joinable()) { - assert(false); - } + assert(!signing_thread.joinable()); + assert(!shares_cleaning_thread.joinable()); + assert(!shares_dispatcher_thread.joinable()); + + signing_thread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadSigning(); }); - workThread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadMain(); }); + if (m_shares_manager) { + // Initialize worker pool + int worker_count = std::clamp(static_cast(std::thread::hardware_concurrency() / 2), 1, 4); + worker_pool.resize(worker_count); + RenameThreadPool(worker_pool, "sigsh-work"); + + shares_cleaning_thread = std::thread(&util::TraceThread, "sigsh-maint", [this] { WorkThreadCleaning(); }); + shares_dispatcher_thread = std::thread(&util::TraceThread, "sigsh-dispat", [this] { WorkThreadDispatcher(); }); + } } void NetSigning::Stop() @@ -51,36 +137,51 @@ void NetSigning::Stop() assert(false); } - if (workThread.joinable()) { - workThread.join(); + if (signing_thread.joinable()) { + signing_thread.join(); + } + + if (m_shares_manager) { + // Join threads FIRST to stop any pending push() calls + if (shares_cleaning_thread.joinable()) { + shares_cleaning_thread.join(); + } + if (shares_dispatcher_thread.joinable()) { + shares_dispatcher_thread.join(); + } + + // Then stop worker pool (now safe, no more push() calls) + worker_pool.clear_queue(); + worker_pool.stop(true); } } -void NetSigning::ProcessRecoveredSig(std::shared_ptr recoveredSig, bool consider_proactive_relay) +void NetSigning::ProcessRecoveredSig(std::shared_ptr recovered_sig, bool consider_proactive_relay) { - if (!m_sig_manager.ProcessRecoveredSig(recoveredSig)) return; + if (recovered_sig == nullptr) return; + if (!m_sig_manager.ProcessRecoveredSig(recovered_sig)) return; auto listeners = m_sig_manager.GetListeners(); for (auto& l : listeners) { - m_peer_manager->PeerPostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); + m_peer_manager->PeerPostProcessMessage(l->HandleNewRecoveredSig(*recovered_sig)); } // TODO refactor to use a better abstraction analogous to IsAllMembersConnectedEnabled - auto proactive_relay = consider_proactive_relay && recoveredSig->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 && - recoveredSig->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 && - recoveredSig->getLlmqType() != Consensus::LLMQType::LLMQ_400_85; - GetMainSignals().NotifyRecoveredSig(recoveredSig, recoveredSig->GetHash().ToString(), proactive_relay); + auto proactive_relay = consider_proactive_relay && recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 && + recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 && + recovered_sig->getLlmqType() != Consensus::LLMQType::LLMQ_400_85; + GetMainSignals().NotifyRecoveredSig(recovered_sig, recovered_sig->GetHash().ToString(), proactive_relay); } bool NetSigning::ProcessPendingRecoveredSigs() { - Uint256HashMap> pending{m_sig_manager.FetchPendingReconstructed()}; + Uint256HashMap> pending{m_sig_manager.FetchPendingReconstructed()}; for (const auto& p : pending) { ProcessRecoveredSig(p.second, true); } - std::unordered_map>> recSigsByNode; + std::unordered_map>> recSigsByNode; std::unordered_map, CBLSPublicKey, StaticSaltedHasher> pubkeys; const size_t nMaxBatchSize{32}; @@ -135,7 +236,7 @@ bool NetSigning::ProcessPendingRecoveredSigs() return more_work; } -void NetSigning::WorkThreadMain() +void NetSigning::WorkThreadSigning() { while (!workInterrupt) { bool fMoreWork = ProcessPendingRecoveredSigs(); @@ -151,3 +252,153 @@ void NetSigning::WorkThreadMain() } } } + +void NetSigning::RemoveBannedNodeStates() +{ + assert(m_shares_manager != nullptr); + // Called regularly to cleanup local node states for banned nodes + m_shares_manager->RemoveNodesIf([this](NodeId node_id) { return m_peer_manager->PeerIsBanned(node_id); }); +} + +void NetSigning::BanNode(NodeId nodeId) +{ + if (nodeId == -1) return; + + m_peer_manager->PeerMisbehaving(nodeId, 100); + if (m_shares_manager) { + m_shares_manager->MarkAsBanned(nodeId); + } +} + +void NetSigning::WorkThreadCleaning() +{ + assert(m_shares_manager); + + while (!workInterrupt) { + RemoveBannedNodeStates(); + + m_shares_manager->SendMessages(); + m_shares_manager->Cleanup(); + + workInterrupt.sleep_for(std::chrono::milliseconds(100)); + } +} + +void NetSigning::WorkThreadDispatcher() +{ + assert(m_shares_manager); + + while (!workInterrupt) { + // Dispatch all pending signs (individual tasks) + { + auto signs = m_shares_manager->DispatchPendingSigns(); + // Dispatch all signs to worker pool + for (auto& work : signs) { + if (workInterrupt) break; + + worker_pool.push([this, work = std::move(work)](int) mutable { + auto rs = m_shares_manager->SignAndProcessSingleShare(std::move(work)); + ProcessRecoveredSig(rs, true); + }); + } + } + + if (m_shares_manager->IsAnyPendingProcessing()) { + // If there's processing work, spawn a helper worker + worker_pool.push([this](int) { + while (!workInterrupt) { + bool moreWork = ProcessPendingSigShares(); + + if (!moreWork) { + return; // No work found, exit immediately + } + } + }); + } + + // Always sleep briefly between checks + workInterrupt.sleep_for(std::chrono::milliseconds(10)); + } +} + +void NetSigning::NotifyRecoveredSig(const std::shared_ptr& sig, bool proactive_relay) +{ + m_peer_manager->PeerRelayRecoveredSig(*sig, proactive_relay); +} + +bool NetSigning::ProcessPendingSigShares() +{ + std::unordered_map> sigSharesByNodes; + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; + + const size_t nMaxBatchSize{32}; + bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums); + if (sigSharesByNodes.empty()) { + return false; + } + + // It's ok to perform insecure batched verification here as we verify against the quorum public key shares, + // which are not craftable by individual entities, making the rogue public key attack impossible + CBLSBatchVerifier batchVerifier(false, true); + + cxxtimer::Timer prepareTimer(true); + size_t verifyCount = 0; + for (const auto& [nodeId, v] : sigSharesByNodes) { + for (const auto& sigShare : v) { + if (m_sig_manager.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) { + continue; + } + + // Materialize the signature once. Get() internally validates, so if it returns an invalid signature, + // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage). + CBLSSignature sig = sigShare.sigShare.Get(); + // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive + // deserialization in the message thread + if (!sig.IsValid()) { + BanNode(nodeId); + // don't process any additional shares from this node + break; + } + + auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash())); + auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember()); + + if (!pubKeyShare.IsValid()) { + // this should really not happen (we already ensured we have the quorum vvec, + // so we should also be able to create all pubkey shares) + LogPrintf("NetSigning::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__); + assert(false); + } + + batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare); + verifyCount++; + } + } + prepareTimer.stop(); + + cxxtimer::Timer verifyTimer(true); + batchVerifier.Verify(); + verifyTimer.stop(); + + LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__, + verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size()); + + for (const auto& [nodeId, v] : sigSharesByNodes) { + if (batchVerifier.badSources.count(nodeId) != 0) { + LogPrint(BCLog::LLMQ_SIGS, "NetSigning::%s -- invalid sig shares from other node, banning peer=%d\n", + __func__, nodeId); + // this will also cause re-requesting of the shares that were sent by this node + BanNode(nodeId); + continue; + } + + auto rec_sigs = m_shares_manager->ProcessPendingSigShares(v, quorums); + for (auto& rs : rec_sigs) { + ProcessRecoveredSig(rs, true); + } + } + + return more_work; +} + +} // namespace llmq diff --git a/src/llmq/net_signing.h b/src/llmq/net_signing.h index d77b4b916eed..10ef78d4cd09 100644 --- a/src/llmq/net_signing.h +++ b/src/llmq/net_signing.h @@ -5,43 +5,73 @@ #ifndef BITCOIN_LLMQ_NET_SIGNING_H #define BITCOIN_LLMQ_NET_SIGNING_H +#include #include - #include #include +#include #include +#include + +class CSporkManager; + namespace llmq { +class CSigSharesManager; class CSigningManager; -} // namespace llmq -class NetSigning final : public NetHandler +class NetSigning final : public NetHandler, public CValidationInterface { public: - NetSigning(PeerManagerInternal* peer_manager, llmq::CSigningManager& sig_manager) : + NetSigning(PeerManagerInternal* peer_manager, CSigningManager& sig_manager, CSigSharesManager* shares_manager, + const CSporkManager& sporkman) : NetHandler(peer_manager), - m_sig_manager(sig_manager) + m_sig_manager{sig_manager}, + m_shares_manager{shares_manager}, + m_sporkman{sporkman} { workInterrupt.reset(); } void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) override; [[nodiscard]] bool ProcessPendingRecoveredSigs(); - void ProcessRecoveredSig(std::shared_ptr recoveredSig, bool consider_proactive_relay); + void ProcessRecoveredSig(std::shared_ptr recoveredSig, bool consider_proactive_relay); + +protected: + // CValidationInterface + void NotifyRecoveredSig(const std::shared_ptr& sig, bool proactive_relay) override; + // NetSigning void Start() override; void Stop() override; void Interrupt() override { workInterrupt(); }; - void WorkThreadMain(); +private: + void WorkThreadSigning(); + void WorkThreadCleaning(); + void WorkThreadDispatcher(); + + bool ProcessPendingSigShares(); + + void RemoveBannedNodeStates(); + void BanNode(NodeId nodeid); private: - llmq::CSigningManager& m_sig_manager; + CSigningManager& m_sig_manager; + CSigSharesManager* m_shares_manager; + const CSporkManager& m_sporkman; CleanupThrottler cleanupThrottler; - std::thread workThread; + + std::thread signing_thread; + std::thread shares_cleaning_thread; + std::thread shares_dispatcher_thread; + mutable ctpl::thread_pool worker_pool; + CThreadInterrupt workInterrupt; }; +} // namespace llmq + #endif // BITCOIN_LLMQ_NET_SIGNING_H diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 5892b6e51bc5..89cdacd60baa 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -5,22 +5,20 @@ #include #include -#include +#include #include #include #include +#include #include #include #include -#include -#include -#include - -#include -#include +#include #include +#include #include #include +#include #include #include @@ -180,63 +178,20 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash) ////////////////////// -CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigningManager& _sigman, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, - const CQuorumManager& _qman, const CSporkManager& sporkman) : +CSigSharesManager::CSigSharesManager(CConnman& connman, const ChainstateManager& chainman, CSigningManager& _sigman, + const CActiveMasternodeManager& mn_activeman, const CQuorumManager& _qman, + const CSporkManager& sporkman) : m_connman{connman}, - m_chainstate{chainstate}, + m_chainman{chainman}, sigman{_sigman}, - m_peerman{peerman}, m_mn_activeman{mn_activeman}, qman{_qman}, m_sporkman{sporkman} { - workInterrupt.reset(); } CSigSharesManager::~CSigSharesManager() = default; -void CSigSharesManager::Start() -{ - // can't start if threads are already running - if (housekeepingThread.joinable() || dispatcherThread.joinable()) { - assert(false); - } - - // Initialize worker pool - int workerCount = std::clamp(static_cast(std::thread::hardware_concurrency() / 2), 1, 4); - workerPool.resize(workerCount); - RenameThreadPool(workerPool, "sigsh-work"); - - // Start housekeeping thread - housekeepingThread = std::thread(&util::TraceThread, "sigsh-maint", - [this] { HousekeepingThreadMain(); }); - - // Start dispatcher thread - dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispat", - [this] { WorkDispatcherThreadMain(); }); -} - -void CSigSharesManager::Stop() -{ - // make sure to call InterruptWorkerThread() first - if (!workInterrupt) { - assert(false); - } - - // Join threads FIRST to stop any pending push() calls - if (housekeepingThread.joinable()) { - housekeepingThread.join(); - } - if (dispatcherThread.joinable()) { - dispatcherThread.join(); - } - - // Then stop worker pool (now safe, no more push() calls) - workerPool.clear_queue(); - workerPool.stop(true); -} - void CSigSharesManager::RegisterRecoveryInterface() { sigman.RegisterRecoveredSigsListener(this); @@ -247,90 +202,6 @@ void CSigSharesManager::UnregisterRecoveryInterface() sigman.UnregisterRecoveredSigsListener(this); } -void CSigSharesManager::InterruptWorkerThread() -{ - workInterrupt(); -} - -void CSigSharesManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) -{ - // non-masternodes are not interested in sigshares - if (m_mn_activeman.GetProTxHash().IsNull()) return; - - if (m_sporkman.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && msg_type == NetMsgType::QSIGSHARE) { - std::vector receivedSigShares; - vRecv >> receivedSigShares; - - if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - - for (const auto& sigShare : receivedSigShares) { - ProcessMessageSigShare(pfrom.GetId(), sigShare); - } - } - - if (msg_type == NetMsgType::QSIGSESANN) { - std::vector msgs; - vRecv >> msgs; - if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) { - BanNode(pfrom.GetId()); - return; - } - } else if (msg_type == NetMsgType::QSIGSHARESINV) { - std::vector msgs; - vRecv >> msgs; - if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) { - BanNode(pfrom.GetId()); - return; - } - } else if (msg_type == NetMsgType::QGETSIGSHARES) { - std::vector msgs; - vRecv >> msgs; - if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) { - BanNode(pfrom.GetId()); - return; - } - } else if (msg_type == NetMsgType::QBSIGSHARES) { - std::vector msgs; - vRecv >> msgs; - size_t totalSigsCount = 0; - for (const auto& bs : msgs) { - totalSigsCount += bs.sigShares.size(); - } - if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId()); - BanNode(pfrom.GetId()); - return; - } - if (!ranges::all_of(msgs, - [this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) { - BanNode(pfrom.GetId()); - return; - } - } -} - bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann) { auto llmqType = ann.getLlmqType(); @@ -363,13 +234,13 @@ bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSe return true; } -bool CSigSharesManager::VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv) +static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv) { const auto& llmq_params_opt = Params().GetLLMQ(llmqType); return llmq_params_opt.has_value() && (inv.inv.size() == size_t(llmq_params_opt->size)); } -bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv) +bool CSigSharesManager::ProcessMessageSigShares(const CNode& pfrom, const CSigSharesInv& inv, const std::string& msg_type) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom.GetId(), inv.sessionId, sessionInfo)) { @@ -388,7 +259,7 @@ bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode& pfrom, const CSi LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, sessionInfo.signHash.ToString(), inv.ToString(), pfrom.GetId()); - if (!sessionInfo.quorum->HasVerificationVector()) { + if (msg_type == NetMsgType::QSIGSHARESINV && !sessionInfo.quorum->HasVerificationVector()) { // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, not requesting sig shares. node=%d\n", __func__, sessionInfo.quorumHash.ToString(), pfrom.GetId()); @@ -401,38 +272,56 @@ bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode& pfrom, const CSi if (session == nullptr) { return true; } - session->announced.Merge(inv); + if (msg_type == NetMsgType::QSIGSHARESINV) { + session->announced.Merge(inv); + } else { // msg_type == NetMsgType::QGETSIGSHARES + session->requested.Merge(inv); + } + session->knows.Merge(inv); return true; } -bool CSigSharesManager::ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv) +// Failure is not issue, we should not ban node +static bool PreVerifySigShareQuorum(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, + const CQuorumCPtr& quorum, Consensus::LLMQType llmqType) { - CSigSharesNodeState::SessionInfo sessionInfo; - if (!GetSessionInfoByRecvId(pfrom.GetId(), inv.sessionId, sessionInfo)) { - return true; + if (!IsQuorumActive(llmqType, quorum_manager, quorum->qc->quorumHash)) { + // quorum is too old + return false; } - - if (!VerifySigSharesInv(sessionInfo.llmqType, inv)) { + if (!quorum->IsMember(mn_activeman.GetProTxHash())) { + // we're not a member so we can't verify it (we actually shouldn't have received it) return false; } - - // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (sigman.HasRecoveredSigForSession(sessionInfo.signHash.Get())) { - return true; + if (!quorum->HasVerificationVector()) { + // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG + LogPrint(BCLog::LLMQ_SIGS, "%s -- we don't have the quorum vvec for %s, no verification possible.\n", __func__, + quorum->qc->quorumHash.ToString()); + return false; } + return true; +} - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, - sessionInfo.signHash.ToString(), inv.ToString(), pfrom.GetId()); +// Ban node if PreVerifyBatchedSigShares failed +bool PreVerifyBatchedSigShares(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares) +{ + std::unordered_set dupMembers; - LOCK(cs); - auto& nodeState = nodeStates[pfrom.GetId()]; - auto* session = nodeState.GetSessionByRecvId(inv.sessionId); - if (session == nullptr) { - return true; + for (const auto& [quorumMember, _] : batchedSigShares.sigShares) { + if (!dupMembers.emplace(quorumMember).second) { + return false; + } + + if (quorumMember >= session.quorum->members.size()) { + LogPrint(BCLog::LLMQ_SIGS, "%s -- quorumMember out of bounds\n", __func__); + return false; + } + if (!session.quorum->qc->validMembers[quorumMember]) { + LogPrint(BCLog::LLMQ_SIGS, "%s -- quorumMember not valid\n", __func__); + return false; + } } - session->requested.Merge(inv); - session->knows.Merge(inv); return true; } @@ -443,8 +332,12 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const return true; } - if (bool ban{false}; !PreVerifyBatchedSigShares(m_mn_activeman, qman, sessionInfo, batchedSigShares, ban)) { - return !ban; + if (!PreVerifySigShareQuorum(m_mn_activeman, qman, sessionInfo.quorum, sessionInfo.llmqType)) { + return true; + } + + if (!PreVerifyBatchedSigShares(sessionInfo, batchedSigShares)) { + return false; // ban node } std::vector sigSharesToProcess; @@ -490,36 +383,23 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const return true; } -void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) +bool CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) { auto quorum = qman.GetQuorum(sigShare.getLlmqType(), sigShare.getQuorumHash()); if (!quorum) { - return; - } - if (!IsQuorumActive(sigShare.getLlmqType(), qman, quorum->qc->quorumHash)) { - // quorum is too old - return; - } - if (!quorum->IsMember(m_mn_activeman.GetProTxHash())) { - // we're not a member so we can't verify it (we actually shouldn't have received it) - return; + return true; } - if (!quorum->HasVerificationVector()) { - // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__, - quorum->qc->quorumHash.ToString(), fromId); - return; + if (!PreVerifySigShareQuorum(m_mn_activeman, qman, quorum, sigShare.getLlmqType())) { + return true; } if (sigShare.getQuorumMember() >= quorum->members.size()) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); - BanNode(fromId); - return; + return false; } if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__); - BanNode(fromId); - return; + return false; } const auto signHash = sigShare.GetSignHash(); @@ -535,11 +415,11 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s "msgHash=%s, member=%d, node=%d\n", __func__, signHash.ToString(), sigShare.getId().ToString(), sigShare.getMsgHash().ToString(), sigShare.getQuorumMember(), fromId); - return; + return true; } if (sigShares.Has(sigShare.GetKey())) { - return; + return true; } auto& nodeState = nodeStates[fromId]; @@ -548,47 +428,6 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, id=%s, msgHash=%s, member=%d, node=%d\n", __func__, signHash.ToString(), sigShare.getId().ToString(), sigShare.getMsgHash().ToString(), sigShare.getQuorumMember(), fromId); -} - -bool CSigSharesManager::PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, - const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan) -{ - retBan = false; - - if (!IsQuorumActive(session.llmqType, quorum_manager, session.quorum->qc->quorumHash)) { - // quorum is too old - return false; - } - if (!session.quorum->IsMember(mn_activeman.GetProTxHash())) { - // we're not a member so we can't verify it (we actually shouldn't have received it) - return false; - } - if (!session.quorum->HasVerificationVector()) { - // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible.\n", __func__, - session.quorumHash.ToString()); - return false; - } - - std::unordered_set dupMembers; - - for (const auto& [quorumMember, _] : batchedSigShares.sigShares) { - if (!dupMembers.emplace(quorumMember).second) { - retBan = true; - return false; - } - - if (quorumMember >= session.quorum->members.size()) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); - retBan = true; - return false; - } - if (!session.quorum->qc->validMembers[quorumMember]) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__); - retBan = true; - return false; - } - } return true; } @@ -672,98 +511,31 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify( return more_work; } -bool CSigSharesManager::ProcessPendingSigShares() -{ - std::unordered_map> sigSharesByNodes; - std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; - - const size_t nMaxBatchSize{32}; - bool more_work = CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums); - if (sigSharesByNodes.empty()) { - return false; - } - - // It's ok to perform insecure batched verification here as we verify against the quorum public key shares, - // which are not craftable by individual entities, making the rogue public key attack impossible - CBLSBatchVerifier batchVerifier(false, true); - - cxxtimer::Timer prepareTimer(true); - size_t verifyCount = 0; - for (const auto& [nodeId, v] : sigSharesByNodes) { - for (const auto& sigShare : v) { - if (sigman.HasRecoveredSigForId(sigShare.getLlmqType(), sigShare.getId())) { - continue; - } - - // Materialize the signature once. Get() internally validates, so if it returns an invalid signature, - // we know it's malformed. This avoids calling Get() twice (once for IsValid(), once for PushMessage). - CBLSSignature sig = sigShare.sigShare.Get(); - // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive - // deserialization in the message thread - if (!sig.IsValid()) { - BanNode(nodeId); - // don't process any additional shares from this node - break; - } - - auto quorum = quorums.at(std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash())); - auto pubKeyShare = quorum->GetPubKeyShare(sigShare.getQuorumMember()); - - if (!pubKeyShare.IsValid()) { - // this should really not happen (we already ensured we have the quorum vvec, - // so we should also be able to create all pubkey shares) - LogPrintf("CSigSharesManager::%s -- pubKeyShare is invalid, which should not be possible here\n", __func__); - assert(false); - } - - batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sig, pubKeyShare); - verifyCount++; - } - } - prepareTimer.stop(); - - cxxtimer::Timer verifyTimer(true); - batchVerifier.Verify(); - verifyTimer.stop(); - - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__, verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size()); - - for (const auto& [nodeId, v] : sigSharesByNodes) { - if (batchVerifier.badSources.count(nodeId) != 0) { - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n", - __func__, nodeId); - // this will also cause re-requesting of the shares that were sent by this node - BanNode(nodeId); - continue; - } - - ProcessPendingSigShares(v, quorums); - } - - return more_work; -} - // It's ensured that no duplicates are passed to this method -void CSigSharesManager::ProcessPendingSigShares( +std::vector> CSigSharesManager::ProcessPendingSigShares( const std::vector& sigSharesToProcess, const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums) { cxxtimer::Timer t(true); + std::vector> recovered_sigs; for (const auto& sigShare : sigSharesToProcess) { auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()); - ProcessSigShare(sigShare, quorums.at(quorumKey)); + auto rs = ProcessSigShare(sigShare, quorums.at(quorumKey)); + if (rs != nullptr) { + recovered_sigs.emplace_back(std::move(rs)); + } } t.stop(); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- processed sigShare batch. shares=%d, time=%ds\n", __func__, sigSharesToProcess.size(), t.count()); + return recovered_sigs; } // sig shares are already verified when entering this method -void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) +std::shared_ptr CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) { auto llmqType = quorum->params.type; - bool canTryRecovery = false; const bool isAllMembersConnectedEnabled = IsAllMembersConnectedEnabled(llmqType, m_sporkman); @@ -775,14 +547,15 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } if (sigman.HasRecoveredSigForId(llmqType, sigShare.getId())) { - return; + return nullptr; } + bool canTryRecovery = false; { LOCK(cs); if (!sigShares.Add(sigShare.GetKey(), sigShare)) { - return; + return nullptr; } if (!isAllMembersConnectedEnabled) { sigSharesQueuedToAnnounce.Add(sigShare.GetKey(), true); @@ -806,24 +579,9 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum canTryRecovery = true; } } + if (!canTryRecovery) return nullptr; - if (canTryRecovery) { - auto rs = TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); - if (rs != nullptr) { - if (sigman.ProcessRecoveredSig(rs)) { - // TODO: remove duplicated code with NetSigning - auto listeners = sigman.GetListeners(); - for (auto& l : listeners) { - m_peerman.PostProcessMessage(l->HandleNewRecoveredSig(*rs)); - } - - bool proactive_relay = rs->getLlmqType() != Consensus::LLMQType::LLMQ_100_67 && - rs->getLlmqType() != Consensus::LLMQType::LLMQ_400_60 && - rs->getLlmqType() != Consensus::LLMQType::LLMQ_400_85; - GetMainSignals().NotifyRecoveredSig(rs, rs->GetHash().ToString(), proactive_relay); - } - } - } + return TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); } std::shared_ptr CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, @@ -942,7 +700,7 @@ bool CSigSharesManager::AsyncSignIfMember(Consensus::LLMQType llmqType, CSigning // the quorum list and no recovered signature has been created in the mean time const auto& llmq_params_opt = Params().GetLLMQ(llmqType); assert(llmq_params_opt.has_value()); - return SelectQuorumForSigning(llmq_params_opt.value(), m_chainstate.m_chain, qman, id); + return SelectQuorumForSigning(llmq_params_opt.value(), m_chainman.ActiveChain(), qman, id); } else { return qman.GetQuorum(llmqType, quorumHash); } @@ -1004,11 +762,6 @@ bool CSigSharesManager::AsyncSignIfMember(Consensus::LLMQType llmqType, CSigning return true; } -void CSigSharesManager::NotifyRecoveredSig(const std::shared_ptr& sig, bool proactive_relay) const -{ - m_peerman.RelayRecoveredSig(*Assert(sig), proactive_relay); -} - void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) { AssertLockHeld(cs); @@ -1343,7 +1096,7 @@ bool CSigSharesManager::SendMessages() LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n", signHash.ToString(), inv.ToString(), pnode->GetId()); msgs.emplace_back(inv); - if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { + if (msgs.size() == MAX_MSGS_CNT_QSIGSHARES) { m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); msgs.clear(); didSend = true; @@ -1384,7 +1137,7 @@ bool CSigSharesManager::SendMessages() LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", signHash.ToString(), inv.ToString(), pnode->GetId()); msgs.emplace_back(inv); - if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { + if (msgs.size() == MAX_MSGS_CNT_QSIGSHARES) { m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); msgs.clear(); didSend = true; @@ -1583,13 +1336,11 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) timeSeenForSessions.erase(signHash); } -void CSigSharesManager::RemoveBannedNodeStates() +void CSigSharesManager::RemoveNodesIf(std::function predicate) { - // Called regularly to cleanup local node states for banned nodes - LOCK(cs); for (auto it = nodeStates.begin(); it != nodeStates.end();) { - if (m_peerman.IsBanned(it->first)) { + if (predicate(it->first)) { // re-request sigshares from other nodes // TODO: remove NO_THREAD_SAFETY_ANALYSIS // using here template ForEach makes impossible to use lock annotation @@ -1604,14 +1355,12 @@ void CSigSharesManager::RemoveBannedNodeStates() } } -void CSigSharesManager::BanNode(NodeId nodeId) +void CSigSharesManager::MarkAsBanned(NodeId nodeId) { if (nodeId == -1) { return; } - m_peerman.Misbehaving(nodeId, 100); - LOCK(cs); auto it = nodeStates.find(nodeId); if (it == nodeStates.end()) { @@ -1630,88 +1379,32 @@ void CSigSharesManager::BanNode(NodeId nodeId) nodeState.banned = true; } -void CSigSharesManager::HousekeepingThreadMain() -{ - while (!workInterrupt) { - RemoveBannedNodeStates(); - SendMessages(); - Cleanup(); - - workInterrupt.sleep_for(std::chrono::milliseconds(100)); - } -} - -void CSigSharesManager::WorkDispatcherThreadMain() -{ - while (!workInterrupt) { - // Dispatch all pending signs (individual tasks) - DispatchPendingSigns(); - - // If there's processing work, spawn a helper worker - DispatchPendingProcessing(); - - // Always sleep briefly between checks - workInterrupt.sleep_for(std::chrono::milliseconds(10)); - } -} - -void CSigSharesManager::DispatchPendingSigns() +std::vector CSigSharesManager::DispatchPendingSigns() { // Swap out entire vector to avoid lock thrashing std::vector signs; - { - LOCK(cs_pendingSigns); - signs.swap(pendingSigns); - } - // Dispatch all signs to worker pool - for (auto& work : signs) { - if (workInterrupt) break; + LOCK(cs_pendingSigns); + signs.swap(pendingSigns); - workerPool.push([this, work = std::move(work)](int) mutable { - SignAndProcessSingleShare(std::move(work)); - }); - } + return signs; } -void CSigSharesManager::DispatchPendingProcessing() +bool CSigSharesManager::IsAnyPendingProcessing() const { + LOCK(cs); // Check if there's work, spawn a helper if so - bool hasWork = false; - { - LOCK(cs); - hasWork = std::any_of(nodeStates.begin(), nodeStates.end(), - [](const auto& entry) { - return !entry.second.pendingIncomingSigShares.Empty(); - }); - } - - if (hasWork) { - // Work exists - spawn a worker to help! - workerPool.push([this](int) { - ProcessPendingSigSharesLoop(); - }); - } -} - -void CSigSharesManager::ProcessPendingSigSharesLoop() -{ - while (!workInterrupt) { - bool moreWork = ProcessPendingSigShares(); - - if (!moreWork) { - return; // No work found, exit immediately - } - } + return std::any_of(nodeStates.begin(), nodeStates.end(), + [](const auto& entry) { return !entry.second.pendingIncomingSigShares.Empty(); }); } -void CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work) +std::shared_ptr CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work) { auto opt_sigShare = CreateSigShare(*work.quorum, work.id, work.msgHash); if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { auto& sigShare = *opt_sigShare; - ProcessSigShare(sigShare, work.quorum); + auto rs = ProcessSigShare(sigShare, work.quorum); if (IsAllMembersConnectedEnabled(work.quorum->params.type, m_sporkman)) { LOCK(cs); @@ -1721,7 +1414,9 @@ void CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work) session.nextAttemptTime = 0; session.attempt = 0; } + return rs; } + return nullptr; } void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index f540807a099f..b232cccb3f20 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -6,36 +6,33 @@ #define BITCOIN_LLMQ_SIGNING_SHARES_H #include -#include #include #include #include -#include #include #include #include #include #include -#include #include #include +#include #include #include #include #include -#include #include #include #include class CActiveMasternodeManager; +class ChainstateManager; class CNode; class CConnman; -class CDeterministicMN; class CSporkManager; -class PeerManager; +struct MessageProcessingResult; namespace llmq { @@ -362,29 +359,38 @@ class CSignedSession int attempt{0}; }; +struct PendingSignatureData { + const CQuorumCPtr quorum; + const uint256 id; + const uint256 msgHash; + + PendingSignatureData(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) : + quorum(std::move(quorum)), + id(id), + msgHash(msgHash) + { + } +}; + class CSigSharesManager : public llmq::CRecoveredSigsListener { private: static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60}; static constexpr int64_t SIG_SHARE_REQUEST_TIMEOUT{5}; +public: // we try to keep total message size below 10k static constexpr size_t MAX_MSGS_CNT_QSIGSESANN{100}; - static constexpr size_t MAX_MSGS_CNT_QGETSIGSHARES{200}; - static constexpr size_t MAX_MSGS_CNT_QSIGSHARESINV{200}; + static constexpr size_t MAX_MSGS_CNT_QSIGSHARES{200}; // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support static constexpr size_t MAX_MSGS_TOTAL_BATCHED_SIGS{400}; + static constexpr size_t MAX_MSGS_SIG_SHARES{32}; +private: static constexpr int64_t EXP_SEND_FOR_RECOVERY_TIMEOUT{2000}; static constexpr int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT{10000}; - static constexpr size_t MAX_MSGS_SIG_SHARES{32}; - Mutex cs; - - mutable ctpl::thread_pool workerPool; - std::thread housekeepingThread; - std::thread dispatcherThread; - CThreadInterrupt workInterrupt; + mutable Mutex cs; SigShareMap sigShares GUARDED_BY(cs); Uint256HashMap signedSessions GUARDED_BY(cs); @@ -396,23 +402,14 @@ class CSigSharesManager : public llmq::CRecoveredSigsListener SigShareMap> sigSharesRequested GUARDED_BY(cs); SigShareMap sigSharesQueuedToAnnounce GUARDED_BY(cs); - struct PendingSignatureData { - const CQuorumCPtr quorum; - const uint256 id; - const uint256 msgHash; - - PendingSignatureData(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) : quorum(std::move(quorum)), id(id), msgHash(msgHash){} - }; - Mutex cs_pendingSigns; std::vector pendingSigns GUARDED_BY(cs_pendingSigns); FastRandomContext rnd GUARDED_BY(cs); CConnman& m_connman; - CChainState& m_chainstate; + const ChainstateManager& m_chainman; CSigningManager& sigman; - PeerManager& m_peerman; const CActiveMasternodeManager& m_mn_activeman; const CQuorumManager& qman; const CSporkManager& m_sporkman; @@ -424,18 +421,13 @@ class CSigSharesManager : public llmq::CRecoveredSigsListener CSigSharesManager() = delete; CSigSharesManager(const CSigSharesManager&) = delete; CSigSharesManager& operator=(const CSigSharesManager&) = delete; - explicit CSigSharesManager(CConnman& connman, CChainState& chainstate, CSigningManager& _sigman, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, - const CQuorumManager& _qman, const CSporkManager& sporkman); + explicit CSigSharesManager(CConnman& connman, const ChainstateManager& chainman, CSigningManager& _sigman, + const CActiveMasternodeManager& mn_activeman, const CQuorumManager& _qman, + const CSporkManager& sporkman); ~CSigSharesManager() override; - void Start() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void Stop() EXCLUSIVE_LOCKS_REQUIRED(!cs); void RegisterRecoveryInterface() EXCLUSIVE_LOCKS_REQUIRED(!cs); void UnregisterRecoveryInterface() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void InterruptWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs); - - void ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs); void AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); @@ -452,50 +444,48 @@ class CSigSharesManager : public llmq::CRecoveredSigsListener bool AsyncSignIfMember(Consensus::LLMQType llmqType, CSigningManager& sigman, const uint256& id, const uint256& msgHash, const uint256& quorumHash = uint256(), bool allowReSign = false, bool allowDiffMsgHashSigning = false) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); - - void NotifyRecoveredSig(const std::shared_ptr& sig, bool proactive_relay) const EXCLUSIVE_LOCKS_REQUIRED(!cs); - private: std::optional CreateSigShareForSingleMember(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const; - // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages) +public: + // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages) and ban node bool ProcessMessageSigSesAnn(const CNode& pfrom, const CSigSesAnn& ann) EXCLUSIVE_LOCKS_REQUIRED(!cs); - bool ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv) EXCLUSIVE_LOCKS_REQUIRED(!cs); - bool ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv) EXCLUSIVE_LOCKS_REQUIRED(!cs); + bool ProcessMessageSigShares(const CNode& pfrom, const CSigSharesInv& inv, const std::string& msg_type) + EXCLUSIVE_LOCKS_REQUIRED(!cs); bool ProcessMessageBatchedSigShares(const CNode& pfrom, const CBatchedSigShares& batchedSigShares) EXCLUSIVE_LOCKS_REQUIRED(!cs); - void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) EXCLUSIVE_LOCKS_REQUIRED(!cs); - static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv); - static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, - const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); + // if ProcessMessageSigShare returns false the node should be banned + bool ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) EXCLUSIVE_LOCKS_REQUIRED(!cs); + // CollectPendingSigSharesToVerify returns true if there's more work to do bool CollectPendingSigSharesToVerify( size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) EXCLUSIVE_LOCKS_REQUIRED(!cs); - bool ProcessPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void ProcessPendingSigShares( + std::vector> ProcessPendingSigShares( const std::vector& sigSharesToProcess, const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums) EXCLUSIVE_LOCKS_REQUIRED(!cs); - void ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) EXCLUSIVE_LOCKS_REQUIRED(!cs); - std::shared_ptr TryRecoverSig(const CQuorum& quorum, const uint256& id, const uint256& msgHash) +private: + [[nodiscard]] std::shared_ptr ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) EXCLUSIVE_LOCKS_REQUIRED(!cs); + [[nodiscard]] std::shared_ptr TryRecoverSig(const CQuorum& quorum, const uint256& id, + const uint256& msgHash) EXCLUSIVE_LOCKS_REQUIRED(!cs); bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo) EXCLUSIVE_LOCKS_REQUIRED(!cs); static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair& in); - void Cleanup() EXCLUSIVE_LOCKS_REQUIRED(!cs); void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs); - void RemoveBannedNodeStates() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void BanNode(NodeId nodeId) EXCLUSIVE_LOCKS_REQUIRED(!cs); +public: + void RemoveNodesIf(std::function predicate) EXCLUSIVE_LOCKS_REQUIRED(!cs); + void MarkAsBanned(NodeId nodeId) EXCLUSIVE_LOCKS_REQUIRED(!cs); - bool SendMessages() EXCLUSIVE_LOCKS_REQUIRED(!cs); +private: void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) @@ -504,17 +494,16 @@ class CSigSharesManager : public llmq::CRecoveredSigsListener void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - // Thread main functions - void HousekeepingThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void WorkDispatcherThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); +public: + void Cleanup() EXCLUSIVE_LOCKS_REQUIRED(!cs); + bool SendMessages() EXCLUSIVE_LOCKS_REQUIRED(!cs); // Dispatcher functions - void DispatchPendingSigns() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); - void DispatchPendingProcessing() EXCLUSIVE_LOCKS_REQUIRED(!cs); - + std::vector DispatchPendingSigns() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns); // Worker pool task functions - void ProcessPendingSigSharesLoop() EXCLUSIVE_LOCKS_REQUIRED(!cs); - void SignAndProcessSingleShare(PendingSignatureData work) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); + bool IsAnyPendingProcessing() const EXCLUSIVE_LOCKS_REQUIRED(!cs); + [[nodiscard]] std::shared_ptr SignAndProcessSingleShare(PendingSignatureData work) + EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs); }; } // namespace llmq diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e5b83dc0f88f..b77863dff536 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -57,8 +57,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -651,12 +651,14 @@ class PeerManagerImpl final : public PeerManager /** Implement PeerManagerInternal */ void PeerMisbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + bool PeerIsBanned(const NodeId node_id) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); void PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); void PeerRelayInv(const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayDSQ(const CCoinJoinQueue& queue) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void PeerRelayRecoveredSig(const llmq::CRecoveredSig& sig, bool proactive_relay) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerAskPeersForTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); size_t PeerGetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, ::cs_main); void PeerPostProcessMessage(MessageProcessingResult&& ret) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -2555,7 +2557,8 @@ void PeerManagerImpl::_RelayTransaction(const uint256& txid) }; } -void PeerManagerImpl::RelayRecoveredSig(const llmq::CRecoveredSig& sig, bool proactive_relay) { +void PeerManagerImpl::RelayRecoveredSig(const llmq::CRecoveredSig& sig, bool proactive_relay) +{ if (proactive_relay) { // We were the peer that recovered this; avoid a bunch of `inv` -> `GetData` spam by proactively sending m_connman.ForEachNode([this, &sig](CNode* pnode) -> bool { @@ -5455,7 +5458,6 @@ void PeerManagerImpl::ProcessMessage( } if (m_active_ctx) { assert(is_masternode); - m_active_ctx->shareman->ProcessMessage(pfrom, msg_type, vRecv); PostProcessMessage(m_active_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); } if (m_observer_ctx) { @@ -6578,6 +6580,11 @@ void PeerManagerImpl::PeerMisbehaving(const NodeId pnode, const int howmuch, con Misbehaving(pnode, howmuch, message); } +bool PeerManagerImpl::PeerIsBanned(const NodeId node_id) +{ + return IsBanned(node_id); +} + void PeerManagerImpl::PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) { EraseObjectRequest(nodeid, inv); @@ -6622,3 +6629,8 @@ void PeerManagerImpl::PeerPostProcessMessage(MessageProcessingResult&& ret) { PostProcessMessage(std::move(ret), /*node=*/-1); } + +void PeerManagerImpl::PeerRelayRecoveredSig(const llmq::CRecoveredSig& sig, bool proactive_relay) +{ + RelayRecoveredSig(sig, proactive_relay); +} diff --git a/src/net_processing.h b/src/net_processing.h index 54795ab803c0..9c85846d0d1c 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -62,12 +62,14 @@ class PeerManagerInternal { public: virtual void PeerMisbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") = 0; + virtual bool PeerIsBanned(const NodeId node_id) = 0; virtual void PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) = 0; virtual void PeerRelayInv(const CInv& inv) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) = 0; virtual void PeerRelayTransaction(const uint256& txid) = 0; virtual void PeerRelayDSQ(const CCoinJoinQueue& queue) = 0; + virtual void PeerRelayRecoveredSig(const llmq::CRecoveredSig& sig, bool proactive_relay) = 0; virtual void PeerAskPeersForTransaction(const uint256& txid) = 0; virtual size_t PeerGetRequestedObjectCount(NodeId nodeid) const = 0; virtual void PeerPostProcessMessage(MessageProcessingResult&& ret) = 0; diff --git a/test/lint/lint-circular-dependencies.py b/test/lint/lint-circular-dependencies.py index f8be6bdee67f..14f06740fd1c 100755 --- a/test/lint/lint-circular-dependencies.py +++ b/test/lint/lint-circular-dependencies.py @@ -21,18 +21,15 @@ "wallet/wallet -> wallet/walletdb -> wallet/wallet", "kernel/coinstats -> validation -> kernel/coinstats", # Dash - "active/context -> llmq/signing_shares -> net_processing -> active/context", + "active/context -> active/dkgsessionhandler -> llmq/dkgsessionhandler -> net_processing -> active/context", "banman -> common/bloom -> evo/assetlocktx -> llmq/quorumsman -> net -> banman", "chainlock/chainlock -> instantsend/instantsend -> chainlock/chainlock", - "chainlock/chainlock -> chainlock/signing -> llmq/signing_shares -> net_processing -> chainlock/chainlock", - "chainlock/chainlock -> chainlock/signing -> llmq/signing_shares -> net_processing -> llmq/context -> chainlock/chainlock", "chainlock/chainlock -> instantsend/instantsend -> instantsend/signing -> chainlock/chainlock", "chainlock/chainlock -> llmq/quorumsman -> msg_result -> coinjoin/coinjoin -> chainlock/chainlock", "chainlock/chainlock -> validation -> chainlock/chainlock", "chainlock/chainlock -> validation -> evo/chainhelper -> chainlock/chainlock", "coinjoin/coinjoin -> instantsend/instantsend -> spork -> msg_result -> coinjoin/coinjoin", - "coinjoin/coinjoin -> instantsend/instantsend -> instantsend/signing -> llmq/signing_shares -> net_processing -> coinjoin/coinjoin", - "coinjoin/client -> coinjoin/coinjoin -> instantsend/instantsend -> instantsend/signing -> llmq/signing_shares -> net_processing -> coinjoin/walletman -> coinjoin/client", + "coinjoin/client -> coinjoin/util -> wallet/wallet -> psbt -> node/transaction -> net_processing -> coinjoin/walletman -> coinjoin/client", "common/bloom -> evo/assetlocktx -> llmq/commitment -> evo/deterministicmns -> evo/simplifiedmns -> merkleblock -> common/bloom", "common/bloom -> evo/assetlocktx -> llmq/quorumsman -> net -> common/bloom", "consensus/tx_verify -> evo/assetlocktx -> llmq/commitment -> validation -> consensus/tx_verify", @@ -48,13 +45,10 @@ "evo/specialtxman -> validation -> evo/specialtxman", "governance/classes -> governance/object -> governance/governance -> governance/classes", "governance/governance -> governance/signing -> governance/object -> governance/governance", - "instantsend/instantsend -> instantsend/signing -> llmq/signing_shares -> net_processing -> instantsend/instantsend", - "instantsend/instantsend -> instantsend/signing -> llmq/signing_shares -> net_processing -> llmq/context -> instantsend/instantsend", "instantsend/instantsend -> txmempool -> instantsend/instantsend", "llmq/blockprocessor -> llmq/utils -> llmq/snapshot -> llmq/blockprocessor", "llmq/commitment -> llmq/utils -> llmq/snapshot -> llmq/commitment", "llmq/dkgsessionhandler -> net_processing -> llmq/dkgsessionmgr -> llmq/dkgsessionhandler", - "llmq/signing_shares -> net_processing -> llmq/signing_shares", "masternode/payments -> validation -> masternode/payments", "net -> netmessagemaker -> net", "netaddress -> netbase -> netaddress",