Skip to content
Merged
41 changes: 22 additions & 19 deletions docs/performance/SQLITE_COMPARISON.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,37 @@ We addressed the gaps via the following optimizations:
Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.

### Solution: Bloom Filter Integration
Implemented bloom filters to filter tuples at the source before network transmission:
- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table
- **Distributed construction**: Each data node constructs its local bloom during the left/build scan phase
- **Coordinator coordination**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes before the right/probe shuffle
Implemented bloom filters to filter tuples at the source before network transmission using a 3-phase approach:
- **Phase 1 (Local Build)**: Each data node scans its local left/build table partition, extracts join key values, and builds a local bloom filter
- **Phase 2 (Bit Aggregation)**: Coordinator sends `BloomFilterBits` RPC to each data node; each responds with local bloom bits; coordinator OR-aggregates all bits into a single filter
- **Phase 3 (Sender-Side Filter)**: Coordinator broadcasts aggregated filter via `BloomFilterPush` RPC; before sending right/probe tuples, `ShuffleFragment` handler checks `might_contain()` and skips tuples that will definitely not match

### Architecture
```
[Phase 1: Shuffle Left] [Phase 2: Shuffle Right]
| |
v v
Build local bloom Apply bloom filter
from join keys before buffering
| |
+---- BloomFilterPush ----->---+
(filter metadata) |
v
Filtered tuples buffered
Phase 1: Scan Left Phase 2: Aggregate Bits Phase 3: Filter Right
| | |
v v v
Build local bloom <---> BloomFilterBits RPC <-------- Aggregate & Broadcast
on each data node (OR-aggregate bits) via BloomFilterPush
| | |
| v v
+-----------------> BloomFilterPush might_contain() check
(metadata only) | before PushData
|
v
Filtered tuples buffered
```

