Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ add_subdirectory(src/killswitch)
add_executable(hpcore
src/util/version.cpp
src/util/util.cpp
src/util/rollover_hashset.cpp
src/util/ttl_set.cpp
src/util/buffer_store.cpp
src/util/merkle_hash_tree.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/consensus.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "pchheader.hpp"
#include "conf.hpp"
#include "util/rollover_hashset.hpp"
#include "util/bloom_filter.hpp"
#include "usr/usr.hpp"
#include "usr/user_input.hpp"
#include "p2p/p2p.hpp"
Expand Down
9 changes: 3 additions & 6 deletions src/p2p/peer_comm_session.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "../pchheader.hpp"
#include "../util/rollover_hashset.hpp"
#include "../util/bloom_filter.hpp"
#include "../msg/fbuf/p2pmsg_generated.h"
#include "../msg/fbuf/p2pmsg_conversion.hpp"
#include "../msg/fbuf/common_helpers.hpp"
Expand All @@ -13,10 +13,7 @@ namespace p2pmsg = msg::fbuf::p2pmsg;

namespace p2p
{
// The set of recent peer message hashes used for duplicate detection.
util::rollover_hashset recent_peermsg_hashes(200);

/**
/**
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where's the actual use of the bloom filter to deduplicate? You probably need to run two at the same time so that you can periodically clear one or the other, to prevent them from filling up and becoming useless.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bloom filter implementation maintains the exact same interface (try_emplace) so no other code changes are needed. The global recent_peermsg_hashes is now defined in bloom_filter.hpp and works exactly the same way.

Will look at running two bloom filters at the same time and will amend.

* This gets hit every time a peer connects to HP via the peer port (configured in config).
* @param session connected session.
* @return returns 0 if connection is successful and peer challenge is sent otherwise, -1.
Expand Down Expand Up @@ -334,4 +331,4 @@ namespace p2p
p2p::send_peer_requirement_announcement(true, this);
}

} // namespace p2p
} // namespace p2p
7 changes: 2 additions & 5 deletions src/p2p/self_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "../msg/fbuf/p2pmsg_generated.h"
#include "../msg/fbuf/p2pmsg_conversion.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "../util/rollover_hashset.hpp"
#include "../util/bloom_filter.hpp"
#include "../crypto.hpp"

namespace p2pmsg = msg::fbuf::p2pmsg;
Expand All @@ -16,9 +16,6 @@ namespace p2p::self

std::optional<conf::peer_ip_port> ip_port;

// The set of recent self message hashes used for duplicate detection.
util::rollover_hashset recent_selfmsg_hashes(200);

/**
* Processes the next queued message (if any).
* @return 0 if no messages in queue. 1 if message was processed successfully. -1 on error.
Expand Down Expand Up @@ -64,4 +61,4 @@ namespace p2p::self
msg_queue.enqueue(std::string(message));
}

} // namespace p2p::self
} // namespace p2p::self
3 changes: 1 addition & 2 deletions src/usr/usr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "../pchheader.hpp"
#include "../util/util.hpp"
#include "../util/h32.hpp"
#include "../util/rollover_hashset.hpp"
#include "../util/buffer_store.hpp"
#include "../msg/usrmsg_parser.hpp"
#include "user_comm_session.hpp"
Expand Down Expand Up @@ -112,4 +111,4 @@ namespace usr

} // namespace usr

#endif
#endif
173 changes: 173 additions & 0 deletions src/util/bloom_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#pragma once

#include <array>
#include <atomic>
#include <bit>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>

namespace util {

/**
 * Lock-free Bloom filter implementation.
 * Each filter uses 16MB (134,217,728 bits) with 4 hash functions.
 * Handles ~13 million items at 1% false positive rate.
 *
 * Every access to the bit array goes through std::atomic operations with
 * relaxed ordering; no mutex is taken.
 *
 * NOTE: the bit array is stored inline in the object (16MB), so instances
 * must not be created as automatic (stack) variables. Keep them in static
 * storage or on the heap.
 */
class bloom_filter_impl {
    static constexpr size_t BITS = 134217728; // 16MB = 128M bits
    static constexpr size_t K = 4;            // Number of hash functions
    static constexpr size_t BITS_PER_WORD = 64;
    static constexpr size_t NUM_WORDS = BITS / BITS_PER_WORD;

    // The odd-stride probing in get_positions() relies on BITS being a power of two.
    static_assert((BITS & (BITS - 1)) == 0, "BITS must be a power of two");
    static_assert(BITS % BITS_PER_WORD == 0, "BITS must be a whole number of words");

    std::array<std::atomic<uint64_t>, NUM_WORDS> bits;

    // MurmurHash3 mix function (64-bit finalizer).
    static uint64_t murmur_mix(uint64_t h) {
        h ^= h >> 33;
        h *= 0xff51afd7ed558ccd;
        h ^= h >> 33;
        h *= 0xc4ceb9fe1a85ec53;
        h ^= h >> 33;
        return h;
    }

    /**
     * Derives the K bit positions for an item using double hashing:
     * position(i) = (h1 + i * h2) mod BITS.
     * @param data item to hash.
     * @return K bit indexes in the range [0, BITS).
     */
    std::array<size_t, K> get_positions(const std::string& data) const {
        std::array<size_t, K> positions{};
        const uint64_t h1 = std::hash<std::string>{}(data);
        // Force the stride to be odd. BITS is a power of two, so an odd stride
        // is coprime with it and the K probes are guaranteed to be distinct.
        // An unconstrained stride whose low bits are zero would collapse
        // several (or all) probes onto the same bit.
        const uint64_t h2 = murmur_mix(h1) | 1;

        for (size_t i = 0; i < K; ++i) {
            positions[i] = (h1 + i * h2) % BITS;
        }

        return positions;
    }

public:
    // std::atomic members are not zeroed by default construction in C++17,
    // so the bit array is cleared explicitly.
    bloom_filter_impl() {
        clear();
    }

    /**
     * Resets every bit to zero.
     * Words are zeroed one at a time, so an insert racing with clear() may be
     * partially or fully erased.
     */
    void clear() {
        for (auto& word : bits) {
            word.store(0, std::memory_order_relaxed);
        }
    }

    /**
     * Sets all K bits for the item.
     * @param data item to insert.
     * @return true if at least one of the bits was previously unset (the item
     *         was new), false if all bits were already set (the item might
     *         already exist).
     */
    bool try_insert(const std::string& data) {
        const auto positions = get_positions(data);
        bool was_new = false;

        for (const size_t pos : positions) {
            const size_t word_idx = pos / BITS_PER_WORD;
            const size_t bit_idx = pos % BITS_PER_WORD;
            const uint64_t mask = uint64_t(1) << bit_idx;

            // fetch_or returns the word's previous value, which tells us
            // whether this call is the one that set the bit.
            const uint64_t prev = bits[word_idx].fetch_or(mask, std::memory_order_relaxed);

            if ((prev & mask) == 0) {
                was_new = true;
            }
        }

        return was_new;
    }

    /**
     * Read-only membership check.
     * @param data item to look up.
     * @return false if the item is definitely absent, true if all K bits are
     *         set (the item might exist; false positives are possible).
     */
    bool might_contain(const std::string& data) const {
        const auto positions = get_positions(data);

        for (const size_t pos : positions) {
            const size_t word_idx = pos / BITS_PER_WORD;
            const size_t bit_idx = pos % BITS_PER_WORD;
            const uint64_t mask = uint64_t(1) << bit_idx;

            if ((bits[word_idx].load(std::memory_order_relaxed) & mask) == 0) {
                return false;
            }
        }

        return true;
    }
};

/**
* Rolling bloom filter using double buffering.
* Total size: 32MB (2 x 16MB filters)
* Automatically rotates filters every 5 minutes.
*/
class bloom_filter {
static constexpr int64_t ROTATION_INTERVAL_MS = 300000; // 5 minutes in milliseconds

bloom_filter_impl filter1;
bloom_filter_impl filter2;
std::atomic<int> active_filter{1}; // 1 or 2
std::atomic<int64_t> last_rotation_time;

int64_t current_time_ms() const {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()
).count();
}

void check_rotation() {
int64_t now = current_time_ms();
int64_t last_rotation = last_rotation_time.load(std::memory_order_relaxed);

if (now - last_rotation >= ROTATION_INTERVAL_MS) {
// Try to update the rotation time atomically
if (last_rotation_time.compare_exchange_strong(last_rotation, now,
std::memory_order_relaxed)) {
// We won the race to rotate
int current = active_filter.load(std::memory_order_relaxed);
int next = (current == 1) ? 2 : 1;

// Clear the filter that will become active next rotation
if (next == 1) {
filter1.clear();
} else {
filter2.clear();
}

// Switch active filter
active_filter.store(next, std::memory_order_relaxed);
}
}
}

public:
bloom_filter() : last_rotation_time(current_time_ms()) {
// Both filters start clear
}

// Returns true if successfully inserted (was new), false if might already exist
bool try_emplace(const std::string& data) {
check_rotation();

// Check both filters first
bool in_filter1 = filter1.might_contain(data);
bool in_filter2 = filter2.might_contain(data);

if (in_filter1 || in_filter2) {
return false; // Already exists
}

// Insert into both filters
filter1.try_insert(data);
filter2.try_insert(data);

return true;
}
};

// Alias that keeps the old rollover_hashset type name usable; it now refers to bloom_filter.
// NOTE(review): bloom_filter only has a default constructor, so any code that
// constructs a rollover_hashset with a capacity argument will no longer compile - confirm
// no such callers remain.
using rollover_hashset = bloom_filter;

// Global instances for different message types.
// Declared inline so every translation unit including this header shares the same objects.
// Each instance embeds two 16MB filters.
inline bloom_filter recent_peermsg_hashes;
inline bloom_filter recent_selfmsg_hashes;

} // namespace util
39 changes: 0 additions & 39 deletions src/util/rollover_hashset.cpp

This file was deleted.

30 changes: 0 additions & 30 deletions src/util/rollover_hashset.hpp

This file was deleted.