diff --git a/CMakeLists.txt b/CMakeLists.txt index f905e265..6e4125e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,6 @@ add_subdirectory(src/killswitch) add_executable(hpcore src/util/version.cpp src/util/util.cpp - src/util/rollover_hashset.cpp src/util/ttl_set.cpp src/util/buffer_store.cpp src/util/merkle_hash_tree.cpp diff --git a/src/consensus.cpp b/src/consensus.cpp index 4efb7ee2..ff5da358 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -1,6 +1,6 @@ #include "pchheader.hpp" #include "conf.hpp" -#include "util/rollover_hashset.hpp" +#include "util/bloom_filter.hpp" #include "usr/usr.hpp" #include "usr/user_input.hpp" #include "p2p/p2p.hpp" diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp index 95d7b54c..35c737d4 100644 --- a/src/p2p/peer_comm_session.cpp +++ b/src/p2p/peer_comm_session.cpp @@ -1,5 +1,5 @@ #include "../pchheader.hpp" -#include "../util/rollover_hashset.hpp" +#include "../util/bloom_filter.hpp" #include "../msg/fbuf/p2pmsg_generated.h" #include "../msg/fbuf/p2pmsg_conversion.hpp" #include "../msg/fbuf/common_helpers.hpp" @@ -13,10 +13,7 @@ namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p { - // The set of recent peer message hashes used for duplicate detection. - util::rollover_hashset recent_peermsg_hashes(200); - - /** + /** * This gets hit every time a peer connects to HP via the peer port (configured in config). * @param session connected session. * @return returns 0 if connection is successful and peer challenge is sent otherwise, -1. 
@@ -334,4 +331,4 @@ namespace p2p p2p::send_peer_requirement_announcement(true, this); } -} // namespace p2p \ No newline at end of file +} // namespace p2p diff --git a/src/p2p/self_node.cpp b/src/p2p/self_node.cpp index fd9653e0..69ff12e9 100644 --- a/src/p2p/self_node.cpp +++ b/src/p2p/self_node.cpp @@ -4,7 +4,7 @@ #include "../msg/fbuf/p2pmsg_generated.h" #include "../msg/fbuf/p2pmsg_conversion.hpp" #include "../msg/fbuf/common_helpers.hpp" -#include "../util/rollover_hashset.hpp" +#include "../util/bloom_filter.hpp" #include "../crypto.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -16,9 +16,6 @@ namespace p2p::self std::optional ip_port; - // The set of recent self message hashes used for duplicate detection. - util::rollover_hashset recent_selfmsg_hashes(200); - /** * Processes the next queued message (if any). * @return 0 if no messages in queue. 1 if message was processed successfully. -1 on error. @@ -64,4 +61,4 @@ namespace p2p::self msg_queue.enqueue(std::string(message)); } -} // namespace p2p::self \ No newline at end of file +} // namespace p2p::self diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 6e9ceef3..ab96fda0 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -4,7 +4,6 @@ #include "../pchheader.hpp" #include "../util/util.hpp" #include "../util/h32.hpp" -#include "../util/rollover_hashset.hpp" #include "../util/buffer_store.hpp" #include "../msg/usrmsg_parser.hpp" #include "user_comm_session.hpp" @@ -112,4 +111,4 @@ namespace usr } // namespace usr -#endif \ No newline at end of file +#endif diff --git a/src/util/bloom_filter.hpp b/src/util/bloom_filter.hpp new file mode 100644 index 00000000..1693ba7b --- /dev/null +++ b/src/util/bloom_filter.hpp @@ -0,0 +1,173 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace util { + +/** + * Lock-free Bloom filter implementation. + * Each filter uses 16MB (134,217,728 bits) with 4 hash functions. + * Handles ~13 million items at 1% false positive rate. 
+ */
+class bloom_filter_impl {
+    static constexpr size_t BITS = 134217728; // 2^27 bits = 16MB
+    static constexpr size_t K = 4;            // Number of probe positions per item
+    static constexpr size_t BITS_PER_WORD = 64;
+    static constexpr size_t NUM_WORDS = BITS / BITS_PER_WORD;
+    static_assert((BITS & (BITS - 1)) == 0, "odd-stride probing assumes power-of-two BITS");
+    std::array<std::atomic<uint64_t>, NUM_WORDS> bits;
+
+    // MurmurHash3 64-bit finalizer; used to derive the probe stride from h1.
+    static uint64_t murmur_mix(uint64_t h) {
+        h ^= h >> 33;
+        h *= 0xff51afd7ed558ccd;
+        h ^= h >> 33;
+        h *= 0xc4ceb9fe1a85ec53;
+        h ^= h >> 33;
+        return h;
+    }
+
+    // Double hashing: probe position i is (h1 + i * h2) mod BITS.
+    std::array<size_t, K> get_positions(const std::string& data) const {
+        std::array<size_t, K> positions{};
+        const uint64_t h1 = std::hash<std::string>{}(data);
+        // Force an odd stride. BITS is a power of two, so an even h2 can make
+        // positions repeat (all K collapse to one bit when h2 % BITS == 0).
+        const uint64_t h2 = murmur_mix(h1) | 1;
+        for (size_t i = 0; i < K; ++i) {
+            positions[i] = (h1 + i * h2) % BITS;
+        }
+        return positions;
+    }
+
+public:
+    // Starts with every bit cleared.
+    bloom_filter_impl() {
+        clear();
+    }
+
+    // Resets every bit to 0, one word at a time, using relaxed stores.
+    void clear() {
+        for (auto& word : bits) {
+            word.store(0, std::memory_order_relaxed);
+        }
+    }
+
+    // Sets the K bits for data. Returns true if at least one of them was
+    // previously clear (data was new), false if all of them were already set.
+    bool try_insert(const std::string& data) {
+        const auto positions = get_positions(data);
+        bool was_new = false;
+        for (size_t pos : positions) {
+            const size_t word_idx = pos / BITS_PER_WORD;
+            const size_t bit_idx = pos % BITS_PER_WORD;
+            const uint64_t mask = uint64_t(1) << bit_idx;
+            // fetch_or returns the word as it was before this bit was set.
+            const uint64_t prev = bits[word_idx].fetch_or(mask, std::memory_order_relaxed);
+            if ((prev & mask) == 0) {
+                was_new = true;
+            }
+        }
+
+        return was_new;
+    }
+
+    // Read-only membership test. Returns false as soon as one of the K bits is
+    // clear (data is not present), true if all are set (data may be present).
+    bool might_contain(const std::string& data) const {
+        const auto positions = get_positions(data);
+        for (size_t pos : positions) {
+            const size_t word_idx = pos / BITS_PER_WORD;
+            const size_t bit_idx = pos % BITS_PER_WORD;
+            const uint64_t mask = uint64_t(1) << bit_idx;
+            if ((bits[word_idx].load(std::memory_order_relaxed) & mask) == 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+};
+
+/**
+ * Rolling bloom filter using double buffering.
+ * Total size: 32MB (2 x 16MB filters)
+ * Automatically rotates filters every 5 minutes.
+ */
+class bloom_filter {
+    static constexpr int64_t ROTATION_INTERVAL_MS = 300000; // 5 minutes in milliseconds
+
+    bloom_filter_impl filter1;
+    bloom_filter_impl filter2;
+    std::atomic<int> active_filter{1};       // 1 or 2; toggled on every rotation
+    std::atomic<int64_t> last_rotation_time; // steady_clock reading, in milliseconds
+
+    // Current steady_clock reading, converted to milliseconds.
+    int64_t current_time_ms() const {
+        return std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now().time_since_epoch()
+        ).count();
+    }
+
+    // Rotates once ROTATION_INTERVAL_MS has elapsed. Only called from try_emplace().
+    void check_rotation() {
+        const int64_t now = current_time_ms();
+        int64_t last_rotation = last_rotation_time.load(std::memory_order_relaxed);
+
+        if (now - last_rotation >= ROTATION_INTERVAL_MS) {
+            // The compare-exchange lets exactly one caller claim this rotation.
+            if (last_rotation_time.compare_exchange_strong(last_rotation, now,
+                                                           std::memory_order_relaxed)) {
+                const int current = active_filter.load(std::memory_order_relaxed);
+                const int next = (current == 1) ? 2 : 1;
+
+                // Clear the filter that is becoming active now. The other one keeps
+                // its contents until the following rotation.
+                if (next == 1) {
+                    filter1.clear();
+                } else {
+                    filter2.clear();
+                }
+                // Switch active filter
+                active_filter.store(next, std::memory_order_relaxed);
+            }
+        }
+    }
+
+public:
+    bloom_filter() : last_rotation_time(current_time_ms()) {} // both filters start clear
+
+    // Returns true if data was recorded as new, false if either filter reports
+    // it as possibly seen (Bloom false positives are reported as duplicates).
+    // NOTE(review): the lookup and the insert are separate steps, so two threads
+    // passing the same data concurrently can both get true.
+    bool try_emplace(const std::string& data) {
+        check_rotation();
+
+        const bool in_filter1 = filter1.might_contain(data);
+        const bool in_filter2 = filter2.might_contain(data);
+        if (in_filter1 || in_filter2) {
+            return false; // Already exists
+        }
+
+        // Insert into both, so the entry survives the next rotation's clear.
+        filter1.try_insert(data);
+        filter2.try_insert(data);
+
+        return true;
+    }
+};
+
+// Alias kept for the old rollover_hashset name (no maxsize constructor here).
+using rollover_hashset = bloom_filter;
+
+// Global instances for different message types
+inline bloom_filter recent_peermsg_hashes;
+inline bloom_filter recent_selfmsg_hashes;
+
+} // namespace util
diff --git a/src/util/rollover_hashset.cpp b/src/util/rollover_hashset.cpp
deleted file mode 100644 index d22043f4..00000000 --- a/src/util/rollover_hashset.cpp +++ /dev/null @@ -1,39 +0,0 @@ -#include "rollover_hashset.hpp" - -namespace util -{ - - rollover_hashset::rollover_hashset(const uint32_t maxsize) - { - this->maxsize = maxsize == 0 ? 1 : maxsize; - } - - /** - * Inserts the given hash to the list. - * @return True on succesful insertion. False if hash already exists. - */ - bool rollover_hashset::try_emplace(const std::string hash) - { - const auto itr = recent_hashes.find(hash); - if (itr == recent_hashes.end()) // Not found - { - // Add the new message hash to the set. - const auto [newitr, success] = recent_hashes.emplace(std::move(hash)); - - // Insert a pointer to the stored hash value to the back of the ordered list of hashes. - recent_hashes_list.push_back(&(*newitr)); - - // Remove oldest hash if exceeding max size. - if (recent_hashes_list.size() > maxsize) - { - const std::string &oldest_hash = *recent_hashes_list.front(); - recent_hashes.erase(oldest_hash); - recent_hashes_list.pop_front(); - } - - return true; // Hash was inserted successfuly. - } - - return false; // Hash already exists. - } -} \ No newline at end of file diff --git a/src/util/rollover_hashset.hpp b/src/util/rollover_hashset.hpp deleted file mode 100644 index a7648904..00000000 --- a/src/util/rollover_hashset.hpp +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef _HP_UTIL_ROLLOVER_HASHSET_ -#define _HP_UTIL_ROLLOVER_HASHSET_ - -#include "../pchheader.hpp" - -namespace util -{ - - /** - * FIFO hash set with a max size. - */ - class rollover_hashset - { - private: - // The set of recent hashes used for duplicate detection. - std::unordered_set recent_hashes; - - // The supporting list of recent hashes used for adding and removing hashes from - // the 'recent_hashes' in a first-in-first-out manner. 
- std::list recent_hashes_list; - - uint32_t maxsize; - - public: - rollover_hashset(const uint32_t maxsize); - bool try_emplace(const std::string hash); - }; -} // namespace util - -#endif \ No newline at end of file