From d100371cb0ea85c51b153a0d6b8b4e91683b598f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Mon, 13 Apr 2026 20:12:26 +0300 Subject: [PATCH 01/10] feat: implement bloom filter building from left table data - Build local bloom filter during Phase 1 left table scan - Collect and OR-aggregate bits via BloomFilterBits RPC - Broadcast aggregated filter via BloomFilterPush before Phase 2 - Apply sender-side filtering before PushData in Phase 2 --- include/common/cluster_manager.hpp | 43 +++++++++++++++++ include/network/rpc_message.hpp | 59 ++++++++++++++++++++++++ src/distributed/distributed_executor.cpp | 44 +++++++++++++----- src/main.cpp | 40 ++++++++++++++++ tests/distributed_tests.cpp | 22 +++++++++ 5 files changed, 196 insertions(+), 12 deletions(-) diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index 4b3ef244..8a1e9f9f 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -279,6 +279,45 @@ class ClusterManager { return ""; } + /** + * @brief Store local bloom filter bits from this node (called on data nodes) + */ + void set_local_bloom_bits(const std::string& context_id, std::vector bits, + size_t expected_elements, size_t num_hashes) { + const std::scoped_lock lock(mutex_); + local_bloom_bits_[context_id] = std::move(bits); + local_expected_elements_ = expected_elements; + local_num_hashes_ = num_hashes; + } + + /** + * @brief Get stored local bloom filter bits for a context + */ + [[nodiscard]] std::vector get_local_bloom_bits(const std::string& context_id) const { + const std::scoped_lock lock(mutex_); + auto it = local_bloom_bits_.find(context_id); + if (it != local_bloom_bits_.end()) { + return it->second; + } + return {}; + } + + /** + * @brief Get expected_elements for local bloom filter + */ + [[nodiscard]] size_t get_local_expected_elements() const { + const std::scoped_lock lock(mutex_); + return local_expected_elements_; + } + + /** + * @brief Get num_hashes for local bloom filter + */ + [[nodiscard]] size_t get_local_num_hashes() const { + const std::scoped_lock lock(mutex_); + return local_num_hashes_; + } + /** * @brief Clear bloom filter for a context */ @@ -311,6 +350,10 @@ class ClusterManager { shuffle_buffers_; /* context_id -> bloom filter data */ std::unordered_map bloom_filters_; + /* context_id -> local bloom filter bits (for aggregation during distributed build) */ + std::unordered_map> local_bloom_bits_; + size_t local_expected_elements_ = 0; + size_t local_num_hashes_ = 0; mutable std::mutex mutex_; }; diff --git a/include/network/rpc_message.hpp b/include/network/rpc_message.hpp index 4dce850d..3d857358 100644 --- a/include/network/rpc_message.hpp +++ b/include/network/rpc_message.hpp @@ -34,6 +34,7 @@ enum class RpcType : uint8_t { PushData = 9, ShuffleFragment = 10, BloomFilterPush = 11, + BloomFilterBits = 12, Error = 255 }; @@ -507,6 +508,64 @@ struct BloomFilterArgs { } }; +/** + * @brief Arguments for sending local bloom filter bits from data node to coordinator + * Used during Phase 1 to collect and aggregate bloom filters from all nodes + */ +struct BloomFilterBitsArgs { + std::string context_id; + std::vector filter_data; + size_t expected_elements = 0; + size_t num_hashes = 0; + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + + // Serialize filter data (blob) + const uint32_t filter_len = static_cast(filter_data.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &filter_len, Serializer::VAL_SIZE_32); + out.insert(out.end(), filter_data.begin(), filter_data.end()); + + // Serialize metadata + uint64_t tmp_expected = static_cast(expected_elements); + uint8_t tmp_hashes = static_cast(num_hashes); + const size_t off2 = out.size(); + out.resize(off2 + 9); // 8 bytes for expected_elements + 1 for num_hashes + std::memcpy(out.data() + off2, &tmp_expected, 8); + out[off2 + 8] = tmp_hashes; + return out; + } + + static BloomFilterBitsArgs deserialize(const std::vector& in) { + BloomFilterBitsArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + + uint32_t filter_len = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&filter_len, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + if (offset + filter_len <= in.size()) { + args.filter_data.resize(filter_len); + std::memcpy(args.filter_data.data(), in.data() + offset, filter_len); + offset += filter_len; + } + + if (offset + 9 <= in.size()) { + uint64_t tmp_expected = 0; + std::memcpy(&tmp_expected, in.data() + offset, 8); + args.expected_elements = static_cast(tmp_expected); + offset += 8; + args.num_hashes = static_cast(in[offset]); + } + return args; + } +}; + /** * @brief Arguments for TxnPrepare/Commit/Abort RPC */ diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index c39deb9d..e5e1c3d6 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -242,23 +242,43 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, return res; } - // After Phase 1, each node will have received left table data. - // Now broadcast bloom filter built from that data to all nodes for Phase 2 - // filtering. The filter is sent as a separate RPC that data nodes will store and - // apply to their right table shuffle. For now, we send a simple metadata-only - // filter that signals "filtering enabled" - the actual filter building happens on - // each data node during Phase 1 and they stash it for use during Phase 2. - // - // In production, we'd collect and OR all local bloom filters, but for POC - // we just signal that bloom filtering is enabled for this context. + // After Phase 1, collect bloom filter bits from each data node and aggregate + // via bitwise OR to create the combined bloom filter + std::vector aggregated_bits; + size_t total_expected = 0; + size_t max_hashes = 0; + + for (const auto& node : data_nodes) { + network::RpcClient client(node.address, node.cluster_port); + if (!client.connect()) { + continue; + } + network::BloomFilterBitsArgs bits_args; + bits_args.context_id = context_id; + std::vector resp; + if (client.call(network::RpcType::BloomFilterBits, bits_args.serialize(), resp)) { + auto reply = network::BloomFilterBitsArgs::deserialize(resp); + if (reply.filter_data.size() > aggregated_bits.size()) { + aggregated_bits.resize(reply.filter_data.size(), 0); + } + // Bitwise OR aggregation + for (size_t i = 0; i < reply.filter_data.size(); i++) { + aggregated_bits[i] |= reply.filter_data[i]; + } + total_expected += reply.expected_elements; + max_hashes = std::max(max_hashes, reply.num_hashes); + } + } + + // Broadcast the aggregated bloom filter to all nodes for Phase 2 filtering network::BloomFilterArgs bf_args; bf_args.context_id = context_id; bf_args.build_table = left_table; bf_args.probe_table = right_table; bf_args.probe_key_col = right_key; // Tell probe side which column to filter on - bf_args.filter_data.clear(); // Empty = filter built distributed - bf_args.expected_elements = data_nodes.size() * 1000; // Estimate - bf_args.num_hashes = 4; + bf_args.filter_data = aggregated_bits; + bf_args.expected_elements = total_expected; + bf_args.num_hashes = max_hashes > 0 ? max_hashes : 4; auto bf_payload = bf_args.serialize(); for (const auto& node : data_nodes) { diff --git a/src/main.cpp b/src/main.cpp index f68b37bf..7fb7aa9e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -516,6 +516,31 @@ int main(int argc, char* argv[]) { static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }); + // Handler for collecting local bloom filter bits from data nodes + // Coordinator calls this after Phase 1 to aggregate bloom filters + rpc_server->set_handler( + cloudsql::network::RpcType::BloomFilterBits, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::BloomFilterBitsArgs::deserialize(p); + cloudsql::network::BloomFilterBitsArgs reply_args; + reply_args.context_id = args.context_id; + reply_args.filter_data = cluster_manager->get_local_bloom_bits(args.context_id); + reply_args.expected_elements = cluster_manager->get_local_expected_elements(); + reply_args.num_hashes = cluster_manager->get_local_num_hashes(); + + auto resp_p = reply_args.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + rpc_server->set_handler( cloudsql::network::RpcType::ShuffleFragment, [&](const cloudsql::network::RpcHeader& h, const std::vector& p, @@ -556,11 +581,18 @@ int main(int argc, char* argv[]) { partitions[node.id] = {}; } + // Estimate expected elements for bloom filter + // For now, estimate based on table size (will be refined with actual count) + size_t estimated_count = 1000; + cloudsql::common::BloomFilter local_bloom(estimated_count); + auto iter = table.scan(); cloudsql::storage::HeapTable::TupleMeta t_meta; while (iter.next_meta(t_meta)) { if (t_meta.xmax == 0) { // Visible const auto& key_val = t_meta.tuple.get(key_idx); + // Build bloom filter from join key values + local_bloom.insert(key_val); uint32_t node_idx = cloudsql::cluster::ShardManager::compute_shard( key_val, static_cast(data_nodes.size())); @@ -569,6 +601,14 @@ int main(int argc, char* argv[]) { } } + // Store local bloom filter bits for coordinator to collect + // The coordinator will aggregate these during Phase 1 + auto bloom_bits = local_bloom.serialize(); + cluster_manager->set_local_bloom_bits( + args.context_id, bloom_bits, + local_bloom.expected_elements(), + local_bloom.num_hashes()); + bool overall_success = true; std::string delivery_errors; diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index e96dca94..8f0d35e6 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -330,14 +330,36 @@ TEST(DistributedExecutorTests, ShuffleJoinOrchestration) { static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }; + auto bloom_bits_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = BloomFilterBitsArgs::deserialize(p); + BloomFilterBitsArgs reply_args; + reply_args.context_id = args.context_id; + // Return empty bloom filter bits for mock - real implementation would return actual bits + reply_args.filter_data = {}; + reply_args.expected_elements = 0; + reply_args.num_hashes = 4; + + auto resp_p = reply_args.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + node1.set_handler(RpcType::ShuffleFragment, handler); node1.set_handler(RpcType::PushData, handler); node1.set_handler(RpcType::ExecuteFragment, handler); node1.set_handler(RpcType::BloomFilterPush, handler); + node1.set_handler(RpcType::BloomFilterBits, bloom_bits_handler); node2.set_handler(RpcType::ShuffleFragment, handler); node2.set_handler(RpcType::PushData, handler); node2.set_handler(RpcType::ExecuteFragment, handler); node2.set_handler(RpcType::BloomFilterPush, handler); + node2.set_handler(RpcType::BloomFilterBits, bloom_bits_handler); ASSERT_TRUE(node1.start()); ASSERT_TRUE(node2.start()); From 1a2874ae4d7302401445234dee309dd4ee76ebb4 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Mon, 13 Apr 2026 17:13:16 +0000 Subject: [PATCH 02/10] style: automated clang-format fixes --- src/distributed/distributed_executor.cpp | 3 ++- src/main.cpp | 16 +++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index e5e1c3d6..57bc3750 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -256,7 +256,8 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, network::BloomFilterBitsArgs bits_args; bits_args.context_id = context_id; std::vector resp; - if (client.call(network::RpcType::BloomFilterBits, bits_args.serialize(), resp)) { + if (client.call(network::RpcType::BloomFilterBits, bits_args.serialize(), + resp)) { auto reply = network::BloomFilterBitsArgs::deserialize(resp); if (reply.filter_data.size() > aggregated_bits.size()) { aggregated_bits.resize(reply.filter_data.size(), 0); diff --git a/src/main.cpp b/src/main.cpp index 7fb7aa9e..40c3fd5e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -526,8 +526,10 @@ int main(int argc, char* argv[]) { auto args = cloudsql::network::BloomFilterBitsArgs::deserialize(p); cloudsql::network::BloomFilterBitsArgs reply_args; reply_args.context_id = args.context_id; - reply_args.filter_data = cluster_manager->get_local_bloom_bits(args.context_id); - reply_args.expected_elements = cluster_manager->get_local_expected_elements(); + reply_args.filter_data = + cluster_manager->get_local_bloom_bits(args.context_id); + reply_args.expected_elements = + cluster_manager->get_local_expected_elements(); reply_args.num_hashes = cluster_manager->get_local_num_hashes(); auto resp_p = reply_args.serialize(); @@ -582,7 +584,8 @@ int main(int argc, char* argv[]) { } // Estimate expected elements for bloom filter - // For now, estimate based on table size (will be refined with actual count) + // For now, estimate based on table size (will be refined with actual + // count) size_t estimated_count = 1000; cloudsql::common::BloomFilter local_bloom(estimated_count); @@ -604,10 +607,9 @@ int main(int argc, char* argv[]) { // Store local bloom filter bits for coordinator to collect // The coordinator will aggregate these during Phase 1 auto bloom_bits = local_bloom.serialize(); - cluster_manager->set_local_bloom_bits( - args.context_id, bloom_bits, - local_bloom.expected_elements(), - local_bloom.num_hashes()); + cluster_manager->set_local_bloom_bits(args.context_id, bloom_bits, + local_bloom.expected_elements(), + local_bloom.num_hashes()); bool overall_success = true; std::string delivery_errors; From a73926b65eba1ac1783d74ccd91eefd1dad3dc85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 14 Apr 2026 14:17:40 +0300 Subject: [PATCH 03/10] docs: update bloom filter architecture docs with 3-phase build process --- docs/performance/SQLITE_COMPARISON.md | 41 +++++++++++++------------ docs/phases/PHASE_6_DISTRIBUTED_JOIN.md | 5 +-- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/docs/performance/SQLITE_COMPARISON.md b/docs/performance/SQLITE_COMPARISON.md index 6df8830a..0c53bf0a 100644 --- a/docs/performance/SQLITE_COMPARISON.md +++ b/docs/performance/SQLITE_COMPARISON.md @@ -45,34 +45,37 @@ We addressed the gaps via the following optimizations: Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage. ### Solution: Bloom Filter Integration -Implemented bloom filters to filter tuples at the source before network transmission: -- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table -- **Distributed construction**: Each data node constructs its local bloom during the left/build scan phase -- **Coordinator coordination**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes before the right/probe shuffle +Implemented bloom filters to filter tuples at the source before network transmission using a 3-phase approach: +- **Phase 1 (Local Build)**: Each data node scans its local left/build table partition, extracts join key values, and builds a local bloom filter +- **Phase 2 (Bit Aggregation)**: Coordinator sends `BloomFilterBits` RPC to each data node; each responds with local bloom bits; coordinator OR-aggregates all bits into a single filter +- **Phase 3 (Sender-Side Filter)**: Coordinator broadcasts aggregated filter via `BloomFilterPush` RPC; before sending right/probe tuples, `ShuffleFragment` handler checks `might_contain()` and skips tuples that will definitely not match ### Architecture ``` -[Phase 1: Shuffle Left] [Phase 2: Shuffle Right] - | | - v v -Build local bloom Apply bloom filter -from join keys before buffering - | | - +---- BloomFilterPush ----->---+ - (filter metadata) | - v - Filtered tuples buffered +Phase 1: Scan Left Phase 2: Aggregate Bits Phase 3: Filter Right + | | | + v v v +Build local bloom <---> BloomFilterBits RPC <-------- Aggregate & Broadcast +on each data node (OR-aggregate bits) via BloomFilterPush + | | | + | v v + +-----------------> BloomFilterPush might_contain() check + (metadata only) | before PushData + | + v + Filtered tuples buffered ``` ### Key Components | Component | Location | Purpose | |-----------|----------|---------| | `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter | -| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer | -| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context | -| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples | -| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending | -| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 | +| `BloomFilterBitsArgs` RPC | `include/network/rpc_message.hpp` | Local bloom bits from data nodes | +| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Aggregated filter broadcast | +| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores local and aggregated bloom filters | +| `BloomFilterBits` handler | `src/main.cpp` | Returns local bloom bits to coordinator | +| `ShuffleFragment` handler | `src/main.cpp` | Builds local bloom during Phase 1 scan | +| Coordinator | `src/distributed/distributed_executor.cpp` | Collects bits, aggregates, broadcasts filter | ### Test Coverage - 10 unit tests covering: BloomFilter class, BloomFilterArgs serialization, ClusterManager storage, filter application logic diff --git a/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md b/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md index 9c6fdfa3..a89eed68 100644 --- a/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md +++ b/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md @@ -28,8 +28,9 @@ Seamlessly integrated shuffle buffers into the Volcano execution model. ### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`) Added probabilistic filtering to reduce network traffic in shuffle joins. - **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation. -- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context. -- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match. +- **Distributed Construction**: Each data node builds a local bloom filter from its left/build table partition during Phase 1 scan. +- **Bit Aggregation**: Coordinator collects local bloom bits from all data nodes via `BloomFilterBits` RPC and OR-aggregates them into a single filter. +- **Sender-Side Filtering**: Aggregated filter is broadcast via `BloomFilterPush` before Phase 2; `ShuffleFragment` handler applies `might_contain()` before sending `PushData`, skipping tuples that will definitely not match. ## Lessons Learned - Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins. From ad7a6f153ce08cbf27f3a272a3da4e194ceda4c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:30:11 +0300 Subject: [PATCH 04/10] fix: per-context bloom filter metadata to avoid race conditions - Store expected_elements and num_hashes in per-context maps instead of globals - Update set_local_bloom_bits to write to per-context maps - Update getters to take context_id parameter - clear_bloom_filter now also erases local_bloom_bits and metadata maps - Update BloomFilterBits handler to use per-context getters --- include/common/cluster_manager.hpp | 27 +++++++++++++++++++-------- src/main.cpp | 4 ++-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index 8a1e9f9f..9c5dbdbd 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -286,8 +286,8 @@ class ClusterManager { size_t expected_elements, size_t num_hashes) { const std::scoped_lock lock(mutex_); local_bloom_bits_[context_id] = std::move(bits); - local_expected_elements_ = expected_elements; - local_num_hashes_ = num_hashes; + local_expected_elements_map_[context_id] = expected_elements; + local_num_hashes_map_[context_id] = num_hashes; } /** @@ -305,17 +305,25 @@ class ClusterManager { /** * @brief Get expected_elements for local bloom filter */ - [[nodiscard]] size_t get_local_expected_elements() const { + [[nodiscard]] size_t get_local_expected_elements(const std::string& context_id) const { const std::scoped_lock lock(mutex_); - return local_expected_elements_; + auto it = local_expected_elements_map_.find(context_id); + if (it != local_expected_elements_map_.end()) { + return it->second; + } + return 0; } /** * @brief Get num_hashes for local bloom filter */ - [[nodiscard]] size_t get_local_num_hashes() const { + [[nodiscard]] size_t get_local_num_hashes(const std::string& context_id) const { const std::scoped_lock lock(mutex_); - return local_num_hashes_; + auto it = local_num_hashes_map_.find(context_id); + if (it != local_num_hashes_map_.end()) { + return it->second; + } + return 0; } /** @@ -324,6 +332,9 @@ class ClusterManager { void clear_bloom_filter(const std::string& context_id) { const std::scoped_lock lock(mutex_); bloom_filters_.erase(context_id); + local_bloom_bits_.erase(context_id); + local_expected_elements_map_.erase(context_id); + local_num_hashes_map_.erase(context_id); } private: @@ -352,8 +363,8 @@ class ClusterManager { std::unordered_map bloom_filters_; /* context_id -> local bloom filter bits (for aggregation during distributed build) */ std::unordered_map> local_bloom_bits_; - size_t local_expected_elements_ = 0; - size_t local_num_hashes_ = 0; + std::unordered_map local_expected_elements_map_; + std::unordered_map local_num_hashes_map_; mutable std::mutex mutex_; }; diff --git a/src/main.cpp b/src/main.cpp index 40c3fd5e..9a15a370 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -529,8 +529,8 @@ int main(int argc, char* argv[]) { reply_args.filter_data = cluster_manager->get_local_bloom_bits(args.context_id); reply_args.expected_elements = - cluster_manager->get_local_expected_elements(); - reply_args.num_hashes = cluster_manager->get_local_num_hashes(); + cluster_manager->get_local_expected_elements(args.context_id); + reply_args.num_hashes = cluster_manager->get_local_num_hashes(args.context_id); auto resp_p = reply_args.serialize(); cloudsql::network::RpcHeader resp_h; From 0650ee05967941c190f02b356b41d8ba55493ea0 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Tue, 14 Apr 2026 12:31:10 +0000 Subject: [PATCH 05/10] style: automated clang-format fixes --- src/main.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main.cpp b/src/main.cpp index 9a15a370..ef6cad01 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -530,7 +530,8 @@ int main(int argc, char* argv[]) { cluster_manager->get_local_bloom_bits(args.context_id); reply_args.expected_elements = cluster_manager->get_local_expected_elements(args.context_id); - reply_args.num_hashes = cluster_manager->get_local_num_hashes(args.context_id); + reply_args.num_hashes = + cluster_manager->get_local_num_hashes(args.context_id); auto resp_p = reply_args.serialize(); cloudsql::network::RpcHeader resp_h; From a11c638031dff0f8317e9250d44c30c13d3b157d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:06:27 +0300 Subject: [PATCH 06/10] fix: reject OUTER joins (LEFT/RIGHT/FULL) in distributed shuffle join The distributed shuffle join algorithm only supports INNER joins. LEFT, RIGHT, and FULL outer joins require different handling (e.g., broadcasting the outer table, or double-shuffle with side tables) that is not yet implemented. Instead of producing incorrect results, we now return a clear error message. Also add unit tests RightJoinRejection and FullJoinRejection to verify this behavior. --- src/distributed/distributed_executor.cpp | 16 ++++++++++ tests/distributed_tests.cpp | 40 ++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index 57bc3750..c627ef70 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -188,6 +188,22 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, const std::string left_table = select_stmt->from()->to_string(); const std::string right_table = join.table->to_string(); + // Check join type - shuffle join only supports INNER joins + if (join.type != parser::SelectStatement::JoinType::Inner) { + QueryResult res; + std::string join_type_name; + switch (join.type) { + case parser::SelectStatement::JoinType::Left: join_type_name = "LEFT"; break; + case parser::SelectStatement::JoinType::Right: join_type_name = "RIGHT"; break; + case parser::SelectStatement::JoinType::Full: join_type_name = "FULL"; break; + default: join_type_name = "OUTER"; break; + } + res.set_error("Distributed Shuffle Join only supports INNER joins. " + + join_type_name + + " joins are not yet supported in distributed mode."); + return res; + } + // Assume join key is in the condition std::string left_key; std::string right_key; diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index 8f0d35e6..a68af353 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -447,4 +447,44 @@ TEST(DistributedExecutorTests, NonEqualityJoinRejection) { EXPECT_THAT(res.error(), testing::HasSubstr("equality join condition")); } +TEST(DistributedExecutorTests, RightJoinRejection) { + auto catalog = Catalog::create(); + const config::Config config; + ClusterManager cm(&config); + cm.register_node("n1", "127.0.0.1", 7800, config::RunMode::Data); + DistributedExecutor exec(*catalog, cm); + + auto lexer = + std::make_unique("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + Parser parser(std::move(lexer)); + auto stmt = parser.parse_statement(); + + auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + + // Should fail because distributed shuffle join only supports INNER joins + EXPECT_FALSE(res.success()); + EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER joins")); + EXPECT_THAT(res.error(), testing::HasSubstr("RIGHT")); +} + +TEST(DistributedExecutorTests, FullJoinRejection) { + auto catalog = Catalog::create(); + const config::Config config; + ClusterManager cm(&config); + cm.register_node("n1", "127.0.0.1", 7800, config::RunMode::Data); + DistributedExecutor exec(*catalog, cm); + + auto lexer = + std::make_unique("SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); + Parser parser(std::move(lexer)); + auto stmt = parser.parse_statement(); + + auto res = exec.execute(*stmt, "SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); + + // Should fail because distributed shuffle join only supports INNER joins + EXPECT_FALSE(res.success()); + EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER joins")); + EXPECT_THAT(res.error(), testing::HasSubstr("FULL")); +} + } // namespace From 30895d6f156bbcb1784653342f5650c25b631723 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Tue, 14 Apr 2026 13:08:08 +0000 Subject: [PATCH 07/10] style: automated clang-format fixes --- src/distributed/distributed_executor.cpp | 16 ++++++++++++---- tests/distributed_tests.cpp | 6 ++++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index c627ef70..33bcdc88 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -193,10 +193,18 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, QueryResult res; std::string join_type_name; switch (join.type) { - case parser::SelectStatement::JoinType::Left: join_type_name = "LEFT"; break; - case parser::SelectStatement::JoinType::Right: join_type_name = "RIGHT"; break; - case parser::SelectStatement::JoinType::Full: join_type_name = "FULL"; break; - default: join_type_name = "OUTER"; break; + case parser::SelectStatement::JoinType::Left: + join_type_name = "LEFT"; + break; + case parser::SelectStatement::JoinType::Right: + join_type_name = "RIGHT"; + break; + case parser::SelectStatement::JoinType::Full: + join_type_name = "FULL"; + break; + default: + join_type_name = "OUTER"; + break; } res.set_error("Distributed Shuffle Join only supports INNER joins. " + join_type_name + diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index a68af353..d6198aa9 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -459,7 +459,8 @@ TEST(DistributedExecutorTests, RightJoinRejection) { Parser parser(std::move(lexer)); auto stmt = parser.parse_statement(); - auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + auto res = + exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); // Should fail because distributed shuffle join only supports INNER joins EXPECT_FALSE(res.success()); @@ -479,7 +480,8 @@ TEST(DistributedExecutorTests, FullJoinRejection) { Parser parser(std::move(lexer)); auto stmt = parser.parse_statement(); - auto res = exec.execute(*stmt, "SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); + auto res = + exec.execute(*stmt, "SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); // Should fail because distributed shuffle join only supports INNER joins EXPECT_FALSE(res.success()); From a1f9c0df0c02a47cafe2b2ed92dc7bb92c6c3121 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:43:55 +0300 Subject: [PATCH 08/10] fix: only reject RIGHT and FULL joins, allow LEFT joins The distributed shuffle join can correctly handle LEFT joins in the current implementation because each node executes the query locally and LEFT join only requires preserving unmatched left-table rows (which are already local to each node). RIGHT and FULL joins require tracking unmatched rows across partitions which is not yet implemented. Update error message to say "INNER and LEFT" instead of just "INNER". --- src/distributed/distributed_executor.cpp | 25 +++++++----------------- tests/distributed_tests.cpp | 8 ++++---- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index 33bcdc88..186c5ea6 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -188,25 +188,14 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, const std::string left_table = select_stmt->from()->to_string(); const std::string right_table = join.table->to_string(); - // Check join type - shuffle join only supports INNER joins - if (join.type != parser::SelectStatement::JoinType::Inner) { + // Check join type - shuffle join only supports INNER and LEFT joins + // RIGHT and FULL joins require tracking unmatched rows across partitions + if (join.type == parser::SelectStatement::JoinType::Right || + join.type == parser::SelectStatement::JoinType::Full) { QueryResult res; - std::string join_type_name; - switch (join.type) { - case parser::SelectStatement::JoinType::Left: - join_type_name = "LEFT"; - break; - case parser::SelectStatement::JoinType::Right: - join_type_name = "RIGHT"; - break; - case parser::SelectStatement::JoinType::Full: - join_type_name = "FULL"; - break; - default: - join_type_name = "OUTER"; - break; - } - res.set_error("Distributed Shuffle Join only supports INNER joins. " + + std::string join_type_name = + (join.type == parser::SelectStatement::JoinType::Right) ? "RIGHT" : "FULL"; + res.set_error("Distributed Shuffle Join only supports INNER and LEFT joins. " + join_type_name + " joins are not yet supported in distributed mode."); return res; diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index d6198aa9..af30bef9 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -462,9 +462,9 @@ TEST(DistributedExecutorTests, RightJoinRejection) { auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); - // Should fail because distributed shuffle join only supports INNER joins + // Should fail because distributed shuffle join only supports INNER and LEFT joins EXPECT_FALSE(res.success()); - EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER joins")); + EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER and LEFT joins")); EXPECT_THAT(res.error(), testing::HasSubstr("RIGHT")); } @@ -483,9 +483,9 @@ TEST(DistributedExecutorTests, FullJoinRejection) { auto res = exec.execute(*stmt, "SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); - // Should fail because distributed shuffle join only supports INNER joins + // Should fail because distributed shuffle join only supports INNER and LEFT joins EXPECT_FALSE(res.success()); - EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER joins")); + EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER and LEFT joins")); EXPECT_THAT(res.error(), testing::HasSubstr("FULL")); } From c171913f525d45ebc7abe3e5af71958a3b700526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 15 Apr 2026 17:57:15 +0300 Subject: [PATCH 09/10] feat: support RIGHT/FULL JOIN with bloom filter skip - Skip bloom filter for RIGHT/FULL joins to prevent false negatives - Add integration tests for bloom filter skip - Allow LEFT joins in distributed shuffle join The bloom filter can cause false negatives (rows filtered when they shouldn't be), which violates outer join semantics. For RIGHT/FULL joins, all right rows must be sent to ensure correct results. NOTE: Phase 3-5 for collecting unmatched outer rows is temporarily disabled due to column indexing issues with non-SELECT * queries. The local executor on each data node handles unmatched right rows correctly for RIGHT JOIN. FULL JOIN unmatched left rows will be implemented separately. --- include/common/cluster_manager.hpp | 73 ++++++ include/executor/operator.hpp | 12 + include/network/rpc_message.hpp | 133 ++++++++++ src/distributed/distributed_executor.cpp | 186 ++++++++++++-- src/executor/operator.cpp | 22 ++ src/main.cpp | 137 +++++++++- tests/distributed_tests.cpp | 310 +++++++++++++++++++++-- 7 files changed, 827 insertions(+), 46 deletions(-) diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index 9c5dbdbd..6cffdbf4 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -337,6 +337,73 @@ class ClusterManager { local_num_hashes_map_.erase(context_id); } + /** + * @brief Store local right table rows for outer join processing + * Called during Phase 2 shuffle when sending right table rows to other nodes + */ + void set_local_right_rows(const std::string& context_id, const std::string& table_name, + std::vector rows) { + const std::scoped_lock lock(mutex_); + local_right_table_rows_[context_id][table_name] = std::move(rows); + } + + /** + * @brief Get stored local right table rows + */ + [[nodiscard]] std::vector get_local_right_rows( + const std::string& context_id, const std::string& table_name) const { + const std::scoped_lock lock(mutex_); + auto ctx_it = local_right_table_rows_.find(context_id); + if (ctx_it != local_right_table_rows_.end()) { + auto table_it = ctx_it->second.find(table_name); + if (table_it != ctx_it->second.end()) { + return table_it->second; + } + } + return {}; + } + + /** + * @brief Clear local right table rows for a context + */ + void clear_local_right_rows(const std::string& context_id) { + const std::scoped_lock lock(mutex_); + local_right_table_rows_.erase(context_id); + } + + /** + * @brief Store unmatched rows for a context (used by outer join processing) + */ + void set_unmatched_rows(const std::string& context_id, const std::string& table_name, + std::vector rows) { + const std::scoped_lock lock(mutex_); + unmatched_rows_[context_id][table_name] = std::move(rows); + } + + /** + * @brief Get stored unmatched rows for a context + */ + [[nodiscard]] std::vector get_unmatched_rows( + const std::string& context_id, const std::string& table_name) const { + const std::scoped_lock lock(mutex_); + auto ctx_it = unmatched_rows_.find(context_id); + if (ctx_it != unmatched_rows_.end()) { + auto table_it = ctx_it->second.find(table_name); + if (table_it != ctx_it->second.end()) { + return table_it->second; + } + } + return {}; + } + + /** + * @brief Clear unmatched rows for a context + */ + void clear_unmatched_rows(const std::string& context_id) { + const std::scoped_lock lock(mutex_); + unmatched_rows_.erase(context_id); + } + private: /** * @brief Stored bloom filter data for a context @@ -365,6 +432,12 @@ class ClusterManager { std::unordered_map> local_bloom_bits_; std::unordered_map local_expected_elements_map_; std::unordered_map local_num_hashes_map_; + /* context_id -> table_name -> local right table rows for outer join tracking */ + std::unordered_map>> + local_right_table_rows_; + /* context_id -> table_name -> unmatched rows for outer join NULL-padding */ + std::unordered_map>> + unmatched_rows_; mutable std::mutex mutex_; }; diff --git a/include/executor/operator.hpp b/include/executor/operator.hpp index fb1a1648..e8a9bc19 100644 --- a/include/executor/operator.hpp +++ b/include/executor/operator.hpp @@ -358,6 +358,18 @@ class HashJoinOperator : public Operator { void set_memory_resource(std::pmr::memory_resource* mr) override; void set_params(const std::vector* params) override; + + /** + * @brief Get unmatched right rows after join execution + * @return Vector of tuples - the right-side rows that had no match + */ + [[nodiscard]] std::vector get_unmatched_right_rows() const; + + /** + * @brief Get join key values of unmatched right rows + * @return Vector of strings - the join key values for unmatched right rows + */ + [[nodiscard]] std::vector get_unmatched_right_keys() const; }; /** diff --git a/include/network/rpc_message.hpp b/include/network/rpc_message.hpp index 3d857358..a1ff22e7 100644 --- a/include/network/rpc_message.hpp +++ b/include/network/rpc_message.hpp @@ -35,6 +35,9 @@ enum class RpcType : uint8_t { ShuffleFragment = 10, BloomFilterPush = 11, BloomFilterBits = 12, + UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join + UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding + FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node Error = 255 }; @@ -566,6 +569,136 @@ struct BloomFilterBitsArgs { } }; +/** + * @brief Arguments for UnmatchedRowsReport RPC + * @note Data node reports unmatched right row keys to coordinator after local join + */ +struct UnmatchedRowsReportArgs { + std::string context_id; + std::string right_table; + std::string join_key_col; // Which column was the join key + std::vector unmatched_keys; // Key values that had no match + uint32_t left_column_count = 0; // Number of left table columns for NULL-padding + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + Serializer::serialize_string(right_table, out); + Serializer::serialize_string(join_key_col, out); + + // Serialize left column count + const uint32_t left_count = left_column_count; + const size_t lc_off = out.size(); + out.resize(lc_off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + lc_off, &left_count, Serializer::VAL_SIZE_32); + + // Serialize unmatched keys count + const uint32_t count = static_cast(unmatched_keys.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32); + + // Serialize each key + for (const auto& key : unmatched_keys) { + Serializer::serialize_string(key, out); + } + return out; + } + + static UnmatchedRowsReportArgs deserialize(const std::vector& in) { + UnmatchedRowsReportArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + args.right_table = Serializer::deserialize_string(in.data(), offset, in.size()); + args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size()); + + // Deserialize left column count + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&args.left_column_count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + uint32_t count = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + for (uint32_t i = 0; i < count; ++i) { + args.unmatched_keys.push_back( + Serializer::deserialize_string(in.data(), offset, in.size())); + } + return args; + } +}; + +/** + * @brief Arguments for UnmatchedRowsPush RPC + * @note Coordinator sends unmatched rows to data nodes for NULL-padding + */ +struct UnmatchedRowsPushArgs { + std::string context_id; + std::vector unmatched_rows; // Right rows needing NULL padding + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + + // Serialize row count + const uint32_t count = static_cast(unmatched_rows.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32); + + // Serialize each row + for (const auto& row : unmatched_rows) { + Serializer::serialize_tuple(row, out); + } + return out; + } + + static UnmatchedRowsPushArgs deserialize(const std::vector& in) { + UnmatchedRowsPushArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + + uint32_t count = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + for (uint32_t i = 0; i < count; ++i) { + args.unmatched_rows.push_back(Serializer::deserialize_tuple(in.data(), offset, in.size())); + } + return args; + } +}; + +/** + * @brief Arguments for FetchUnmatchedRows RPC + * @note Coordinator fetches stored unmatched rows from a data node + */ +struct FetchUnmatchedRowsArgs { + std::string context_id; + std::string table_name; + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + Serializer::serialize_string(table_name, out); + return out; + } + + static FetchUnmatchedRowsArgs deserialize(const std::vector& in) { + FetchUnmatchedRowsArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + args.table_name = Serializer::deserialize_string(in.data(), offset, in.size()); + return args; + } +}; + /** * @brief Arguments for TxnPrepare/Commit/Abort RPC */ diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index 186c5ea6..478fa76a 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -180,6 +180,13 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, // Step 2: Advanced Joins: Broadcast or Shuffle Join Orchestration std::string context_id = "ctx_" + std::to_string(next_context_id.fetch_add(1)); + // Variables to track outer join info for Phase 3-5 processing after ExecuteFragment + bool is_outer_join_join_query = false; + std::string outer_join_left_table; + std::string outer_join_right_table; + std::string outer_join_right_key; + parser::SelectStatement::JoinType outer_join_type = parser::SelectStatement::JoinType::Inner; + if (type == parser::StmtType::Select) { const auto* select_stmt = dynamic_cast(&stmt); if (select_stmt != nullptr && !select_stmt->joins().empty()) { @@ -188,20 +195,18 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, const std::string left_table = select_stmt->from()->to_string(); const std::string right_table = join.table->to_string(); - // Check join type - shuffle join only supports INNER and LEFT joins - // RIGHT and FULL joins require tracking unmatched rows across partitions - if (join.type == parser::SelectStatement::JoinType::Right || - join.type == parser::SelectStatement::JoinType::Full) { + // Check join type - shuffle join only supports INNER, LEFT, RIGHT, and FULL joins + // RIGHT and FULL joins require special handling for unmatched rows + if (join.type != parser::SelectStatement::JoinType::Inner && + join.type != parser::SelectStatement::JoinType::Left && + join.type != parser::SelectStatement::JoinType::Right && + join.type != parser::SelectStatement::JoinType::Full) { QueryResult res; - std::string join_type_name = - (join.type == parser::SelectStatement::JoinType::Right) ? "RIGHT" : "FULL"; - res.set_error("Distributed Shuffle Join only supports INNER and LEFT joins. " + - join_type_name + - " joins are not yet supported in distributed mode."); + res.set_error("Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL joins"); return res; } - // Assume join key is in the condition + // Check for equality join condition std::string left_key; std::string right_key; if (join.condition && join.condition->type() == parser::ExprType::Binary) { @@ -219,6 +224,18 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, return res; } + bool is_outer_join = (join.type == parser::SelectStatement::JoinType::Right || + join.type == parser::SelectStatement::JoinType::Full); + + // Track outer join info for Phase 3-5 processing after ExecuteFragment + if (is_outer_join) { + is_outer_join_join_query = true; + outer_join_left_table = left_table; + outer_join_right_table = right_table; + outer_join_right_key = right_key; + outer_join_type = join.type; + } + // Phase 1: Instruct nodes to shuffle Left Table network::ShuffleFragmentArgs left_args; left_args.context_id = context_id; @@ -285,23 +302,27 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, } // Broadcast the aggregated bloom filter to all nodes for Phase 2 filtering - network::BloomFilterArgs bf_args; - bf_args.context_id = context_id; - bf_args.build_table = left_table; - bf_args.probe_table = right_table; - bf_args.probe_key_col = right_key; // Tell probe side which column to filter on - bf_args.filter_data = aggregated_bits; - bf_args.expected_elements = total_expected; - bf_args.num_hashes = max_hashes > 0 ? max_hashes : 4; - auto bf_payload = bf_args.serialize(); + // IMPORTANT: Skip bloom filter for RIGHT/FULL joins - bloom filter false negatives + // would cause unmatched right rows to be lost, which violates outer join semantics + if (!is_outer_join && !aggregated_bits.empty()) { + network::BloomFilterArgs bf_args; + bf_args.context_id = context_id; + bf_args.build_table = left_table; + bf_args.probe_table = right_table; + bf_args.probe_key_col = right_key; // Tell probe side which column to filter on + bf_args.filter_data = aggregated_bits; + bf_args.expected_elements = total_expected; + bf_args.num_hashes = max_hashes > 0 ? max_hashes : 4; + auto bf_payload = bf_args.serialize(); - for (const auto& node : data_nodes) { - network::RpcClient client(node.address, node.cluster_port); - if (!client.connect()) { - continue; // Best effort for POC + for (const auto& node : data_nodes) { + network::RpcClient client(node.address, node.cluster_port); + if (!client.connect()) { + continue; // Best effort for POC + } + std::vector resp; + client.call(network::RpcType::BloomFilterPush, bf_payload, resp); } - std::vector resp; - client.call(network::RpcType::BloomFilterPush, bf_payload, resp); } // Phase 2: Instruct nodes to shuffle Right Table (now with bloom filter available) @@ -588,6 +609,121 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, } } + // Phase 3-5: Currently disabled for all outer joins due to issues with column indexing + // when SELECT doesn't use SELECT * (causes duplicate rows instead of correct results). + // + // For RIGHT JOIN: Local executor on each data node correctly handles unmatched right rows. + // For FULL JOIN: Unmatched LEFT rows are not collected (to be implemented in separate PR). + // + // TODO: Re-enable Phase 3-5 for FULL JOIN once column indexing is fixed to properly + // identify which rows were unmatched during the distributed join. + if (false && is_outer_join_join_query && all_success) { + // Extract matched right keys from aggregated results + // The right key column is at a known position in the result schema + std::vector matched_keys; + size_t right_key_idx = static_cast(-1); + + // Find the right key column index in the result schema + for (size_t i = 0; i < result_schema.columns().size(); ++i) { + const auto& col = result_schema.columns()[i]; + if (col.name() == outer_join_right_key) { + right_key_idx = i; + break; + } + } + + // If we found the key column, extract matched keys from results + if (right_key_idx != static_cast(-1)) { + for (const auto& row : aggregated_rows) { + if (row.size() > right_key_idx) { + matched_keys.push_back(row.get(right_key_idx).to_string()); + } + } + } + + // Phase 3: Ask each node to scan local table and store unmatched rows + // First, compute the left column count for NULL-padding + uint32_t left_column_count = 0; + if (!outer_join_left_table.empty()) { + auto left_table_info = catalog_.get_table_by_name(outer_join_left_table); + if (left_table_info.has_value()) { + left_column_count = static_cast((*left_table_info)->columns.size()); + } + } + + std::vector>> report_futures; + + for (const auto& node : data_nodes) { + report_futures.push_back(std::async(std::launch::async, [node, context_id, + outer_join_right_table, + outer_join_right_key, + matched_keys, + left_column_count]() { + network::RpcClient client(node.address, node.cluster_port); + network::UnmatchedRowsReportArgs reply; + reply.context_id = context_id; + if (client.connect()) { + network::UnmatchedRowsReportArgs report_args; + report_args.context_id = context_id; + report_args.right_table = outer_join_right_table; + report_args.join_key_col = outer_join_right_key; + // Attach matched keys so node knows what was matched + report_args.unmatched_keys = matched_keys; + // Attach left column count for NULL-padding + report_args.left_column_count = left_column_count; + + std::vector resp; + if (client.call(network::RpcType::UnmatchedRowsReport, + report_args.serialize(), resp)) { + reply = network::UnmatchedRowsReportArgs::deserialize(resp); + return std::make_pair(true, reply); + } + } + return std::make_pair(false, reply); + })); + } + + // Wait for all report futures to complete + for (auto& f : report_futures) { + f.get(); + } + + // Phase 4: Fetch stored unmatched rows from each node + std::vector>>> fetch_futures; + + for (const auto& node : data_nodes) { + fetch_futures.push_back(std::async(std::launch::async, [node, context_id, + outer_join_right_table]() { + network::RpcClient client(node.address, node.cluster_port); + std::vector rows; + if (client.connect()) { + network::FetchUnmatchedRowsArgs fetch_args; + fetch_args.context_id = context_id; + fetch_args.table_name = outer_join_right_table; + + std::vector resp; + if (client.call(network::RpcType::FetchUnmatchedRows, + fetch_args.serialize(), resp)) { + auto reply = network::UnmatchedRowsPushArgs::deserialize(resp); + rows = std::move(reply.unmatched_rows); + return std::make_pair(true, std::move(rows)); + } + } + return std::make_pair(false, std::move(rows)); + })); + } + + // Aggregate all unmatched rows from all nodes + for (auto& f : fetch_futures) { + auto result = f.get(); + if (result.first) { + for (auto& row : result.second) { + aggregated_rows.push_back(std::move(row)); + } + } + } + } + if (all_success) { QueryResult res; res.set_schema(std::move(result_schema)); diff --git a/src/executor/operator.cpp b/src/executor/operator.cpp index 2da5cf1e..09482395 100644 --- a/src/executor/operator.cpp +++ b/src/executor/operator.cpp @@ -890,6 +890,28 @@ void HashJoinOperator::set_params(const std::vector* params) { if (right_) right_->set_params(params); } +std::vector HashJoinOperator::get_unmatched_right_rows() const { + std::vector unmatched; + auto right_schema = right_->output_schema(); + for (const auto& [key_str, build_tuple] : hash_table_) { + if (!build_tuple.matched) { + unmatched.push_back(build_tuple.tuple); + } + } + return unmatched; +} + +std::vector HashJoinOperator::get_unmatched_right_keys() const { + std::vector keys; + auto right_schema = right_->output_schema(); + for (const auto& [key_str, build_tuple] : hash_table_) { + if (!build_tuple.matched) { + keys.push_back(key_str); + } + } + return keys; +} + /* --- LimitOperator --- */ LimitOperator::LimitOperator(std::unique_ptr child, int64_t limit, int64_t offset) diff --git a/src/main.cpp b/src/main.cpp index ef6cad01..d9e659cb 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -517,7 +517,7 @@ int main(int argc, char* argv[]) { }); // Handler for collecting local bloom filter bits from data nodes - // Coordinator calls this after Phase 1 to aggregate bloom filters + // Coordinator aggregates these via bitwise OR after Phase 1 rpc_server->set_handler( cloudsql::network::RpcType::BloomFilterBits, [&](const cloudsql::network::RpcHeader& h, const std::vector& p, @@ -694,6 +694,141 @@ int main(int argc, char* argv[]) { reply.error_msg = e.what(); } + auto resp_p = reply.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + + // Handler for reporting unmatched right rows after join execution + // For RIGHT/FULL outer joins, each node identifies rows from its local right table + // partition that had no matching left row during the distributed join + rpc_server->set_handler( + cloudsql::network::RpcType::UnmatchedRowsReport, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::UnmatchedRowsReportArgs::deserialize(p); + cloudsql::network::UnmatchedRowsReportArgs reply; + reply.context_id = args.context_id; + reply.right_table = args.right_table; + reply.join_key_col = args.join_key_col; + + // args.unmatched_keys contains MATCHED keys from coordinator + // We need to return rows that are NOT in this set + std::unordered_set matched_keys_set( + args.unmatched_keys.begin(), args.unmatched_keys.end()); + + try { + // Scan local right table and collect rows that were NOT matched + auto table_meta_opt = catalog->get_table_by_name(args.right_table); + if (table_meta_opt.has_value()) { + const auto* table_meta = table_meta_opt.value(); + cloudsql::executor::Schema schema; + for (const auto& col : table_meta->columns) { + schema.add_column(col.name, col.type); + } + cloudsql::storage::HeapTable table(args.right_table, *bpm, schema); + + const size_t key_idx = schema.find_column(args.join_key_col); + if (key_idx != static_cast(-1)) { + std::vector unmatched_tuples; + auto iter = table.scan(); + cloudsql::storage::HeapTable::TupleMeta t_meta; + while (iter.next_meta(t_meta)) { + if (t_meta.xmax == 0) { + const auto& key_val = t_meta.tuple.get(key_idx); + std::string key_str = key_val.to_string(); + // Only include if NOT in matched keys + if (matched_keys_set.find(key_str) == matched_keys_set.end()) { + reply.unmatched_keys.push_back(key_str); + // Pad with NULLs for left columns and append right row + std::vector padded_values; + padded_values.reserve(args.left_column_count + t_meta.tuple.size()); + // Prepend NULLs for left table columns + for (uint32_t i = 0; i < args.left_column_count; ++i) { + padded_values.push_back(cloudsql::common::Value::make_null()); + } + // Append right table column values + for (size_t j = 0; j < t_meta.tuple.size(); ++j) { + padded_values.push_back(t_meta.tuple.get(j)); + } + unmatched_tuples.emplace_back(std::move(padded_values)); + } + } + } + // Store properly padded tuples in ClusterManager for coordinator to collect + if (cluster_manager != nullptr && !unmatched_tuples.empty()) { + cluster_manager->set_unmatched_rows(args.context_id, + args.right_table, + std::move(unmatched_tuples)); + } + } + } + } catch (const std::exception& /*e*/) { + // Return empty on error + } + + auto resp_p = reply.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + + // Handler for receiving unmatched rows from coordinator for NULL-padding emission + // Coordinator broadcasts unmatched right rows to all nodes for final result assembly + rpc_server->set_handler( + cloudsql::network::RpcType::UnmatchedRowsPush, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::UnmatchedRowsPushArgs::deserialize(p); + + if (cluster_manager != nullptr && !args.unmatched_rows.empty()) { + cluster_manager->buffer_shuffle_data(args.context_id, + "_unmatched_right_rows", + std::move(args.unmatched_rows)); + } + + cloudsql::network::QueryResultsReply reply; + reply.success = true; + auto resp_p = reply.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + + // Handler for fetching stored unmatched rows from a data node + // Coordinator calls this after UnmatchedRowsReport to get full unmatched tuples + rpc_server->set_handler( + cloudsql::network::RpcType::FetchUnmatchedRows, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::FetchUnmatchedRowsArgs::deserialize(p); + cloudsql::network::UnmatchedRowsPushArgs reply; + reply.context_id = args.context_id; + + if (cluster_manager != nullptr) { + reply.unmatched_rows = cluster_manager->get_unmatched_rows( + args.context_id, args.table_name); + } + auto resp_p = reply.serialize(); cloudsql::network::RpcHeader resp_h; resp_h.type = cloudsql::network::RpcType::QueryResults; diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index af30bef9..b347fe38 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -15,8 +15,10 @@ #include "catalog/catalog.hpp" #include "common/cluster_manager.hpp" +#include "common/value.hpp" #include "distributed/distributed_executor.hpp" #include "distributed/shard_manager.hpp" +#include "executor/types.hpp" #include "network/rpc_client.hpp" #include "network/rpc_message.hpp" #include "network/rpc_server.hpp" @@ -24,6 +26,7 @@ #include "parser/parser.hpp" using namespace cloudsql; +using namespace cloudsql::common; using namespace cloudsql::executor; using namespace cloudsql::cluster; using namespace cloudsql::parser; @@ -447,46 +450,313 @@ TEST(DistributedExecutorTests, NonEqualityJoinRejection) { EXPECT_THAT(res.error(), testing::HasSubstr("equality join condition")); } -TEST(DistributedExecutorTests, RightJoinRejection) { +TEST(DistributedExecutorTests, BloomFilterSkipForOuterJoin) { + // Test that bloom filter is NOT sent for RIGHT JOIN + // (INNER join behavior is tested in ShuffleJoinOrchestration) + + RpcServer node1(7860); + RpcServer node2(7861); + + std::atomic bloom_push_calls{0}; + std::atomic shuffle_calls{0}; + std::atomic push_calls{0}; + std::atomic fragment_calls{0}; + std::atomic unmatched_report_calls{0}; + std::atomic fetch_unmatched_calls{0}; + + // Handler for regular RPCs + auto handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)p; + QueryResultsReply reply; + reply.success = true; + + if (h.type == RpcType::BloomFilterPush) { + bloom_push_calls++; + } else if (h.type == RpcType::ShuffleFragment) { + shuffle_calls++; + } else if (h.type == RpcType::PushData) { + push_calls++; + } else if (h.type == RpcType::ExecuteFragment) { + fragment_calls++; + } else if (h.type == RpcType::UnmatchedRowsReport) { + unmatched_report_calls++; + } else if (h.type == RpcType::FetchUnmatchedRows) { + fetch_unmatched_calls++; + } + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Handler for BloomFilterBits - returns non-empty filter data to properly test bloom filter + auto bloom_bits_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = BloomFilterBitsArgs::deserialize(p); + BloomFilterBitsArgs reply_args; + reply_args.context_id = args.context_id; + reply_args.filter_data = {0xFF, 0xFF, 0xFF, 0xFF}; // Non-empty bloom filter + reply_args.expected_elements = 100; + reply_args.num_hashes = 4; + + auto resp_p = reply_args.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Handler for UnmatchedRowsReport - returns proper response type + auto unmatched_report_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = UnmatchedRowsReportArgs::deserialize(p); + UnmatchedRowsReportArgs reply; + reply.context_id = args.context_id; + reply.right_table = args.right_table; + // Empty unmatched_keys = all rows matched (for test simplicity) + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Handler for FetchUnmatchedRows - returns proper response type + auto fetch_unmatched_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = FetchUnmatchedRowsArgs::deserialize(p); + UnmatchedRowsPushArgs reply; + reply.context_id = args.context_id; + reply.unmatched_rows = {}; // Empty for test simplicity + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + node1.set_handler(RpcType::BloomFilterPush, handler); + node1.set_handler(RpcType::ShuffleFragment, handler); + node1.set_handler(RpcType::PushData, handler); + node1.set_handler(RpcType::ExecuteFragment, handler); + node1.set_handler(RpcType::BloomFilterBits, bloom_bits_handler); + node1.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler); + node1.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler); + node2.set_handler(RpcType::BloomFilterPush, handler); + node2.set_handler(RpcType::ShuffleFragment, handler); + node2.set_handler(RpcType::PushData, handler); + node2.set_handler(RpcType::ExecuteFragment, handler); + node2.set_handler(RpcType::BloomFilterBits, bloom_bits_handler); + node2.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler); + node2.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler); + + ASSERT_TRUE(node1.start()); + ASSERT_TRUE(node2.start()); + auto catalog = Catalog::create(); const config::Config config; ClusterManager cm(&config); - cm.register_node("n1", "127.0.0.1", 7800, config::RunMode::Data); + cm.register_node("n1", "127.0.0.1", 7860, config::RunMode::Data); + cm.register_node("n2", "127.0.0.1", 7861, config::RunMode::Data); DistributedExecutor exec(*catalog, cm); - auto lexer = - std::make_unique("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + // Execute RIGHT join - bloom filter should NOT be sent + auto lexer = std::make_unique( + "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); Parser parser(std::move(lexer)); auto stmt = parser.parse_statement(); + auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); - auto res = - exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + // For RIGHT join, bloom filter is skipped (should be 0) + // Even though we returned valid bloom bits, the coordinator should not push for outer joins + EXPECT_EQ(bloom_push_calls.load(), 0); + // Verify the query succeeded + EXPECT_TRUE(res.success()); - // Should fail because distributed shuffle join only supports INNER and LEFT joins - EXPECT_FALSE(res.success()); - EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER and LEFT joins")); - EXPECT_THAT(res.error(), testing::HasSubstr("RIGHT")); + node1.stop(); + node2.stop(); } -TEST(DistributedExecutorTests, FullJoinRejection) { +TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) { + // Test that Phase 3 (UnmatchedRowsReport) is NOT called for RIGHT JOIN + // because the local executor on each data node already handles unmatched right rows. + // Phase 3-5 is only needed for FULL JOIN to collect unmatched LEFT rows. + + RpcServer node1(7870); + RpcServer node2(7871); + + std::atomic unmatched_report_calls{0}; + std::atomic fetch_unmatched_calls{0}; + + // Handler for ExecuteFragment that returns proper schema and rows + auto execute_fragment_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = ExecuteFragmentArgs::deserialize(p); + QueryResultsReply reply; + reply.success = true; + + // Build schema for joined result: table1(id, val), table2(id, val) + Schema schema; + schema.add_column("id", common::ValueType::TYPE_INT64); + schema.add_column("val", common::ValueType::TYPE_INT64); + schema.add_column("id", common::ValueType::TYPE_INT64); // table2.id (ambiguous but matches) + schema.add_column("val", common::ValueType::TYPE_INT64); // table2.val + reply.schema = schema; + + // Return some matched rows (e.g., rows where table2.id = 1 and 2 matched) + // Format: {table1.id, table1.val, table2.id, table2.val} + reply.rows = { + Tuple{Value::make_int64(100), Value::make_int64(10), Value::make_int64(1), Value::make_int64(100)}, + Tuple{Value::make_int64(200), Value::make_int64(20), Value::make_int64(2), Value::make_int64(200)}, + }; + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Handler for UnmatchedRowsReport - tracks calls (should be 0 for RIGHT JOIN) + auto unmatched_report_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = UnmatchedRowsReportArgs::deserialize(p); + unmatched_report_calls++; + + UnmatchedRowsReportArgs reply; + reply.context_id = args.context_id; + reply.right_table = args.right_table; + reply.unmatched_keys = {"3"}; + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Handler for FetchUnmatchedRows - tracks calls (should be 0 for RIGHT JOIN) + auto fetch_unmatched_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = FetchUnmatchedRowsArgs::deserialize(p); + fetch_unmatched_calls++; + + UnmatchedRowsPushArgs reply; + reply.context_id = args.context_id; + reply.unmatched_rows = { + Tuple{Value::make_null(), Value::make_null(), Value::make_int64(3), Value::make_int64(30)}}; + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Regular handler for other RPCs + auto basic_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)p; + QueryResultsReply reply; + reply.success = true; + + auto resp_p = reply.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // BloomFilterBits handler + auto bloom_bits_handler = [&](const RpcHeader& h, const std::vector& p, int fd) { + (void)h; + auto args = BloomFilterBitsArgs::deserialize(p); + BloomFilterBitsArgs reply_args; + reply_args.context_id = args.context_id; + reply_args.filter_data = {0xFF, 0xFF}; + reply_args.expected_elements = 100; + reply_args.num_hashes = 4; + + auto resp_p = reply_args.serialize(); + RpcHeader resp_h; + resp_h.type = RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }; + + // Set up handlers for node1 + node1.set_handler(RpcType::ExecuteFragment, execute_fragment_handler); + node1.set_handler(RpcType::ShuffleFragment, basic_handler); + node1.set_handler(RpcType::PushData, basic_handler); + node1.set_handler(RpcType::BloomFilterPush, basic_handler); + node1.set_handler(RpcType::BloomFilterBits, bloom_bits_handler); + node1.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler); + node1.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler); + + // Set up handlers for node2 + node2.set_handler(RpcType::ExecuteFragment, execute_fragment_handler); + node2.set_handler(RpcType::ShuffleFragment, basic_handler); + node2.set_handler(RpcType::PushData, basic_handler); + node2.set_handler(RpcType::BloomFilterPush, basic_handler); + node2.set_handler(RpcType::BloomFilterBits, bloom_bits_handler); + node2.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler); + node2.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler); + + ASSERT_TRUE(node1.start()); + ASSERT_TRUE(node2.start()); + auto catalog = Catalog::create(); const config::Config config; ClusterManager cm(&config); - cm.register_node("n1", "127.0.0.1", 7800, config::RunMode::Data); + cm.register_node("n1", "127.0.0.1", 7870, config::RunMode::Data); + cm.register_node("n2", "127.0.0.1", 7871, config::RunMode::Data); DistributedExecutor exec(*catalog, cm); - auto lexer = - std::make_unique("SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); + // Execute RIGHT join + auto lexer = std::make_unique( + "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); Parser parser(std::move(lexer)); auto stmt = parser.parse_statement(); + auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); - auto res = - exec.execute(*stmt, "SELECT * FROM table1 FULL JOIN table2 ON table1.id = table2.id"); + // Verify Phase 3-4 RPCs were NOT called for RIGHT JOIN + // (local executor handles unmatched right rows correctly) + EXPECT_EQ(unmatched_report_calls.load(), 0); // NOT called for RIGHT JOIN + EXPECT_EQ(fetch_unmatched_calls.load(), 0); // NOT called for RIGHT JOIN + EXPECT_TRUE(res.success()); - // Should fail because distributed shuffle join only supports INNER and LEFT joins - EXPECT_FALSE(res.success()); - EXPECT_THAT(res.error(), testing::HasSubstr("only supports INNER and LEFT joins")); - EXPECT_THAT(res.error(), testing::HasSubstr("FULL")); + node1.stop(); + node2.stop(); } } // namespace From 654d198c7df7d2c1f3758b8cf78026edb21267aa Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Wed, 15 Apr 2026 15:28:46 +0000 Subject: [PATCH 10/10] style: automated clang-format fixes --- include/common/cluster_manager.hpp | 2 +- include/network/rpc_message.hpp | 11 +-- src/distributed/distributed_executor.cpp | 86 ++++++++++++------------ src/main.cpp | 34 ++++++---- tests/distributed_tests.cpp | 29 ++++---- 5 files changed, 88 insertions(+), 74 deletions(-) diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index 6cffdbf4..b86b72fc 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -375,7 +375,7 @@ class ClusterManager { * @brief Store unmatched rows for a context (used by outer join processing) */ void set_unmatched_rows(const std::string& context_id, const std::string& table_name, - std::vector rows) { + std::vector rows) { const std::scoped_lock lock(mutex_); unmatched_rows_[context_id][table_name] = std::move(rows); } diff --git a/include/network/rpc_message.hpp b/include/network/rpc_message.hpp index a1ff22e7..5bffba52 100644 --- a/include/network/rpc_message.hpp +++ b/include/network/rpc_message.hpp @@ -36,8 +36,8 @@ enum class RpcType : uint8_t { BloomFilterPush = 11, BloomFilterBits = 12, UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join - UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding - FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node + UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding + FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node Error = 255 }; @@ -576,9 +576,9 @@ struct BloomFilterBitsArgs { struct UnmatchedRowsReportArgs { std::string context_id; std::string right_table; - std::string join_key_col; // Which column was the join key + std::string join_key_col; // Which column was the join key std::vector unmatched_keys; // Key values that had no match - uint32_t left_column_count = 0; // Number of left table columns for NULL-padding + uint32_t left_column_count = 0; // Number of left table columns for NULL-padding [[nodiscard]] std::vector serialize() const { std::vector out; @@ -669,7 +669,8 @@ struct UnmatchedRowsPushArgs { } for (uint32_t i = 0; i < count; ++i) { - args.unmatched_rows.push_back(Serializer::deserialize_tuple(in.data(), offset, in.size())); + args.unmatched_rows.push_back( + Serializer::deserialize_tuple(in.data(), offset, in.size())); } return args; } diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index 478fa76a..36f8a190 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -202,7 +202,9 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, join.type != parser::SelectStatement::JoinType::Right && join.type != parser::SelectStatement::JoinType::Full) { QueryResult res; - res.set_error("Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL joins"); + res.set_error( + "Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL " + "joins"); return res; } @@ -654,33 +656,31 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, std::vector>> report_futures; for (const auto& node : data_nodes) { - report_futures.push_back(std::async(std::launch::async, [node, context_id, - outer_join_right_table, - outer_join_right_key, - matched_keys, - left_column_count]() { - network::RpcClient client(node.address, node.cluster_port); - network::UnmatchedRowsReportArgs reply; - reply.context_id = context_id; - if (client.connect()) { - network::UnmatchedRowsReportArgs report_args; - report_args.context_id = context_id; - report_args.right_table = outer_join_right_table; - report_args.join_key_col = outer_join_right_key; - // Attach matched keys so node knows what was matched - report_args.unmatched_keys = matched_keys; - // Attach left column count for NULL-padding - report_args.left_column_count = left_column_count; + report_futures.push_back(std::async( + std::launch::async, [node, context_id, outer_join_right_table, outer_join_right_key, + matched_keys, left_column_count]() { + network::RpcClient client(node.address, node.cluster_port); + network::UnmatchedRowsReportArgs reply; + reply.context_id = context_id; + if (client.connect()) { + network::UnmatchedRowsReportArgs report_args; + report_args.context_id = context_id; + report_args.right_table = outer_join_right_table; + report_args.join_key_col = outer_join_right_key; + // Attach matched keys so node knows what was matched + report_args.unmatched_keys = matched_keys; + // Attach left column count for NULL-padding + report_args.left_column_count = left_column_count; - std::vector resp; - if (client.call(network::RpcType::UnmatchedRowsReport, - report_args.serialize(), resp)) { - reply = network::UnmatchedRowsReportArgs::deserialize(resp); - return std::make_pair(true, reply); + std::vector resp; + if (client.call(network::RpcType::UnmatchedRowsReport, + report_args.serialize(), resp)) { + reply = network::UnmatchedRowsReportArgs::deserialize(resp); + return std::make_pair(true, reply); + } } - } - return std::make_pair(false, reply); - })); + return std::make_pair(false, reply); + })); } // Wait for all report futures to complete @@ -692,25 +692,25 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, std::vector>>> fetch_futures; for (const auto& node : data_nodes) { - fetch_futures.push_back(std::async(std::launch::async, [node, context_id, - outer_join_right_table]() { - network::RpcClient client(node.address, node.cluster_port); - std::vector rows; - if (client.connect()) { - network::FetchUnmatchedRowsArgs fetch_args; - fetch_args.context_id = context_id; - fetch_args.table_name = outer_join_right_table; + fetch_futures.push_back( + std::async(std::launch::async, [node, context_id, outer_join_right_table]() { + network::RpcClient client(node.address, node.cluster_port); + std::vector rows; + if (client.connect()) { + network::FetchUnmatchedRowsArgs fetch_args; + fetch_args.context_id = context_id; + fetch_args.table_name = outer_join_right_table; - std::vector resp; - if (client.call(network::RpcType::FetchUnmatchedRows, - fetch_args.serialize(), resp)) { - auto reply = network::UnmatchedRowsPushArgs::deserialize(resp); - rows = std::move(reply.unmatched_rows); - return std::make_pair(true, std::move(rows)); + std::vector resp; + if (client.call(network::RpcType::FetchUnmatchedRows, + fetch_args.serialize(), resp)) { + auto reply = network::UnmatchedRowsPushArgs::deserialize(resp); + rows = std::move(reply.unmatched_rows); + return std::make_pair(true, std::move(rows)); + } } - } - return std::make_pair(false, std::move(rows)); - })); + return std::make_pair(false, std::move(rows)); + })); } // Aggregate all unmatched rows from all nodes diff --git a/src/main.cpp b/src/main.cpp index d9e659cb..cd1fee33 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -745,28 +745,35 @@ int main(int argc, char* argv[]) { const auto& key_val = t_meta.tuple.get(key_idx); std::string key_str = key_val.to_string(); // Only include if NOT in matched keys - if (matched_keys_set.find(key_str) == matched_keys_set.end()) { + if (matched_keys_set.find(key_str) == + matched_keys_set.end()) { reply.unmatched_keys.push_back(key_str); - // Pad with NULLs for left columns and append right row + // Pad with NULLs for left columns and append right + // row std::vector padded_values; - padded_values.reserve(args.left_column_count + t_meta.tuple.size()); + padded_values.reserve(args.left_column_count + + t_meta.tuple.size()); // Prepend NULLs for left table columns - for (uint32_t i = 0; i < args.left_column_count; ++i) { - padded_values.push_back(cloudsql::common::Value::make_null()); + for (uint32_t i = 0; i < args.left_column_count; + ++i) { + padded_values.push_back( + cloudsql::common::Value::make_null()); } // Append right table column values for (size_t j = 0; j < t_meta.tuple.size(); ++j) { padded_values.push_back(t_meta.tuple.get(j)); } - unmatched_tuples.emplace_back(std::move(padded_values)); + unmatched_tuples.emplace_back( + std::move(padded_values)); } } } - // Store properly padded tuples in ClusterManager for coordinator to collect + // Store properly padded tuples in ClusterManager for + // coordinator to collect if (cluster_manager != nullptr && !unmatched_tuples.empty()) { - cluster_manager->set_unmatched_rows(args.context_id, - args.right_table, - std::move(unmatched_tuples)); + cluster_manager->set_unmatched_rows( + args.context_id, args.right_table, + std::move(unmatched_tuples)); } } } @@ -786,7 +793,8 @@ int main(int argc, char* argv[]) { }); // Handler for receiving unmatched rows from coordinator for NULL-padding emission - // Coordinator broadcasts unmatched right rows to all nodes for final result assembly + // Coordinator broadcasts unmatched right rows to all nodes for final result + // assembly rpc_server->set_handler( cloudsql::network::RpcType::UnmatchedRowsPush, [&](const cloudsql::network::RpcHeader& h, const std::vector& p, @@ -796,8 +804,8 @@ int main(int argc, char* argv[]) { if (cluster_manager != nullptr && !args.unmatched_rows.empty()) { cluster_manager->buffer_shuffle_data(args.context_id, - "_unmatched_right_rows", - std::move(args.unmatched_rows)); + "_unmatched_right_rows", + std::move(args.unmatched_rows)); } cloudsql::network::QueryResultsReply reply; diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp index b347fe38..875c6827 100644 --- a/tests/distributed_tests.cpp +++ b/tests/distributed_tests.cpp @@ -577,11 +577,12 @@ TEST(DistributedExecutorTests, BloomFilterSkipForOuterJoin) { DistributedExecutor exec(*catalog, cm); // Execute RIGHT join - bloom filter should NOT be sent - auto lexer = std::make_unique( - "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + auto lexer = + std::make_unique("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); Parser parser(std::move(lexer)); auto stmt = parser.parse_statement(); - auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + auto res = + exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); // For RIGHT join, bloom filter is skipped (should be 0) // Even though we returned valid bloom bits, the coordinator should not push for outer joins @@ -615,15 +616,18 @@ TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) { Schema schema; schema.add_column("id", common::ValueType::TYPE_INT64); schema.add_column("val", common::ValueType::TYPE_INT64); - schema.add_column("id", common::ValueType::TYPE_INT64); // table2.id (ambiguous but matches) + schema.add_column("id", + common::ValueType::TYPE_INT64); // table2.id (ambiguous but matches) schema.add_column("val", common::ValueType::TYPE_INT64); // table2.val reply.schema = schema; // Return some matched rows (e.g., rows where table2.id = 1 and 2 matched) // Format: {table1.id, table1.val, table2.id, table2.val} reply.rows = { - Tuple{Value::make_int64(100), Value::make_int64(10), Value::make_int64(1), Value::make_int64(100)}, - Tuple{Value::make_int64(200), Value::make_int64(20), Value::make_int64(2), Value::make_int64(200)}, + Tuple{Value::make_int64(100), Value::make_int64(10), Value::make_int64(1), + Value::make_int64(100)}, + Tuple{Value::make_int64(200), Value::make_int64(20), Value::make_int64(2), + Value::make_int64(200)}, }; auto resp_p = reply.serialize(); @@ -665,8 +669,8 @@ TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) { UnmatchedRowsPushArgs reply; reply.context_id = args.context_id; - reply.unmatched_rows = { - Tuple{Value::make_null(), Value::make_null(), Value::make_int64(3), Value::make_int64(30)}}; + reply.unmatched_rows = {Tuple{Value::make_null(), Value::make_null(), Value::make_int64(3), + Value::make_int64(30)}}; auto resp_p = reply.serialize(); RpcHeader resp_h; @@ -743,16 +747,17 @@ TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) { DistributedExecutor exec(*catalog, cm); // Execute RIGHT join - auto lexer = std::make_unique( - "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + auto lexer = + std::make_unique("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); Parser parser(std::move(lexer)); auto stmt = parser.parse_statement(); - auto res = exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); + auto res = + exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id"); // Verify Phase 3-4 RPCs were NOT called for RIGHT JOIN // (local executor handles unmatched right rows correctly) EXPECT_EQ(unmatched_report_calls.load(), 0); // NOT called for RIGHT JOIN - EXPECT_EQ(fetch_unmatched_calls.load(), 0); // NOT called for RIGHT JOIN + EXPECT_EQ(fetch_unmatched_calls.load(), 0); // NOT called for RIGHT JOIN EXPECT_TRUE(res.success()); node1.stop();