diff --git a/docs/performance/SQLITE_COMPARISON.md b/docs/performance/SQLITE_COMPARISON.md
index 6df8830a..0c53bf0a 100644
--- a/docs/performance/SQLITE_COMPARISON.md
+++ b/docs/performance/SQLITE_COMPARISON.md
@@ -45,34 +45,37 @@ We addressed the gaps via the following optimizations:
 Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.
 
 ### Solution: Bloom Filter Integration
-Implemented bloom filters to filter tuples at the source before network transmission:
-- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table
-- **Distributed construction**: Each data node constructs its local bloom during the left/build scan phase
-- **Coordinator coordination**: `BloomFilterPush` RPC broadcasts filter metadata to all nodes before the right/probe shuffle
+Implemented bloom filters to filter tuples at the source before network transmission, using a 3-phase approach:
+- **Phase 1 (Local Build)**: Each data node scans its local left/build table partition, extracts join key values, and builds a local bloom filter
+- **Phase 2 (Bit Aggregation)**: Coordinator sends a `BloomFilterBits` RPC to each data node; each responds with its local bloom bits; the coordinator OR-aggregates all bits into a single filter
+- **Phase 3 (Sender-Side Filter)**: Coordinator broadcasts the aggregated filter via the `BloomFilterPush` RPC; before sending right/probe tuples, the `ShuffleFragment` handler checks `might_contain()` and skips tuples that will definitely not match
 
 ### Architecture
 ```
-[Phase 1: Shuffle Left]         [Phase 2: Shuffle Right]
-          |                                |
-          v                                v
-Build local bloom                 Apply bloom filter
-from join keys                    before buffering
-          |                                |
-          +---- BloomFilterPush ----->---+
-               (filter metadata)          |
-                                          v
-                              Filtered tuples buffered
+Phase 1: Scan Left        Phase 2: Aggregate Bits        Phase 3: Filter Right
+        |                           |                              |
+        v                           v                              v
+Build local bloom  <--->  BloomFilterBits RPC  <--------  Aggregate & Broadcast
+on each data node         (OR-aggregate bits)             via BloomFilterPush
+        |                           |                              |
+        |                           v                              v
+        +----------------->  BloomFilterPush           might_contain() check
+           (metadata only)          |                      before PushData
+                                    |
+                                    v
+                        Filtered tuples buffered
```
 
 ### Key Components
 
 | Component | Location | Purpose |
 |-----------|----------|---------|
 | `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
-| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
-| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
-| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples |
-| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending |
-| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |
+| `BloomFilterBitsArgs` RPC | `include/network/rpc_message.hpp` | Local bloom bits from data nodes |
+| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Aggregated filter broadcast |
+| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores local and aggregated bloom filters |
+| `BloomFilterBits` handler | `src/main.cpp` | Returns local bloom bits to coordinator |
+| `ShuffleFragment` handler | `src/main.cpp` | Builds local bloom during Phase 1 scan |
+| Coordinator | `src/distributed/distributed_executor.cpp` | Collects bits, aggregates, broadcasts filter |
 
 ### Test Coverage
 - 10 unit tests covering: BloomFilter class, BloomFilterArgs serialization, ClusterManager storage, filter application logic
diff --git a/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md b/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
index 9c6fdfa3..a89eed68 100644
--- a/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
+++ b/docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
@@ -28,8 +28,9 @@ Seamlessly integrated shuffle buffers into the Volcano execution model.
 ### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
 Added probabilistic filtering to reduce network traffic in shuffle joins.
 - **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation.
-- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context.
-- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
+- **Distributed Construction**: Each data node builds a local bloom filter from its left/build table partition during the Phase 1 scan.
+- **Bit Aggregation**: Coordinator collects local bloom bits from all data nodes via the `BloomFilterBits` RPC and OR-aggregates them into a single filter.
+- **Sender-Side Filtering**: The aggregated filter is broadcast via `BloomFilterPush` before Phase 2; the `ShuffleFragment` handler applies `might_contain()` before sending `PushData`, skipping tuples that will definitely not match.
 
 ## Lessons Learned
 - Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
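The local-build and OR-aggregation steps described above can be sketched in isolation. This is a minimal illustration only: the repo's `BloomFilter` in `include/common/bloom_filter.hpp` is MurmurHash3-based, while this sketch substitutes `std::hash` with double hashing, and `MiniBloom`/`or_aggregate` are hypothetical names, not the real API.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Minimal bloom filter sketch (not the repo's MurmurHash3-based class).
struct MiniBloom {
    std::vector<uint8_t> bits;  // bit array, 8 bits per byte
    size_t num_hashes;

    MiniBloom(size_t num_bits, size_t k)
        : bits((num_bits + 7) / 8, 0), num_hashes(k) {}

    // Derive the i-th bit index from two base hashes (double hashing).
    size_t index(const std::string& key, size_t i) const {
        const size_t h1 = std::hash<std::string>{}(key);
        const size_t h2 = h1 ^ (h1 >> 17) ^ 0x9e3779b97f4a7c15ULL;
        return (h1 + i * h2) % (bits.size() * 8);
    }

    // Phase 1 on a data node: insert each join key from the local scan.
    void insert(const std::string& key) {
        for (size_t i = 0; i < num_hashes; ++i) {
            const size_t b = index(key, i);
            bits[b / 8] |= static_cast<uint8_t>(1U << (b % 8));
        }
    }

    // Phase 3 on the sender: false means "definitely not in the build side".
    bool might_contain(const std::string& key) const {
        for (size_t i = 0; i < num_hashes; ++i) {
            const size_t b = index(key, i);
            if ((bits[b / 8] & (1U << (b % 8))) == 0) return false;
        }
        return true;
    }
};

// Phase 2 on the coordinator: bitwise-OR the bit arrays collected from each
// data node into one aggregated filter. All filters must share the same
// geometry (bit count and hash count), as they do when built from the same
// configuration.
MiniBloom or_aggregate(const std::vector<MiniBloom>& locals) {
    MiniBloom out(locals.at(0).bits.size() * 8, locals.at(0).num_hashes);
    for (const auto& f : locals)
        for (size_t i = 0; i < f.bits.size(); ++i) out.bits[i] |= f.bits[i];
    return out;
}
```

The OR-union is what makes the distributed build sound: a key inserted on any node sets bits that survive aggregation, so `might_contain()` on the combined filter can never return a false negative for a key present on some node.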
diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index 4b3ef244..b86b72fc 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -279,12 +279,129 @@ class ClusterManager { return ""; } + /** + * @brief Store local bloom filter bits from this node (called on data nodes) + */ + void set_local_bloom_bits(const std::string& context_id, std::vector bits, + size_t expected_elements, size_t num_hashes) { + const std::scoped_lock lock(mutex_); + local_bloom_bits_[context_id] = std::move(bits); + local_expected_elements_map_[context_id] = expected_elements; + local_num_hashes_map_[context_id] = num_hashes; + } + + /** + * @brief Get stored local bloom filter bits for a context + */ + [[nodiscard]] std::vector get_local_bloom_bits(const std::string& context_id) const { + const std::scoped_lock lock(mutex_); + auto it = local_bloom_bits_.find(context_id); + if (it != local_bloom_bits_.end()) { + return it->second; + } + return {}; + } + + /** + * @brief Get expected_elements for local bloom filter + */ + [[nodiscard]] size_t get_local_expected_elements(const std::string& context_id) const { + const std::scoped_lock lock(mutex_); + auto it = local_expected_elements_map_.find(context_id); + if (it != local_expected_elements_map_.end()) { + return it->second; + } + return 0; + } + + /** + * @brief Get num_hashes for local bloom filter + */ + [[nodiscard]] size_t get_local_num_hashes(const std::string& context_id) const { + const std::scoped_lock lock(mutex_); + auto it = local_num_hashes_map_.find(context_id); + if (it != local_num_hashes_map_.end()) { + return it->second; + } + return 0; + } + /** * @brief Clear bloom filter for a context */ void clear_bloom_filter(const std::string& context_id) { const std::scoped_lock lock(mutex_); bloom_filters_.erase(context_id); + local_bloom_bits_.erase(context_id); + local_expected_elements_map_.erase(context_id); + local_num_hashes_map_.erase(context_id); + } + + 
/** + * @brief Store local right table rows for outer join processing + * Called during Phase 2 shuffle when sending right table rows to other nodes + */ + void set_local_right_rows(const std::string& context_id, const std::string& table_name, + std::vector rows) { + const std::scoped_lock lock(mutex_); + local_right_table_rows_[context_id][table_name] = std::move(rows); + } + + /** + * @brief Get stored local right table rows + */ + [[nodiscard]] std::vector get_local_right_rows( + const std::string& context_id, const std::string& table_name) const { + const std::scoped_lock lock(mutex_); + auto ctx_it = local_right_table_rows_.find(context_id); + if (ctx_it != local_right_table_rows_.end()) { + auto table_it = ctx_it->second.find(table_name); + if (table_it != ctx_it->second.end()) { + return table_it->second; + } + } + return {}; + } + + /** + * @brief Clear local right table rows for a context + */ + void clear_local_right_rows(const std::string& context_id) { + const std::scoped_lock lock(mutex_); + local_right_table_rows_.erase(context_id); + } + + /** + * @brief Store unmatched rows for a context (used by outer join processing) + */ + void set_unmatched_rows(const std::string& context_id, const std::string& table_name, + std::vector rows) { + const std::scoped_lock lock(mutex_); + unmatched_rows_[context_id][table_name] = std::move(rows); + } + + /** + * @brief Get stored unmatched rows for a context + */ + [[nodiscard]] std::vector get_unmatched_rows( + const std::string& context_id, const std::string& table_name) const { + const std::scoped_lock lock(mutex_); + auto ctx_it = unmatched_rows_.find(context_id); + if (ctx_it != unmatched_rows_.end()) { + auto table_it = ctx_it->second.find(table_name); + if (table_it != ctx_it->second.end()) { + return table_it->second; + } + } + return {}; + } + + /** + * @brief Clear unmatched rows for a context + */ + void clear_unmatched_rows(const std::string& context_id) { + const std::scoped_lock lock(mutex_); + 
unmatched_rows_.erase(context_id); } private: @@ -311,6 +428,16 @@ class ClusterManager { shuffle_buffers_; /* context_id -> bloom filter data */ std::unordered_map bloom_filters_; + /* context_id -> local bloom filter bits (for aggregation during distributed build) */ + std::unordered_map> local_bloom_bits_; + std::unordered_map local_expected_elements_map_; + std::unordered_map local_num_hashes_map_; + /* context_id -> table_name -> local right table rows for outer join tracking */ + std::unordered_map>> + local_right_table_rows_; + /* context_id -> table_name -> unmatched rows for outer join NULL-padding */ + std::unordered_map>> + unmatched_rows_; mutable std::mutex mutex_; }; diff --git a/include/executor/operator.hpp b/include/executor/operator.hpp index fb1a1648..e8a9bc19 100644 --- a/include/executor/operator.hpp +++ b/include/executor/operator.hpp @@ -358,6 +358,18 @@ class HashJoinOperator : public Operator { void set_memory_resource(std::pmr::memory_resource* mr) override; void set_params(const std::vector* params) override; + + /** + * @brief Get unmatched right rows after join execution + * @return Vector of tuples - the right-side rows that had no match + */ + [[nodiscard]] std::vector get_unmatched_right_rows() const; + + /** + * @brief Get join key values of unmatched right rows + * @return Vector of strings - the join key values for unmatched right rows + */ + [[nodiscard]] std::vector get_unmatched_right_keys() const; }; /** diff --git a/include/network/rpc_message.hpp b/include/network/rpc_message.hpp index 4dce850d..5bffba52 100644 --- a/include/network/rpc_message.hpp +++ b/include/network/rpc_message.hpp @@ -34,6 +34,10 @@ enum class RpcType : uint8_t { PushData = 9, ShuffleFragment = 10, BloomFilterPush = 11, + BloomFilterBits = 12, + UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join + UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding + FetchUnmatchedRows = 15, // Coordinator fetches 
stored unmatched rows from data node Error = 255 }; @@ -507,6 +511,195 @@ struct BloomFilterArgs { } }; +/** + * @brief Arguments for sending local bloom filter bits from data node to coordinator + * Used during Phase 1 to collect and aggregate bloom filters from all nodes + */ +struct BloomFilterBitsArgs { + std::string context_id; + std::vector filter_data; + size_t expected_elements = 0; + size_t num_hashes = 0; + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + + // Serialize filter data (blob) + const uint32_t filter_len = static_cast(filter_data.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &filter_len, Serializer::VAL_SIZE_32); + out.insert(out.end(), filter_data.begin(), filter_data.end()); + + // Serialize metadata + uint64_t tmp_expected = static_cast(expected_elements); + uint8_t tmp_hashes = static_cast(num_hashes); + const size_t off2 = out.size(); + out.resize(off2 + 9); // 8 bytes for expected_elements + 1 for num_hashes + std::memcpy(out.data() + off2, &tmp_expected, 8); + out[off2 + 8] = tmp_hashes; + return out; + } + + static BloomFilterBitsArgs deserialize(const std::vector& in) { + BloomFilterBitsArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + + uint32_t filter_len = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&filter_len, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + if (offset + filter_len <= in.size()) { + args.filter_data.resize(filter_len); + std::memcpy(args.filter_data.data(), in.data() + offset, filter_len); + offset += filter_len; + } + + if (offset + 9 <= in.size()) { + uint64_t tmp_expected = 0; + std::memcpy(&tmp_expected, in.data() + offset, 8); + args.expected_elements = static_cast(tmp_expected); + offset += 8; + args.num_hashes = 
static_cast(in[offset]); + } + return args; + } +}; + +/** + * @brief Arguments for UnmatchedRowsReport RPC + * @note Data node reports unmatched right row keys to coordinator after local join + */ +struct UnmatchedRowsReportArgs { + std::string context_id; + std::string right_table; + std::string join_key_col; // Which column was the join key + std::vector unmatched_keys; // Key values that had no match + uint32_t left_column_count = 0; // Number of left table columns for NULL-padding + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + Serializer::serialize_string(right_table, out); + Serializer::serialize_string(join_key_col, out); + + // Serialize left column count + const uint32_t left_count = left_column_count; + const size_t lc_off = out.size(); + out.resize(lc_off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + lc_off, &left_count, Serializer::VAL_SIZE_32); + + // Serialize unmatched keys count + const uint32_t count = static_cast(unmatched_keys.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32); + + // Serialize each key + for (const auto& key : unmatched_keys) { + Serializer::serialize_string(key, out); + } + return out; + } + + static UnmatchedRowsReportArgs deserialize(const std::vector& in) { + UnmatchedRowsReportArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + args.right_table = Serializer::deserialize_string(in.data(), offset, in.size()); + args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size()); + + // Deserialize left column count + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&args.left_column_count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + uint32_t count = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { 
+ std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + for (uint32_t i = 0; i < count; ++i) { + args.unmatched_keys.push_back( + Serializer::deserialize_string(in.data(), offset, in.size())); + } + return args; + } +}; + +/** + * @brief Arguments for UnmatchedRowsPush RPC + * @note Coordinator sends unmatched rows to data nodes for NULL-padding + */ +struct UnmatchedRowsPushArgs { + std::string context_id; + std::vector unmatched_rows; // Right rows needing NULL padding + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + + // Serialize row count + const uint32_t count = static_cast(unmatched_rows.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32); + + // Serialize each row + for (const auto& row : unmatched_rows) { + Serializer::serialize_tuple(row, out); + } + return out; + } + + static UnmatchedRowsPushArgs deserialize(const std::vector& in) { + UnmatchedRowsPushArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + + uint32_t count = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + for (uint32_t i = 0; i < count; ++i) { + args.unmatched_rows.push_back( + Serializer::deserialize_tuple(in.data(), offset, in.size())); + } + return args; + } +}; + +/** + * @brief Arguments for FetchUnmatchedRows RPC + * @note Coordinator fetches stored unmatched rows from a data node + */ +struct FetchUnmatchedRowsArgs { + std::string context_id; + std::string table_name; + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + Serializer::serialize_string(table_name, out); + return out; + } + + static 
FetchUnmatchedRowsArgs deserialize(const std::vector& in) { + FetchUnmatchedRowsArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + args.table_name = Serializer::deserialize_string(in.data(), offset, in.size()); + return args; + } +}; + /** * @brief Arguments for TxnPrepare/Commit/Abort RPC */ diff --git a/src/distributed/distributed_executor.cpp b/src/distributed/distributed_executor.cpp index c39deb9d..36f8a190 100644 --- a/src/distributed/distributed_executor.cpp +++ b/src/distributed/distributed_executor.cpp @@ -180,6 +180,13 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, // Step 2: Advanced Joins: Broadcast or Shuffle Join Orchestration std::string context_id = "ctx_" + std::to_string(next_context_id.fetch_add(1)); + // Variables to track outer join info for Phase 3-5 processing after ExecuteFragment + bool is_outer_join_join_query = false; + std::string outer_join_left_table; + std::string outer_join_right_table; + std::string outer_join_right_key; + parser::SelectStatement::JoinType outer_join_type = parser::SelectStatement::JoinType::Inner; + if (type == parser::StmtType::Select) { const auto* select_stmt = dynamic_cast(&stmt); if (select_stmt != nullptr && !select_stmt->joins().empty()) { @@ -188,7 +195,20 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, const std::string left_table = select_stmt->from()->to_string(); const std::string right_table = join.table->to_string(); - // Assume join key is in the condition + // Check join type - shuffle join only supports INNER, LEFT, RIGHT, and FULL joins + // RIGHT and FULL joins require special handling for unmatched rows + if (join.type != parser::SelectStatement::JoinType::Inner && + join.type != parser::SelectStatement::JoinType::Left && + join.type != parser::SelectStatement::JoinType::Right && + join.type != parser::SelectStatement::JoinType::Full) { + QueryResult res; + res.set_error( + 
"Distributed Shuffle Join only supports INNER, LEFT, RIGHT, and FULL " + "joins"); + return res; + } + + // Check for equality join condition std::string left_key; std::string right_key; if (join.condition && join.condition->type() == parser::ExprType::Binary) { @@ -206,6 +226,18 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, return res; } + bool is_outer_join = (join.type == parser::SelectStatement::JoinType::Right || + join.type == parser::SelectStatement::JoinType::Full); + + // Track outer join info for Phase 3-5 processing after ExecuteFragment + if (is_outer_join) { + is_outer_join_join_query = true; + outer_join_left_table = left_table; + outer_join_right_table = right_table; + outer_join_right_key = right_key; + outer_join_type = join.type; + } + // Phase 1: Instruct nodes to shuffle Left Table network::ShuffleFragmentArgs left_args; left_args.context_id = context_id; @@ -242,32 +274,57 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, return res; } - // After Phase 1, each node will have received left table data. - // Now broadcast bloom filter built from that data to all nodes for Phase 2 - // filtering. The filter is sent as a separate RPC that data nodes will store and - // apply to their right table shuffle. For now, we send a simple metadata-only - // filter that signals "filtering enabled" - the actual filter building happens on - // each data node during Phase 1 and they stash it for use during Phase 2. - // - // In production, we'd collect and OR all local bloom filters, but for POC - // we just signal that bloom filtering is enabled for this context. 
- network::BloomFilterArgs bf_args; - bf_args.context_id = context_id; - bf_args.build_table = left_table; - bf_args.probe_table = right_table; - bf_args.probe_key_col = right_key; // Tell probe side which column to filter on - bf_args.filter_data.clear(); // Empty = filter built distributed - bf_args.expected_elements = data_nodes.size() * 1000; // Estimate - bf_args.num_hashes = 4; - auto bf_payload = bf_args.serialize(); + // After Phase 1, collect bloom filter bits from each data node and aggregate + // via bitwise OR to create the combined bloom filter + std::vector aggregated_bits; + size_t total_expected = 0; + size_t max_hashes = 0; for (const auto& node : data_nodes) { network::RpcClient client(node.address, node.cluster_port); if (!client.connect()) { - continue; // Best effort for POC + continue; } + network::BloomFilterBitsArgs bits_args; + bits_args.context_id = context_id; std::vector resp; - client.call(network::RpcType::BloomFilterPush, bf_payload, resp); + if (client.call(network::RpcType::BloomFilterBits, bits_args.serialize(), + resp)) { + auto reply = network::BloomFilterBitsArgs::deserialize(resp); + if (reply.filter_data.size() > aggregated_bits.size()) { + aggregated_bits.resize(reply.filter_data.size(), 0); + } + // Bitwise OR aggregation + for (size_t i = 0; i < reply.filter_data.size(); i++) { + aggregated_bits[i] |= reply.filter_data[i]; + } + total_expected += reply.expected_elements; + max_hashes = std::max(max_hashes, reply.num_hashes); + } + } + + // Broadcast the aggregated bloom filter to all nodes for Phase 2 filtering + // IMPORTANT: Skip bloom filter for RIGHT/FULL joins - bloom filter false negatives + // would cause unmatched right rows to be lost, which violates outer join semantics + if (!is_outer_join && !aggregated_bits.empty()) { + network::BloomFilterArgs bf_args; + bf_args.context_id = context_id; + bf_args.build_table = left_table; + bf_args.probe_table = right_table; + bf_args.probe_key_col = right_key; // Tell probe 
side which column to filter on + bf_args.filter_data = aggregated_bits; + bf_args.expected_elements = total_expected; + bf_args.num_hashes = max_hashes > 0 ? max_hashes : 4; + auto bf_payload = bf_args.serialize(); + + for (const auto& node : data_nodes) { + network::RpcClient client(node.address, node.cluster_port); + if (!client.connect()) { + continue; // Best effort for POC + } + std::vector resp; + client.call(network::RpcType::BloomFilterPush, bf_payload, resp); + } } // Phase 2: Instruct nodes to shuffle Right Table (now with bloom filter available) @@ -554,6 +611,119 @@ QueryResult DistributedExecutor::execute(const parser::Statement& stmt, } } + // Phase 3-5: Currently disabled for all outer joins due to issues with column indexing + // when SELECT doesn't use SELECT * (causes duplicate rows instead of correct results). + // + // For RIGHT JOIN: Local executor on each data node correctly handles unmatched right rows. + // For FULL JOIN: Unmatched LEFT rows are not collected (to be implemented in separate PR). + // + // TODO: Re-enable Phase 3-5 for FULL JOIN once column indexing is fixed to properly + // identify which rows were unmatched during the distributed join. 
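The NULL-padding that Phases 3-5 are intended to perform (prepend one NULL per left-table column to each unmatched right row, so the padded row lines up with the joined result schema) can be sketched in isolation. `Cell` and `Row` here are simplified stand-ins for the repo's tuple/`Value` types, and `null_pad_unmatched` is a hypothetical helper, not code from this PR.

```cpp
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

// A row is a vector of nullable strings; std::nullopt stands in for SQL NULL.
using Cell = std::optional<std::string>;
using Row = std::vector<Cell>;

// Sketch of the core step of the UnmatchedRowsReport handler: scan the local
// right-table partition, keep rows whose join key is NOT in the matched-key
// set, and prepend `left_column_count` NULLs for the missing left columns.
std::vector<Row> null_pad_unmatched(
    const std::vector<Row>& right_rows, size_t key_idx,
    const std::unordered_set<std::string>& matched_keys,
    size_t left_column_count) {
    std::vector<Row> out;
    for (const auto& row : right_rows) {
        const Cell& key = row.at(key_idx);
        if (key && matched_keys.count(*key) == 0) {
            Row padded(left_column_count, std::nullopt);  // NULL left columns
            padded.insert(padded.end(), row.begin(), row.end());
            out.push_back(std::move(padded));
        }
    }
    return out;
}
```

Keying the matched set on stringified join-key values mirrors the coordinator's approach above, and shares its limitation: it only works when the key column's position in the result schema is identified correctly, which is exactly the column-indexing issue that keeps Phases 3-5 disabled.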
+ if (false && is_outer_join_join_query && all_success) { + // Extract matched right keys from aggregated results + // The right key column is at a known position in the result schema + std::vector matched_keys; + size_t right_key_idx = static_cast(-1); + + // Find the right key column index in the result schema + for (size_t i = 0; i < result_schema.columns().size(); ++i) { + const auto& col = result_schema.columns()[i]; + if (col.name() == outer_join_right_key) { + right_key_idx = i; + break; + } + } + + // If we found the key column, extract matched keys from results + if (right_key_idx != static_cast(-1)) { + for (const auto& row : aggregated_rows) { + if (row.size() > right_key_idx) { + matched_keys.push_back(row.get(right_key_idx).to_string()); + } + } + } + + // Phase 3: Ask each node to scan local table and store unmatched rows + // First, compute the left column count for NULL-padding + uint32_t left_column_count = 0; + if (!outer_join_left_table.empty()) { + auto left_table_info = catalog_.get_table_by_name(outer_join_left_table); + if (left_table_info.has_value()) { + left_column_count = static_cast((*left_table_info)->columns.size()); + } + } + + std::vector>> report_futures; + + for (const auto& node : data_nodes) { + report_futures.push_back(std::async( + std::launch::async, [node, context_id, outer_join_right_table, outer_join_right_key, + matched_keys, left_column_count]() { + network::RpcClient client(node.address, node.cluster_port); + network::UnmatchedRowsReportArgs reply; + reply.context_id = context_id; + if (client.connect()) { + network::UnmatchedRowsReportArgs report_args; + report_args.context_id = context_id; + report_args.right_table = outer_join_right_table; + report_args.join_key_col = outer_join_right_key; + // Attach matched keys so node knows what was matched + report_args.unmatched_keys = matched_keys; + // Attach left column count for NULL-padding + report_args.left_column_count = left_column_count; + + std::vector resp; + if 
(client.call(network::RpcType::UnmatchedRowsReport, + report_args.serialize(), resp)) { + reply = network::UnmatchedRowsReportArgs::deserialize(resp); + return std::make_pair(true, reply); + } + } + return std::make_pair(false, reply); + })); + } + + // Wait for all report futures to complete + for (auto& f : report_futures) { + f.get(); + } + + // Phase 4: Fetch stored unmatched rows from each node + std::vector>>> fetch_futures; + + for (const auto& node : data_nodes) { + fetch_futures.push_back( + std::async(std::launch::async, [node, context_id, outer_join_right_table]() { + network::RpcClient client(node.address, node.cluster_port); + std::vector rows; + if (client.connect()) { + network::FetchUnmatchedRowsArgs fetch_args; + fetch_args.context_id = context_id; + fetch_args.table_name = outer_join_right_table; + + std::vector resp; + if (client.call(network::RpcType::FetchUnmatchedRows, + fetch_args.serialize(), resp)) { + auto reply = network::UnmatchedRowsPushArgs::deserialize(resp); + rows = std::move(reply.unmatched_rows); + return std::make_pair(true, std::move(rows)); + } + } + return std::make_pair(false, std::move(rows)); + })); + } + + // Aggregate all unmatched rows from all nodes + for (auto& f : fetch_futures) { + auto result = f.get(); + if (result.first) { + for (auto& row : result.second) { + aggregated_rows.push_back(std::move(row)); + } + } + } + } + if (all_success) { QueryResult res; res.set_schema(std::move(result_schema)); diff --git a/src/executor/operator.cpp b/src/executor/operator.cpp index 2da5cf1e..09482395 100644 --- a/src/executor/operator.cpp +++ b/src/executor/operator.cpp @@ -890,6 +890,28 @@ void HashJoinOperator::set_params(const std::vector* params) { if (right_) right_->set_params(params); } +std::vector HashJoinOperator::get_unmatched_right_rows() const { + std::vector unmatched; + auto right_schema = right_->output_schema(); + for (const auto& [key_str, build_tuple] : hash_table_) { + if (!build_tuple.matched) { + 
unmatched.push_back(build_tuple.tuple); + } + } + return unmatched; +} + +std::vector HashJoinOperator::get_unmatched_right_keys() const { + std::vector keys; + auto right_schema = right_->output_schema(); + for (const auto& [key_str, build_tuple] : hash_table_) { + if (!build_tuple.matched) { + keys.push_back(key_str); + } + } + return keys; +} + /* --- LimitOperator --- */ LimitOperator::LimitOperator(std::unique_ptr child, int64_t limit, int64_t offset) diff --git a/src/main.cpp b/src/main.cpp index f68b37bf..cd1fee33 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -516,6 +516,34 @@ int main(int argc, char* argv[]) { static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); }); + // Handler for collecting local bloom filter bits from data nodes + // Coordinator aggregates these via bitwise OR after Phase 1 + rpc_server->set_handler( + cloudsql::network::RpcType::BloomFilterBits, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::BloomFilterBitsArgs::deserialize(p); + cloudsql::network::BloomFilterBitsArgs reply_args; + reply_args.context_id = args.context_id; + reply_args.filter_data = + cluster_manager->get_local_bloom_bits(args.context_id); + reply_args.expected_elements = + cluster_manager->get_local_expected_elements(args.context_id); + reply_args.num_hashes = + cluster_manager->get_local_num_hashes(args.context_id); + + auto resp_p = reply_args.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + rpc_server->set_handler( cloudsql::network::RpcType::ShuffleFragment, [&](const cloudsql::network::RpcHeader& h, const std::vector& p, @@ -556,11 +584,19 @@ int 
main(int argc, char* argv[]) { partitions[node.id] = {}; } + // Estimate expected elements for bloom filter + // For now, estimate based on table size (will be refined with actual + // count) + size_t estimated_count = 1000; + cloudsql::common::BloomFilter local_bloom(estimated_count); + auto iter = table.scan(); cloudsql::storage::HeapTable::TupleMeta t_meta; while (iter.next_meta(t_meta)) { if (t_meta.xmax == 0) { // Visible const auto& key_val = t_meta.tuple.get(key_idx); + // Build bloom filter from join key values + local_bloom.insert(key_val); uint32_t node_idx = cloudsql::cluster::ShardManager::compute_shard( key_val, static_cast(data_nodes.size())); @@ -569,6 +605,13 @@ int main(int argc, char* argv[]) { } } + // Store local bloom filter bits for coordinator to collect + // The coordinator will aggregate these during Phase 1 + auto bloom_bits = local_bloom.serialize(); + cluster_manager->set_local_bloom_bits(args.context_id, bloom_bits, + local_bloom.expected_elements(), + local_bloom.num_hashes()); + bool overall_success = true; std::string delivery_errors; @@ -651,6 +694,149 @@ int main(int argc, char* argv[]) { reply.error_msg = e.what(); } + auto resp_p = reply.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + + // Handler for reporting unmatched right rows after join execution + // For RIGHT/FULL outer joins, each node identifies rows from its local right table + // partition that had no matching left row during the distributed join + rpc_server->set_handler( + cloudsql::network::RpcType::UnmatchedRowsReport, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = 
cloudsql::network::UnmatchedRowsReportArgs::deserialize(p); + cloudsql::network::UnmatchedRowsReportArgs reply; + reply.context_id = args.context_id; + reply.right_table = args.right_table; + reply.join_key_col = args.join_key_col; + + // args.unmatched_keys contains MATCHED keys from coordinator + // We need to return rows that are NOT in this set + std::unordered_set matched_keys_set( + args.unmatched_keys.begin(), args.unmatched_keys.end()); + + try { + // Scan local right table and collect rows that were NOT matched + auto table_meta_opt = catalog->get_table_by_name(args.right_table); + if (table_meta_opt.has_value()) { + const auto* table_meta = table_meta_opt.value(); + cloudsql::executor::Schema schema; + for (const auto& col : table_meta->columns) { + schema.add_column(col.name, col.type); + } + cloudsql::storage::HeapTable table(args.right_table, *bpm, schema); + + const size_t key_idx = schema.find_column(args.join_key_col); + if (key_idx != static_cast(-1)) { + std::vector unmatched_tuples; + auto iter = table.scan(); + cloudsql::storage::HeapTable::TupleMeta t_meta; + while (iter.next_meta(t_meta)) { + if (t_meta.xmax == 0) { + const auto& key_val = t_meta.tuple.get(key_idx); + std::string key_str = key_val.to_string(); + // Only include if NOT in matched keys + if (matched_keys_set.find(key_str) == + matched_keys_set.end()) { + reply.unmatched_keys.push_back(key_str); + // Pad with NULLs for left columns and append right + // row + std::vector padded_values; + padded_values.reserve(args.left_column_count + + t_meta.tuple.size()); + // Prepend NULLs for left table columns + for (uint32_t i = 0; i < args.left_column_count; + ++i) { + padded_values.push_back( + cloudsql::common::Value::make_null()); + } + // Append right table column values + for (size_t j = 0; j < t_meta.tuple.size(); ++j) { + padded_values.push_back(t_meta.tuple.get(j)); + } + unmatched_tuples.emplace_back( + std::move(padded_values)); + } + } + } + // Store properly padded tuples in 
ClusterManager for
+                // coordinator to collect
+                if (cluster_manager != nullptr && !unmatched_tuples.empty()) {
+                  cluster_manager->set_unmatched_rows(
+                      args.context_id, args.right_table,
+                      std::move(unmatched_tuples));
+                }
+              }
+            }
+          } catch (const std::exception& /*e*/) {
+            // Return empty on error
+          }
+
+          auto resp_p = reply.serialize();
+          cloudsql::network::RpcHeader resp_h;
+          resp_h.type = cloudsql::network::RpcType::QueryResults;
+          resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+          char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
+          resp_h.encode(h_buf);
+          static_cast<void>(
+              send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
+          static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+        });
+
+    // Handler for receiving unmatched rows from coordinator for NULL-padding emission
+    // Coordinator broadcasts unmatched right rows to all nodes for final result
+    // assembly
+    rpc_server->set_handler(
+        cloudsql::network::RpcType::UnmatchedRowsPush,
+        [&](const cloudsql::network::RpcHeader& h, const std::vector<char>& p,
+            int fd) {
+          (void)h;
+          auto args = cloudsql::network::UnmatchedRowsPushArgs::deserialize(p);
+
+          if (cluster_manager != nullptr && !args.unmatched_rows.empty()) {
+            cluster_manager->buffer_shuffle_data(args.context_id,
+                                                 "_unmatched_right_rows",
+                                                 std::move(args.unmatched_rows));
+          }
+
+          cloudsql::network::QueryResultsReply reply;
+          reply.success = true;
+          auto resp_p = reply.serialize();
+          cloudsql::network::RpcHeader resp_h;
+          resp_h.type = cloudsql::network::RpcType::QueryResults;
+          resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+          char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE];
+          resp_h.encode(h_buf);
+          static_cast<void>(
+              send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0));
+          static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+        });
+
+    // Handler for fetching stored unmatched rows from a data node
+    // Coordinator calls this after UnmatchedRowsReport to get full unmatched tuples
+    rpc_server->set_handler(
+        cloudsql::network::RpcType::FetchUnmatchedRows,
+        [&](const cloudsql::network::RpcHeader& h, const std::vector<char>& p,
+            int fd) {
+          (void)h;
+          auto args = cloudsql::network::FetchUnmatchedRowsArgs::deserialize(p);
+          cloudsql::network::UnmatchedRowsPushArgs reply;
+          reply.context_id = args.context_id;
+
+          if (cluster_manager != nullptr) {
+            reply.unmatched_rows = cluster_manager->get_unmatched_rows(
+                args.context_id, args.table_name);
+          }
+          auto resp_p = reply.serialize();
           cloudsql::network::RpcHeader resp_h;
           resp_h.type = cloudsql::network::RpcType::QueryResults;
diff --git a/tests/distributed_tests.cpp b/tests/distributed_tests.cpp
index e96dca94..875c6827 100644
--- a/tests/distributed_tests.cpp
+++ b/tests/distributed_tests.cpp
@@ -15,8 +15,10 @@
 #include "catalog/catalog.hpp"
 #include "common/cluster_manager.hpp"
+#include "common/value.hpp"
 #include "distributed/distributed_executor.hpp"
 #include "distributed/shard_manager.hpp"
+#include "executor/types.hpp"
 #include "network/rpc_client.hpp"
 #include "network/rpc_message.hpp"
 #include "network/rpc_server.hpp"
@@ -24,6 +26,7 @@
 #include "parser/parser.hpp"
 
 using namespace cloudsql;
+using namespace cloudsql::common;
 using namespace cloudsql::executor;
 using namespace cloudsql::cluster;
 using namespace cloudsql::parser;
@@ -330,14 +333,36 @@ TEST(DistributedExecutorTests, ShuffleJoinOrchestration) {
     static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
   };
 
+  auto bloom_bits_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = BloomFilterBitsArgs::deserialize(p);
+    BloomFilterBitsArgs reply_args;
+    reply_args.context_id = args.context_id;
+    // Return empty bloom filter bits for mock - real implementation would return actual bits
+    reply_args.filter_data = {};
+    reply_args.expected_elements = 0;
+    reply_args.num_hashes = 4;
+
+    auto resp_p = reply_args.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+
char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
   node1.set_handler(RpcType::ShuffleFragment, handler);
   node1.set_handler(RpcType::PushData, handler);
   node1.set_handler(RpcType::ExecuteFragment, handler);
   node1.set_handler(RpcType::BloomFilterPush, handler);
+  node1.set_handler(RpcType::BloomFilterBits, bloom_bits_handler);
   node2.set_handler(RpcType::ShuffleFragment, handler);
   node2.set_handler(RpcType::PushData, handler);
   node2.set_handler(RpcType::ExecuteFragment, handler);
   node2.set_handler(RpcType::BloomFilterPush, handler);
+  node2.set_handler(RpcType::BloomFilterBits, bloom_bits_handler);
 
   ASSERT_TRUE(node1.start());
   ASSERT_TRUE(node2.start());
@@ -425,4 +450,318 @@ TEST(DistributedExecutorTests, NonEqualityJoinRejection) {
   EXPECT_THAT(res.error(), testing::HasSubstr("equality join condition"));
 }
 
+TEST(DistributedExecutorTests, BloomFilterSkipForOuterJoin) {
+  // Test that bloom filter is NOT sent for RIGHT JOIN
+  // (INNER join behavior is tested in ShuffleJoinOrchestration)
+
+  RpcServer node1(7860);
+  RpcServer node2(7861);
+
+  std::atomic<int> bloom_push_calls{0};
+  std::atomic<int> shuffle_calls{0};
+  std::atomic<int> push_calls{0};
+  std::atomic<int> fragment_calls{0};
+  std::atomic<int> unmatched_report_calls{0};
+  std::atomic<int> fetch_unmatched_calls{0};
+
+  // Handler for regular RPCs
+  auto handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)p;
+    QueryResultsReply reply;
+    reply.success = true;
+
+    if (h.type == RpcType::BloomFilterPush) {
+      bloom_push_calls++;
+    } else if (h.type == RpcType::ShuffleFragment) {
+      shuffle_calls++;
+    } else if (h.type == RpcType::PushData) {
+      push_calls++;
+    } else if (h.type == RpcType::ExecuteFragment) {
+      fragment_calls++;
+    } else if (h.type == RpcType::UnmatchedRowsReport) {
+      unmatched_report_calls++;
+    } else if (h.type == RpcType::FetchUnmatchedRows) {
+      fetch_unmatched_calls++;
+    }
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Handler for BloomFilterBits - returns non-empty filter data to properly test bloom filter
+  auto bloom_bits_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = BloomFilterBitsArgs::deserialize(p);
+    BloomFilterBitsArgs reply_args;
+    reply_args.context_id = args.context_id;
+    reply_args.filter_data = {0xFF, 0xFF, 0xFF, 0xFF};  // Non-empty bloom filter
+    reply_args.expected_elements = 100;
+    reply_args.num_hashes = 4;
+
+    auto resp_p = reply_args.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Handler for UnmatchedRowsReport - returns proper response type
+  auto unmatched_report_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = UnmatchedRowsReportArgs::deserialize(p);
+    UnmatchedRowsReportArgs reply;
+    reply.context_id = args.context_id;
+    reply.right_table = args.right_table;
+    // Empty unmatched_keys = all rows matched (for test simplicity)
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Handler for FetchUnmatchedRows - returns proper response type
+  auto fetch_unmatched_handler = [&](const RpcHeader& h, const
std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = FetchUnmatchedRowsArgs::deserialize(p);
+    UnmatchedRowsPushArgs reply;
+    reply.context_id = args.context_id;
+    reply.unmatched_rows = {};  // Empty for test simplicity
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  node1.set_handler(RpcType::BloomFilterPush, handler);
+  node1.set_handler(RpcType::ShuffleFragment, handler);
+  node1.set_handler(RpcType::PushData, handler);
+  node1.set_handler(RpcType::ExecuteFragment, handler);
+  node1.set_handler(RpcType::BloomFilterBits, bloom_bits_handler);
+  node1.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler);
+  node1.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler);
+  node2.set_handler(RpcType::BloomFilterPush, handler);
+  node2.set_handler(RpcType::ShuffleFragment, handler);
+  node2.set_handler(RpcType::PushData, handler);
+  node2.set_handler(RpcType::ExecuteFragment, handler);
+  node2.set_handler(RpcType::BloomFilterBits, bloom_bits_handler);
+  node2.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler);
+  node2.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler);
+
+  ASSERT_TRUE(node1.start());
+  ASSERT_TRUE(node2.start());
+
+  auto catalog = Catalog::create();
+  const config::Config config;
+  ClusterManager cm(&config);
+  cm.register_node("n1", "127.0.0.1", 7860, config::RunMode::Data);
+  cm.register_node("n2", "127.0.0.1", 7861, config::RunMode::Data);
+  DistributedExecutor exec(*catalog, cm);
+
+  // Execute RIGHT join - bloom filter should NOT be sent
+  auto lexer =
+      std::make_unique<Lexer>("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
+  Parser parser(std::move(lexer));
+  auto stmt = parser.parse_statement();
+  auto res =
+      exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
+
+  // For RIGHT join, bloom filter is skipped (should be 0)
+  // Even though we returned valid bloom bits, the coordinator should not push for outer joins
+  EXPECT_EQ(bloom_push_calls.load(), 0);
+  // Verify the query succeeded
+  EXPECT_TRUE(res.success());
+
+  node1.stop();
+  node2.stop();
+}
+
+TEST(DistributedExecutorTests, Phase3SkippedForRightJoin) {
+  // Test that Phase 3 (UnmatchedRowsReport) is NOT called for RIGHT JOIN
+  // because the local executor on each data node already handles unmatched right rows.
+  // Phase 3-5 is only needed for FULL JOIN to collect unmatched LEFT rows.
+
+  RpcServer node1(7870);
+  RpcServer node2(7871);
+
+  std::atomic<int> unmatched_report_calls{0};
+  std::atomic<int> fetch_unmatched_calls{0};
+
+  // Handler for ExecuteFragment that returns proper schema and rows
+  auto execute_fragment_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = ExecuteFragmentArgs::deserialize(p);
+    QueryResultsReply reply;
+    reply.success = true;
+
+    // Build schema for joined result: table1(id, val), table2(id, val)
+    Schema schema;
+    schema.add_column("id", common::ValueType::TYPE_INT64);
+    schema.add_column("val", common::ValueType::TYPE_INT64);
+    schema.add_column("id",
+                      common::ValueType::TYPE_INT64);  // table2.id (ambiguous but matches)
+    schema.add_column("val", common::ValueType::TYPE_INT64);  // table2.val
+    reply.schema = schema;
+
+    // Return some matched rows (e.g., rows where table2.id = 1 and 2 matched)
+    // Format: {table1.id, table1.val, table2.id, table2.val}
+    reply.rows = {
+        Tuple{Value::make_int64(100), Value::make_int64(10), Value::make_int64(1),
+              Value::make_int64(100)},
+        Tuple{Value::make_int64(200), Value::make_int64(20), Value::make_int64(2),
+              Value::make_int64(200)},
+    };
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len =
static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Handler for UnmatchedRowsReport - tracks calls (should be 0 for RIGHT JOIN)
+  auto unmatched_report_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = UnmatchedRowsReportArgs::deserialize(p);
+    unmatched_report_calls++;
+
+    UnmatchedRowsReportArgs reply;
+    reply.context_id = args.context_id;
+    reply.right_table = args.right_table;
+    reply.unmatched_keys = {"3"};
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Handler for FetchUnmatchedRows - tracks calls (should be 0 for RIGHT JOIN)
+  auto fetch_unmatched_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = FetchUnmatchedRowsArgs::deserialize(p);
+    fetch_unmatched_calls++;
+
+    UnmatchedRowsPushArgs reply;
+    reply.context_id = args.context_id;
+    reply.unmatched_rows = {Tuple{Value::make_null(), Value::make_null(), Value::make_int64(3),
+                                  Value::make_int64(30)}};
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Regular handler for other RPCs
+  auto basic_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)p;
+    QueryResultsReply reply;
+    reply.success = true;
+
+    auto resp_p = reply.serialize();
+    RpcHeader resp_h;
+    resp_h.type =
RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // BloomFilterBits handler
+  auto bloom_bits_handler = [&](const RpcHeader& h, const std::vector<char>& p, int fd) {
+    (void)h;
+    auto args = BloomFilterBitsArgs::deserialize(p);
+    BloomFilterBitsArgs reply_args;
+    reply_args.context_id = args.context_id;
+    reply_args.filter_data = {0xFF, 0xFF};
+    reply_args.expected_elements = 100;
+    reply_args.num_hashes = 4;
+
+    auto resp_p = reply_args.serialize();
+    RpcHeader resp_h;
+    resp_h.type = RpcType::QueryResults;
+    resp_h.payload_len = static_cast<uint32_t>(resp_p.size());
+    char h_buf[RpcHeader::HEADER_SIZE];
+    resp_h.encode(h_buf);
+    static_cast<void>(send(fd, h_buf, RpcHeader::HEADER_SIZE, 0));
+    static_cast<void>(send(fd, resp_p.data(), resp_p.size(), 0));
+  };
+
+  // Set up handlers for node1
+  node1.set_handler(RpcType::ExecuteFragment, execute_fragment_handler);
+  node1.set_handler(RpcType::ShuffleFragment, basic_handler);
+  node1.set_handler(RpcType::PushData, basic_handler);
+  node1.set_handler(RpcType::BloomFilterPush, basic_handler);
+  node1.set_handler(RpcType::BloomFilterBits, bloom_bits_handler);
+  node1.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler);
+  node1.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler);
+
+  // Set up handlers for node2
+  node2.set_handler(RpcType::ExecuteFragment, execute_fragment_handler);
+  node2.set_handler(RpcType::ShuffleFragment, basic_handler);
+  node2.set_handler(RpcType::PushData, basic_handler);
+  node2.set_handler(RpcType::BloomFilterPush, basic_handler);
+  node2.set_handler(RpcType::BloomFilterBits, bloom_bits_handler);
+  node2.set_handler(RpcType::UnmatchedRowsReport, unmatched_report_handler);
+  node2.set_handler(RpcType::FetchUnmatchedRows, fetch_unmatched_handler);
+
+  ASSERT_TRUE(node1.start());
+
ASSERT_TRUE(node2.start());
+
+  auto catalog = Catalog::create();
+  const config::Config config;
+  ClusterManager cm(&config);
+  cm.register_node("n1", "127.0.0.1", 7870, config::RunMode::Data);
+  cm.register_node("n2", "127.0.0.1", 7871, config::RunMode::Data);
+  DistributedExecutor exec(*catalog, cm);
+
+  // Execute RIGHT join
+  auto lexer =
+      std::make_unique<Lexer>("SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
+  Parser parser(std::move(lexer));
+  auto stmt = parser.parse_statement();
+  auto res =
+      exec.execute(*stmt, "SELECT * FROM table1 RIGHT JOIN table2 ON table1.id = table2.id");
+
+  // Verify Phase 3-4 RPCs were NOT called for RIGHT JOIN
+  // (local executor handles unmatched right rows correctly)
+  EXPECT_EQ(unmatched_report_calls.load(), 0);  // NOT called for RIGHT JOIN
+  EXPECT_EQ(fetch_unmatched_calls.load(), 0);   // NOT called for RIGHT JOIN
+  EXPECT_TRUE(res.success());
+
+  node1.stop();
+  node2.stop();
+}
+
 }  // namespace
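The bloom-filter flow added in this patch (local build on each data node, coordinator OR-aggregation of the `BloomFilterBits` replies, sender-side `might_contain()` checks) can be sketched in miniature. This is an illustrative stand-in, not the patch's `cloudsql::common::BloomFilter` (which uses MurmurHash3): `MiniBloom`, its salt string, and `or_aggregate` are all invented names for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Minimal bloom filter: a byte array plus k hash probes derived by double
// hashing (std::hash stands in for the real implementation's MurmurHash3).
struct MiniBloom {
  std::vector<uint8_t> bits;  // serialized form = this byte array
  size_t num_hashes;

  explicit MiniBloom(size_t num_bytes = 64, size_t k = 4)
      : bits(num_bytes, 0), num_hashes(k) {}

  // h_i(x) = h1(x) + i * h2(x), reduced modulo the bit-array size.
  size_t bit_index(const std::string& key, size_t i) const {
    size_t h1 = std::hash<std::string>{}(key);
    size_t h2 = std::hash<std::string>{}(key + "#salt");  // illustrative salt
    return (h1 + i * h2) % (bits.size() * 8);
  }

  void insert(const std::string& key) {
    for (size_t i = 0; i < num_hashes; ++i) {
      size_t b = bit_index(key, i);
      bits[b / 8] |= static_cast<uint8_t>(1u << (b % 8));
    }
  }

  bool might_contain(const std::string& key) const {
    for (size_t i = 0; i < num_hashes; ++i) {
      size_t b = bit_index(key, i);
      if ((bits[b / 8] & (1u << (b % 8))) == 0) return false;  // definitely absent
    }
    return true;  // possibly present (false positives allowed, no false negatives)
  }
};

// Coordinator-side aggregation: OR-ing the bit arrays preserves every set bit,
// so the aggregate answers "might contain" for keys inserted on ANY data node.
// Filters must share the same size and hash count for this to be valid.
MiniBloom or_aggregate(const MiniBloom& a, const MiniBloom& b) {
  MiniBloom out(a.bits.size(), a.num_hashes);
  for (size_t i = 0; i < a.bits.size(); ++i) {
    out.bits[i] = static_cast<uint8_t>(a.bits[i] | b.bits[i]);
  }
  return out;
}
```

A key after OR-aggregation never produces a false negative, which is what makes the Phase 3 sender-side `might_contain()` check safe to use for skipping tuples.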
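The NULL-padding performed in the `UnmatchedRowsReport` handler (prepend one NULL per left-table column, then append the right row's values) reduces to a small pure function. `Value` and `Tuple` here are simplified stand-ins for `cloudsql::common::Value` and `cloudsql::executor::Tuple`, and `pad_unmatched_right_row` is a hypothetical helper name, not part of the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// std::nullopt plays the role of SQL NULL in this sketch.
using Value = std::optional<int64_t>;
using Tuple = std::vector<Value>;

// Widen an unmatched right-table row to the joined schema:
// {NULL x left_column_count, right columns...}
Tuple pad_unmatched_right_row(const Tuple& right_row, uint32_t left_column_count) {
  Tuple padded;
  padded.reserve(left_column_count + right_row.size());
  for (uint32_t i = 0; i < left_column_count; ++i) {
    padded.push_back(std::nullopt);  // NULL for each left-table column
  }
  for (const Value& v : right_row) {
    padded.push_back(v);  // right-table columns keep their values
  }
  return padded;
}
```

This is exactly the shape the `FetchUnmatchedRows` mock in the tests returns: `{NULL, NULL, 3, 30}` for a right row `{3, 30}` joined against a two-column left table.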