### Key Components
| Component | Location | Purpose |
|-----------|----------|---------|
| `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples |
| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending |
| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |
| `BloomFilterBitsArgs` RPC | `include/network/rpc_message.hpp` | Local bloom bits from data nodes |
| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Aggregated filter broadcast |
| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores local and aggregated bloom filters |
| `BloomFilterBits` handler | `src/main.cpp` | Returns local bloom bits to coordinator |
| `ShuffleFragment` handler | `src/main.cpp` | Builds local bloom during Phase 1 scan |
| Coordinator | `src/distributed/distributed_executor.cpp` | Collects bits, aggregates, broadcasts filter |

### Test Coverage
- 10 unit tests covering: BloomFilter class, BloomFilterArgs serialization, ClusterManager storage, filter application logic
Expand Down
5 changes: 3 additions & 2 deletions docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ Seamlessly integrated shuffle buffers into the Volcano execution model.
### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
Added probabilistic filtering to reduce network traffic in shuffle joins.
- **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation.
- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context.
- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
- **Distributed Construction**: Each data node builds a local bloom filter from its left/build table partition during Phase 1 scan.
- **Bit Aggregation**: Coordinator collects local bloom bits from all data nodes via `BloomFilterBits` RPC and OR-aggregates them into a single filter.
- **Sender-Side Filtering**: Aggregated filter is broadcast via `BloomFilterPush` before Phase 2; `ShuffleFragment` handler applies `might_contain()` before sending `PushData`, skipping tuples that will definitely not match.

## Lessons Learned
- Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
Expand Down
127 changes: 127 additions & 0 deletions include/common/cluster_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,129 @@ class ClusterManager {
return "";
}

/**
* @brief Store local bloom filter bits from this node (called on data nodes)
*/
void set_local_bloom_bits(const std::string& context_id, std::vector<uint8_t> bits,
size_t expected_elements, size_t num_hashes) {
const std::scoped_lock<std::mutex> lock(mutex_);
local_bloom_bits_[context_id] = std::move(bits);
local_expected_elements_map_[context_id] = expected_elements;
local_num_hashes_map_[context_id] = num_hashes;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
* @brief Get stored local bloom filter bits for a context
*/
[[nodiscard]] std::vector<uint8_t> get_local_bloom_bits(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = local_bloom_bits_.find(context_id);
if (it != local_bloom_bits_.end()) {
return it->second;
}
return {};
}

/**
* @brief Get expected_elements for local bloom filter
*/
[[nodiscard]] size_t get_local_expected_elements(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = local_expected_elements_map_.find(context_id);
if (it != local_expected_elements_map_.end()) {
return it->second;
}
return 0;
}

/**
* @brief Get num_hashes for local bloom filter
*/
[[nodiscard]] size_t get_local_num_hashes(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = local_num_hashes_map_.find(context_id);
if (it != local_num_hashes_map_.end()) {
return it->second;
}
return 0;
}

/**
* @brief Clear bloom filter for a context
*/
void clear_bloom_filter(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
bloom_filters_.erase(context_id);
local_bloom_bits_.erase(context_id);
local_expected_elements_map_.erase(context_id);
local_num_hashes_map_.erase(context_id);
}

/**
* @brief Store local right table rows for outer join processing
* Called during Phase 2 shuffle when sending right table rows to other nodes
*/
void set_local_right_rows(const std::string& context_id, const std::string& table_name,
std::vector<executor::Tuple> rows) {
const std::scoped_lock<std::mutex> lock(mutex_);
local_right_table_rows_[context_id][table_name] = std::move(rows);
}

/**
* @brief Get stored local right table rows
*/
[[nodiscard]] std::vector<executor::Tuple> get_local_right_rows(
const std::string& context_id, const std::string& table_name) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto ctx_it = local_right_table_rows_.find(context_id);
if (ctx_it != local_right_table_rows_.end()) {
auto table_it = ctx_it->second.find(table_name);
if (table_it != ctx_it->second.end()) {
return table_it->second;
}
}
return {};
}

/**
* @brief Clear local right table rows for a context
*/
void clear_local_right_rows(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
local_right_table_rows_.erase(context_id);
}

/**
* @brief Store unmatched rows for a context (used by outer join processing)
*/
void set_unmatched_rows(const std::string& context_id, const std::string& table_name,
std::vector<executor::Tuple> rows) {
const std::scoped_lock<std::mutex> lock(mutex_);
unmatched_rows_[context_id][table_name] = std::move(rows);
}

/**
* @brief Get stored unmatched rows for a context
*/
[[nodiscard]] std::vector<executor::Tuple> get_unmatched_rows(
const std::string& context_id, const std::string& table_name) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto ctx_it = unmatched_rows_.find(context_id);
if (ctx_it != unmatched_rows_.end()) {
auto table_it = ctx_it->second.find(table_name);
if (table_it != ctx_it->second.end()) {
return table_it->second;
}
}
return {};
}

/**
* @brief Clear unmatched rows for a context
*/
void clear_unmatched_rows(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
unmatched_rows_.erase(context_id);
}

private:
Expand All @@ -311,6 +428,16 @@ class ClusterManager {
shuffle_buffers_;
/* context_id -> bloom filter data */
std::unordered_map<std::string, BloomFilterEntry> bloom_filters_;
/* context_id -> local bloom filter bits (for aggregation during distributed build) */
std::unordered_map<std::string, std::vector<uint8_t>> local_bloom_bits_;
std::unordered_map<std::string, size_t> local_expected_elements_map_;
std::unordered_map<std::string, size_t> local_num_hashes_map_;
/* context_id -> table_name -> local right table rows for outer join tracking */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
local_right_table_rows_;
/* context_id -> table_name -> unmatched rows for outer join NULL-padding */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
unmatched_rows_;
mutable std::mutex mutex_;
};

Expand Down
12 changes: 12 additions & 0 deletions include/executor/operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,18 @@ class HashJoinOperator : public Operator {

void set_memory_resource(std::pmr::memory_resource* mr) override;
void set_params(const std::vector<common::Value>* params) override;

/**
* @brief Get unmatched right rows after join execution
* @return Vector of tuples - the right-side rows that had no match
*/
[[nodiscard]] std::vector<Tuple> get_unmatched_right_rows() const;

/**
* @brief Get join key values of unmatched right rows
* @return Vector of strings - the join key values for unmatched right rows
*/
[[nodiscard]] std::vector<std::string> get_unmatched_right_keys() const;
};

/**
Expand Down
Loading