From 30a05c7fde04ba2d7224c1c483cd21127b96d506 Mon Sep 17 00:00:00 2001
From: niranda perera 
Date: Mon, 1 Dec 2025 16:10:17 -0800
Subject: [PATCH 1/6] adding scaffolding

Signed-off-by: niranda perera 

---
 cpp/benchmarks/streaming/ndsh/CMakeLists.txt |  25 +
 cpp/benchmarks/streaming/ndsh/q13.cpp        | 888 +++++++++++++++++++
 2 files changed, 913 insertions(+)
 create mode 100644 cpp/benchmarks/streaming/ndsh/q13.cpp

diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt
index 6fa4bd27b..666072c7d 100644
--- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt
+++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt
@@ -65,3 +65,28 @@ install(
     DESTINATION bin/benchmarks/librapidsmpf
     EXCLUDE_FROM_ALL
 )
+
+
+add_executable(q13 "q13.cpp")
+set_target_properties(
+    q13
+    PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$"
+               CXX_STANDARD 20
+               CXX_STANDARD_REQUIRED ON
+               CUDA_STANDARD 20
+               CUDA_STANDARD_REQUIRED ON
+)
+target_compile_options(
+    q13 PRIVATE "$<$<COMPILE_LANGUAGE:CXX>:${RAPIDSMPF_CXX_FLAGS}>"
+                "$<$<COMPILE_LANGUAGE:CUDA>:${RAPIDSMPF_CUDA_FLAGS}>"
+)
+target_link_libraries(
+    q13 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $
+                $ maybe_asan
+)
+install(
+    TARGETS q13
+    COMPONENT benchmarking
+    DESTINATION bin/benchmarks/librapidsmpf
+    EXCLUDE_FROM_ALL
+)
\ No newline at end of file
diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp
new file mode 100644
index 000000000..6d74269a9
--- /dev/null
+++ b/cpp/benchmarks/streaming/ndsh/q13.cpp
@@ -0,0 +1,888 @@
+/**
+ * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "concatenate.hpp"
+#include "join.hpp"
+#include "utils.hpp"
+
+namespace {
+
+std::string get_table_path(
+    std::string const& input_directory, std::string const& table_name
+) {
+    auto dir = input_directory.empty() ? "."
: input_directory; + auto file_path = dir + "/" + table_name + ".parquet"; + if (std::filesystem::exists(file_path)) { + return file_path; + } + return dir + "/" + table_name + "/"; +} + +rapidsmpf::streaming::Node read_customer( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "customer") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"c_custkey"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node read_orders( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "orders") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"o_comment", "o_custkey", "o_orderkey"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node filter_orders( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto p_name = table.column(1); + auto regex_program = cudf::strings::regex_program::create("special.*requests"); + auto mask = cudf::strings::contains_re(p_name, *regex_program, chunk_stream, mr); + co_await ch_out->send(rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + cudf::apply_boolean_mask( + table.select({0}), mask->view(), chunk_stream, mr + ), + chunk_stream + ) + )); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node select_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + // n_name, ps_supplycost, l_discount, l_extendedprice, l_quantity, o_orderdate + + // Select n_name, year_part_of(o_orderdate), amount = (extendedprice * (1 + // - discount)) - (ps_supplycost * l_quantity) group by n_name year agg + // sum(amount).round(2) sort by n_name, o_year descending = true, false + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector> result; + result.reserve(3); + // n_name + result.push_back(std::make_unique( + table.column(0), chunk_stream, ctx->br()->device_mr() + )); + result.push_back(cudf::datetime::extract_datetime_component( + table.column(5), + cudf::datetime::datetime_component::YEAR, + chunk_stream, + ctx->br()->device_mr() + )); + auto discount = table.column(2); + auto extendedprice = table.column(3); + auto supplycost = table.column(1); + auto quantity = 
table.column(4); + std::string udf = + R"***( +static __device__ void calculate_amount(double *amount, double discount, double extprice, double supplycost, double quantity) { + *amount = extprice * (1 - discount) - supplycost * quantity; +} + )***"; + result.push_back(cudf::transform( + {discount, extendedprice, supplycost, quantity}, + udf, + cudf::data_type(cudf::type_id::FLOAT64), + false, + std::nullopt, + cudf::null_aware::NO, + chunk_stream, + ctx->br()->device_mr() + )); + co_await ch_out->send(rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + )); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node chunkwise_groupby_agg( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::vector partial_results; + std::uint64_t sequence = 0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto grouper = cudf::groupby::groupby( + table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send(rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + )); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node final_groupby_agg( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + // TODO: requires concatenated input stream. + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("Final groupby"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + std::unique_ptr local_result{nullptr}; + if (!table.is_empty()) { + auto grouper = cudf::groupby::groupby( + table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + local_result = std::make_unique(std::move(result)); + } + if (ctx->comm()->nranks() > 1) { + // Reduce across ranks... 
+ // Need a reduce primitive in rapidsmpf, but let's just use an allgather and + // discard for now. + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (local_result) { + auto pack = + cudf::pack(local_result->view(), chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + if (ctx->comm()->rank() == 0) { + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + if (ctx->comm()->rank() == 0) { + // We will only actually bother to do this on rank zero. + auto result_view = global_result->view(); + auto grouper = cudf::groupby::groupby( + result_view.select({0, 1}), + cudf::null_policy::EXCLUDE, + cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back(cudf::groupby::aggregation_request( + result_view.column(2), std::move(aggs) + )); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + global_result.reset(); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send(rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + )); + } + } else { + std::ignore = std::move(packed_data); + } + } else { + co_await ch_out->send(rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(local_result), chunk_stream + ) + )); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node sort_by( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + // We know we only have a single chunk from the groupby + if (msg.empty()) { + co_return; + } + ctx->comm()->logger().print("Sortby"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto table = chunk.table_view(); + auto rounded = cudf::round( + table.column(2), + 2, + cudf::rounding_method::HALF_EVEN, + chunk.stream(), + ctx->br()->device_mr() + ); + auto result = rapidsmpf::streaming::to_message( + 0, + std::make_unique( + cudf::sort_by_key( + cudf::table_view({table.column(0), table.column(1), rounded->view()}), + table.select({0, 1}), + {cudf::order::ASCENDING, cudf::order::DESCENDING}, + {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, + chunk.stream(), + ctx->br()->device_mr() + ), + chunk.stream() + ) + ); + co_await ch_out->send(std::move(result)); + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node write_parquet( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::string output_path +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + ctx->comm()->logger().print("write parquet"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto sink = cudf::io::sink_info(output_path); + auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = 
cudf::io::table_input_metadata(chunk.table_view()); + metadata.column_metadata[0].set_name("nation"); + metadata.column_metadata[1].set_name("o_year"); + metadata.column_metadata[2].set_name("sum_profit"); + builder = builder.metadata(metadata); + auto options = builder.build(); + cudf::io::write_parquet(options, chunk.stream()); + ctx->comm()->logger().print( + "Wrote chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns to ", + output_path + ); +} + +} // namespace + +struct ProgramOptions { + int num_streaming_threads{1}; + cudf::size_type num_rows_per_chunk{100'000'000}; + std::optional spill_device_limit{std::nullopt}; + bool use_shuffle_join = false; + std::string output_file; + std::string input_directory; +}; + +ProgramOptions parse_options(int argc, char** argv) { + ProgramOptions options; + + auto print_usage = [&argv]() { + std::cerr + << "Usage: " << argv[0] << " [options]\n" + << "Options:\n" + << " --num-streaming-threads Number of streaming threads (default: 1)\n" + << " --num-rows-per-chunk Number of rows per chunk (default: " + "100000000)\n" + << " --spill-device-limit Fractional spill device limit (default: " + "None)\n" + << " --use-shuffle-join Use shuffle join (default: false)\n" + << " --output-file Output file path (required)\n" + << " --input-directory Input directory path (required)\n" + << " --help Show this help message\n"; + }; + + static struct option long_options[] = { + {"num-streaming-threads", required_argument, nullptr, 1}, + {"num-rows-per-chunk", required_argument, nullptr, 2}, + {"use-shuffle-join", no_argument, nullptr, 3}, + {"output-file", required_argument, nullptr, 4}, + {"input-directory", required_argument, nullptr, 5}, + {"help", no_argument, nullptr, 6}, + {"spill-device-limit", required_argument, nullptr, 7}, + {nullptr, 0, nullptr, 0} + }; + + int opt; + int option_index = 0; + + bool saw_output_file = false; + bool saw_input_directory = false; + + while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { + switch (opt) { + case 1: + options.num_streaming_threads = std::atoi(optarg); + break; + case 2: + options.num_rows_per_chunk = std::atoi(optarg); + break; + case 3: + options.use_shuffle_join = true; + break; + case 4: + options.output_file = optarg; + saw_output_file = true; + break; + case 5: + options.input_directory = optarg; + saw_input_directory = true; + break; + case 6: + print_usage(); + std::exit(0); + case 7: + options.spill_device_limit = std::stod(optarg); + break; + case '?': + if (optopt == 0 && optind > 1) { + std::cerr << "Error: Unknown option '" << argv[optind - 1] << "'\n\n"; + } + print_usage(); + std::exit(1); + default: + print_usage(); + std::exit(1); + } + } + + // Check if required options were provided + if (!saw_output_file || !saw_input_directory) { + if (!saw_output_file) { + std::cerr << "Error: --output-file is required\n"; + } + if (!saw_input_directory) { + std::cerr << "Error: --input-directory is required\n"; + } + std::cerr << std::endl; + print_usage(); + std::exit(1); + } + + return options; +} + +/** + * @brief Run a derived version of TPC-H query 9. 
+ * + * The SQL form of the query is: + * @code{.sql} + * select + * nation, + * o_year, + * round(sum(amount), 2) as sum_profit + * from + * ( + * select + * n_name as nation, + * year(o_orderdate) as o_year, + * l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + * from + * part, + * supplier, + * lineitem, + * partsupp, + * orders, + * nation + * where + * s_suppkey = l_suppkey + * and ps_suppkey = l_suppkey + * and ps_partkey = l_partkey + * and p_partkey = l_partkey + * and o_orderkey = l_orderkey + * and s_nationkey = n_nationkey + * and p_name like '%green%' + * ) as profit + * group by + * nation, + * o_year + * order by + * nation, + * o_year desc + * @endcode{} + * + * + * @code{.sql} + * select + * c_count, + * COUNT(*) AS custdist + * from + * ( + * select + * c_custkey, + * COUNT(o_orderkey) AS c_count + * from + * customer + * left outer join orders on + * c_custkey = o_custkey + * and o_comment not like '%special%requests%' + * group by + * c_custkey + * ) as c_orders + * group by + * c_count + * order by + * custdist DESC, + * c_count DESC; + * @endcode{} + */ +int main(int argc, char** argv) { + cudaFree(nullptr); + rapidsmpf::mpi::init(&argc, &argv); + MPI_Comm mpi_comm; + RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); + auto cmd_options = parse_options(argc, argv); + auto limit_size = rmm::percent_of_free_device_memory( + static_cast(cmd_options.spill_device_limit.value_or(1) * 100) + ); + rmm::mr::cuda_async_memory_resource mr{}; + auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); + rmm::device_async_resource_ref mr_ref(stats_mr); + rmm::mr::set_current_device_resource(&stats_mr); + rmm::mr::set_current_device_resource_ref(mr_ref); + std::unordered_map + memory_available{}; + if (cmd_options.spill_device_limit.has_value()) { + memory_available[rapidsmpf::MemoryType::DEVICE] = rapidsmpf::LimitAvailableMemory{ + &stats_mr, static_cast(limit_size) + }; + } + auto br = std::make_shared( + stats_mr, std::move(memory_available) + ); + auto envvars = rapidsmpf::config::get_environment_variables(); + envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); + auto options = rapidsmpf::config::Options(envvars); + auto stats = std::make_shared(&stats_mr); + { + auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); + auto progress = + std::make_shared(comm->logger(), stats); + auto ctx = + std::make_shared(options, comm, br, stats); + comm->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" + ); + comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + + std::string output_path = cmd_options.output_file; + std::vector timings; + for (int i = 0; i < 2; i++) { + rapidsmpf::OpID op_id{0}; + std::vector nodes; + auto start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q9 pipeline"); + auto orders = ctx->create_channel(); + auto filtered_orders = ctx->create_channel(); + auto customer = ctx->create_channel(); + + // auto part_x_partsupp = ctx->create_channel(); + // auto supplier = ctx->create_channel(); + // auto lineitem = ctx->create_channel(); + // auto supplier_x_part_x_partsupp = ctx->create_channel(); + // auto supplier_x_part_x_partsupp_x_lineitem = ctx->create_channel(); + + nodes.push_back(read_customer( + ctx, + customer, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // c_custkey + + nodes.push_back(read_orders( + ctx, + orders, + /* num_tickets */ 4, + 
cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // o_comment, o_custkey, o_orderkey + + + nodes.push_back(filter_orders(ctx, orders, filtered_orders)); + + + nodes.push_back(read_partsupp( + ctx, + partsupp, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // ps_partkey, ps_suppkey, ps_supplycost + nodes.push_back( + // p_partkey x ps_partkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + filtered_part, + partsupp, + part_x_partsupp, + {0}, + {0}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + ) // p_partkey/ps_partkey, ps_suppkey, ps_supplycost + ); + nodes.push_back(read_supplier( + ctx, + supplier, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // s_nationkey, s_suppkey + nodes.push_back( + // s_suppkey x ps_suppkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + supplier, + part_x_partsupp, + supplier_x_part_x_partsupp, + {1}, + {1}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + + ) // s_nationkey, s_suppkey/ps_suppkey, p_partkey/ps_partkey, + // ps_supplycost + ); + nodes.push_back(read_lineitem( + ctx, + lineitem, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // l_discount, l_extendedprice, l_orderkey, l_partkey, l_quantity, + // l_suppkey + nodes.push_back( + // [p_partkey, ps_suppkey] x [l_partkey, l_suppkey] + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + supplier_x_part_x_partsupp, + lineitem, + supplier_x_part_x_partsupp_x_lineitem, + {2, 1}, + {3, 5}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::NO + ) // s_nationkey, ps_supplycost, + // l_discount, l_extendedprice, l_orderkey, l_quantity + ); + auto nation = ctx->create_channel(); + auto orders = ctx->create_channel(); + nodes.push_back(read_nation( + ctx, + nation, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + ) // n_name, n_nationkey + ); + nodes.push_back(read_orders( + ctx, + orders, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + ) // o_orderdate, o_orderkey + ); + auto all_joined = ctx->create_channel(); + auto supplier_x_part_x_partsupp_x_lineitem_x_orders = + ctx->create_channel(); + if (cmd_options.use_shuffle_join) { + auto supplier_x_part_x_partsupp_x_lineitem_shuffled = + ctx->create_channel(); + auto orders_shuffled = ctx->create_channel(); + // TODO: customisable + std::uint32_t num_partitions = 16; + nodes.push_back(rapidsmpf::ndsh::shuffle( + ctx, + supplier_x_part_x_partsupp_x_lineitem, + supplier_x_part_x_partsupp_x_lineitem_shuffled, + {4}, + num_partitions, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); + nodes.push_back(rapidsmpf::ndsh::shuffle( + ctx, + orders, + orders_shuffled, + {1}, + num_partitions, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); + nodes.push_back( + // l_orderkey x o_orderkey + rapidsmpf::ndsh::inner_join_shuffle( + ctx, + supplier_x_part_x_partsupp_x_lineitem_shuffled, + orders_shuffled, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + {4}, + {1}, + rapidsmpf::ndsh::KeepKeys::NO + ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, + // l_quantity, o_orderdate + ); + } else { + nodes.push_back( + // l_orderkey x o_orderkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + supplier_x_part_x_partsupp_x_lineitem, + orders, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + {4}, + {1}, + rapidsmpf::OpID{static_cast(10 * i + op_id++) + }, + rapidsmpf::ndsh::KeepKeys::NO + 
+                ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice,
+                  // l_quantity, o_orderdate
+                );
+            }
+            nodes.push_back(
+                // n_nationkey x s_nationkey
+                rapidsmpf::ndsh::inner_join_broadcast(
+                    ctx,
+                    nation,
+                    supplier_x_part_x_partsupp_x_lineitem_x_orders,
+                    all_joined,
+                    {1},
+                    {0},
+                    rapidsmpf::OpID{static_cast(10 * i + op_id++)},
+                    rapidsmpf::ndsh::KeepKeys::NO
+                ) // n_name, ps_supplycost, l_discount, l_extendedprice,
+                  // l_quantity, o_orderdate
+            );
+            auto groupby_input = ctx->create_channel();
+            nodes.push_back(select_columns(ctx, all_joined, groupby_input));
+            auto chunkwise_groupby_output = ctx->create_channel();
+            nodes.push_back(
+                chunkwise_groupby_agg(ctx, groupby_input, chunkwise_groupby_output)
+            );
+            auto concatenated_groupby_output = ctx->create_channel();
+            nodes.push_back(rapidsmpf::ndsh::concatenate(
+                ctx,
+                chunkwise_groupby_output,
+                concatenated_groupby_output,
+                rapidsmpf::ndsh::ConcatOrder::DONT_CARE
+            ));
+            auto groupby_output = ctx->create_channel();
+            nodes.push_back(final_groupby_agg(
+                ctx,
+                concatenated_groupby_output,
+                groupby_output,
+                rapidsmpf::OpID{static_cast(10 * i + op_id++)}
+            ));
+            auto sorted_output = ctx->create_channel();
+            nodes.push_back(sort_by(ctx, groupby_output, sorted_output));
+            nodes.push_back(write_parquet(ctx, sorted_output, output_path));
+        }
+        auto end = std::chrono::steady_clock::now();
+        std::chrono::duration pipeline = end - start;
+        start = std::chrono::steady_clock::now();
+        {
+            RAPIDSMPF_NVTX_SCOPED_RANGE("Q9 Iteration");
+            rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes));
+        }
+        end = std::chrono::steady_clock::now();
+        std::chrono::duration compute = end - start;
+        comm->logger().print(
+            "Iteration ", i, " pipeline construction time [s]: ", pipeline.count()
+        );
+        comm->logger().print("Iteration ", i, " compute time [s]: ", compute.count());
+        timings.push_back(pipeline.count());
+        timings.push_back(compute.count());
+        ctx->comm()->logger().print(stats->report());
+        RAPIDSMPF_MPI(MPI_Barrier(mpi_comm));
+    }
+    if (comm->rank() == 0) {
+        for (int i = 0; i < 2; i++) {
+            comm->logger().print(
+                "Iteration ",
+                i,
+                " pipeline construction time [s]: ",
+                timings[size_t(2 * i)]
+            );
+            comm->logger().print(
+                "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)]
+            );
+        }
+    }
+    }
+
+    RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm));
+    RAPIDSMPF_MPI(MPI_Finalize());
+    return 0;
+}

From c0b3a94619f2b250ac0dc9490d017d639d2ff787 Mon Sep 17 00:00:00 2001
From: niranda perera 
Date: Tue, 2 Dec 2025 15:25:44 -0800
Subject: [PATCH 2/6] join working

Signed-off-by: niranda perera 

---
 cpp/benchmarks/streaming/ndsh/join.cpp | 162 ++++++-
 cpp/benchmarks/streaming/ndsh/join.hpp |  26 +
 cpp/benchmarks/streaming/ndsh/q13.cpp  | 635 +++++--------------------
 cufile.log                             |   2 +
 4 files changed, 283 insertions(+), 542 deletions(-)
 create mode 100644 cufile.log

diff --git a/cpp/benchmarks/streaming/ndsh/join.cpp b/cpp/benchmarks/streaming/ndsh/join.cpp
index 271b6e5c0..9a68d988e 100644
--- a/cpp/benchmarks/streaming/ndsh/join.cpp
+++ b/cpp/benchmarks/streaming/ndsh/join.cpp
@@ -222,6 +222,81 @@ streaming::Message inner_join_chunk(
     );
 }
 
+/**
+ * @brief Left join a table chunk against a build hash table, returning a message of the result.
+ *
+ * @param ctx Streaming context.
+ * @param left_chunk Probe-side (left) chunk to join.
+ * @param sequence Sequence number of the output.
+ * @param joiner hash_join object, representing the build table.
+ * @param build_carrier Columns from the build-side table to be included in the output.
+ * @param left_on Key column indices in `left_chunk`.
+ * @param build_stream Stream the `joiner` will be deallocated on.
+ * @param build_event Event recording the creation of the `joiner`.
+ *
+ * @return Message of `TableChunk` containing the result of the left join.
+ */
+streaming::Message left_join_chunk(
+    std::shared_ptr ctx,
+    streaming::TableChunk&& left_chunk,
+    std::uint64_t sequence,
+    cudf::hash_join& joiner,
+    cudf::table_view build_carrier,
+    std::vector left_on,
+    rmm::cuda_stream_view build_stream,
+    CudaEvent* build_event
+) {
+    CudaEvent event;
+    left_chunk = to_device(ctx, std::move(left_chunk));
+    auto chunk_stream = left_chunk.stream();
+    build_event->stream_wait(chunk_stream);
+    auto probe_table = left_chunk.table_view();
+    auto probe_keys = probe_table.select(left_on);
+    auto [probe_match, build_match] =
+        joiner.left_join(probe_keys, std::nullopt, chunk_stream, ctx->br()->device_mr());
+
+    cudf::column_view build_indices =  // right
+        cudf::device_span(*build_match);
+    cudf::column_view probe_indices =  // left
+        cudf::device_span(*probe_match);
+    // build_carrier is valid on build_stream, but chunk_stream is
+    // waiting for build_stream work to be done, so running this on
+    // chunk_stream is fine.
+
+    // For a LEFT join, keep all columns from the probe (left) table including keys,
+    // since they're always valid (unlike right-side keys, which may be NULL).
+    auto result_columns = cudf::gather(
+                              probe_table,
+                              probe_indices,
+                              cudf::out_of_bounds_policy::DONT_CHECK,
+                              chunk_stream,
+                              ctx->br()->device_mr()
+    )
+                              ->release();
+
+    // Left join build indices can contain sentinel values (INT_MIN), so they will be
+    // out of bounds; NULLIFY turns those rows into nulls.
+    std::ranges::move(
+        cudf::gather(
+            build_carrier,
+            build_indices,
+            cudf::out_of_bounds_policy::NULLIFY,
+            chunk_stream,
+            ctx->br()->device_mr()
+        )
+            ->release(),
+        std::back_inserter(result_columns)
+    );
+    // Deallocation of the join indices will happen on build_stream, so add a stream
+    // dependency. This also ensures deallocation of the hash_join object waits for
+    // completion.
+    cuda_stream_join(build_stream, chunk_stream, &event);
+    return streaming::to_message(
+        sequence,
+        std::make_unique(
+            std::make_unique(std::move(result_columns)), chunk_stream
+        )
+    );
+}
+
 } // namespace
 
 streaming::Node inner_join_broadcast(
@@ -347,6 +422,67 @@ streaming::Node inner_join_shuffle(
     co_await ch_out->drain(ctx->executor());
 }
 
+streaming::Node left_join_shuffle(
+    std::shared_ptr ctx,
+    std::shared_ptr left,
+    std::shared_ptr right,
+    std::shared_ptr ch_out,
+    std::vector left_on,
+    std::vector right_on
+) {
+    streaming::ShutdownAtExit c{left, right, ch_out};
+    ctx->comm()->logger().print("left shuffle join");
+    co_await ctx->executor()->schedule();
+    CudaEvent build_event;
+    while (true) {
+        // Requirement: the two shuffles emit partitions in the same order.
+        auto left_msg = co_await left->receive();
+        auto right_msg = co_await right->receive();
+        if (left_msg.empty()) {
+            RAPIDSMPF_EXPECTS(
+                right_msg.empty(), "Left does not have same number of partitions as right"
+            );
+            break;
+        }
+        RAPIDSMPF_EXPECTS(
+            left_msg.sequence_number() == right_msg.sequence_number(),
+            "Mismatching sequence numbers"
+        );
+
+        // Use right as the build table.
+        auto build_chunk = to_device(ctx, right_msg.release());
+        auto build_stream = build_chunk.stream();
+        auto joiner = cudf::hash_join(
+            build_chunk.table_view().select(right_on),
+            cudf::null_equality::UNEQUAL,
+            build_stream
+        );
+        build_event.record(build_stream);
+
+        // Drop key columns from the build table.
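+        // (The probe side keeps its key columns in the output, so carrying the
+        // build-side copies as well would duplicate them; to_keep is the
+        // complement of right_on over the build table's column indices.)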
+        std::vector to_keep;
+        std::ranges::copy_if(
+            std::ranges::iota_view(0, build_chunk.table_view().num_columns()),
+            std::back_inserter(to_keep),
+            [&](auto i) { return std::ranges::find(right_on, i) == right_on.end(); }
+        );
+        auto build_carrier = build_chunk.table_view().select(to_keep);
+
+        auto sequence = left_msg.sequence_number();
+        co_await ch_out->send(left_join_chunk(
+            ctx,
+            left_msg.release(),
+            sequence,
+            joiner,
+            build_carrier,
+            left_on,
+            build_stream,
+            &build_event
+        ));
+    }
+    co_await ch_out->drain(ctx->executor());
+}
+
 streaming::Node shuffle(
     std::shared_ptr ctx,
     std::shared_ptr ch_in,
@@ -382,22 +518,20 @@ streaming::Node shuffle(
         auto packed_data = co_await shuffler.extract_async(pid);
         RAPIDSMPF_EXPECTS(packed_data.has_value(), "Partition already extracted");
         auto stream = ctx->br()->stream_pool().get_stream();
-        co_await ch_out->send(
-            streaming::to_message(
-                pid,
-                std::make_unique(
-                    unpack_and_concat(
-                        unspill_partitions(
-                            std::move(*packed_data), ctx->br(), true, ctx->statistics()
-                        ),
-                        stream,
-                        ctx->br(),
-                        ctx->statistics()
+        co_await ch_out->send(streaming::to_message(
+            pid,
+            std::make_unique(
+                unpack_and_concat(
+                    unspill_partitions(
+                        std::move(*packed_data), ctx->br(), true, ctx->statistics()
                     ),
-                    stream
-                )
+                    stream,
+                    ctx->br(),
+                    ctx->statistics()
+                ),
+                stream
             )
-        );
+        ));
     }
     co_await ch_out->drain(ctx->executor());
 }
diff --git a/cpp/benchmarks/streaming/ndsh/join.hpp b/cpp/benchmarks/streaming/ndsh/join.hpp
index ddd799112..15a2f0c19 100644
--- a/cpp/benchmarks/streaming/ndsh/join.hpp
+++ b/cpp/benchmarks/streaming/ndsh/join.hpp
@@ -77,6 +77,32 @@ streaming::Node inner_join_shuffle(
     KeepKeys keep_keys = KeepKeys::YES
 );
 
+/**
+ * @brief Perform a streaming left join between two tables.
+ *
+ * @note This performs a shuffle join; the left and right channels are required to
+ * provide hash-partitioned data in matching order.
+ *
+ * @param ctx Streaming context.
+ * @param left Channel of `TableChunk`s in hash-partitioned order.
+ * @param right Channel of `TableChunk`s in matching hash-partitioned order.
+ * @param ch_out Output channel of `TableChunk`s.
+ * @param left_on Column indices of the keys in the left table.
+ * @param right_on Column indices of the keys in the right table.
+ * @note Unlike the inner join variants there is no `keep_keys` option: probe-side
+ * (left) key columns are always kept and build-side (right) key columns are dropped.
+ *
+ * @return Coroutine representing the completion of the join.
+ */
+streaming::Node left_join_shuffle(
+    std::shared_ptr ctx,
+    std::shared_ptr left,
+    std::shared_ptr right,
+    std::shared_ptr ch_out,
+    std::vector left_on,
+    std::vector right_on
+);
+
 /**
  * @brief Shuffle the input channel by hash-partitioning on given key columns.
  *
diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp
index 6d74269a9..2fedee3ab 100644
--- a/cpp/benchmarks/streaming/ndsh/q13.cpp
+++ b/cpp/benchmarks/streaming/ndsh/q13.cpp
@@ -32,6 +32,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -109,6 +110,7 @@
     );
 }
 
+// TODO: can we push this into the read_orders node?
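+// Pushing the predicate down would avoid materialising o_comment at all, but as
+// far as we know the reader-side filters are AST expressions with no regex
+// support, so the "not like '%special%requests%'" predicate is applied here.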
rapidsmpf::streaming::Node filter_orders( std::shared_ptr ctx, std::shared_ptr ch_in, @@ -127,313 +129,66 @@ rapidsmpf::streaming::Node filter_orders( ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto p_name = table.column(1); + auto o_comment = table.column(0); + // Match rows that contain "special.*requests" and negate to get rows that don't auto regex_program = cudf::strings::regex_program::create("special.*requests"); - auto mask = cudf::strings::contains_re(p_name, *regex_program, chunk_stream, mr); - co_await ch_out->send(rapidsmpf::streaming::to_message( - msg.sequence_number(), - std::make_unique( - cudf::apply_boolean_mask( - table.select({0}), mask->view(), chunk_stream, mr - ), - chunk_stream - ) - )); - } - co_await ch_out->drain(ctx->executor()); -} - -rapidsmpf::streaming::Node select_columns( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - // n_name, ps_supplycost, l_discount, l_extendedprice, l_quantity, o_orderdate + auto contains_mask = + cudf::strings::contains_re(o_comment, *regex_program, chunk_stream, mr); + // Negate: we want rows that do NOT match + auto mask = cudf::unary_operation( + contains_mask->view(), cudf::unary_operator::NOT, chunk_stream, mr + ); - // Select n_name, year_part_of(o_orderdate), amount = (extendedprice * (1 - // - discount)) - (ps_supplycost * l_quantity) group by n_name year agg - // sum(amount).round(2) sort by n_name, o_year descending = true, false - while (true) { - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - break; - } - co_await ctx->executor()->schedule(); - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() + auto filtered = cudf::apply_boolean_mask( + table.select({1, 2}), mask->view(), chunk_stream, mr ); - auto chunk_stream = chunk.stream(); - auto sequence_number = msg.sequence_number(); - auto table = chunk.table_view(); - std::vector> result; - result.reserve(3); - // n_name - result.push_back(std::make_unique( - table.column(0), chunk_stream, ctx->br()->device_mr() - )); - result.push_back(cudf::datetime::extract_datetime_component( - table.column(5), - cudf::datetime::datetime_component::YEAR, - chunk_stream, - ctx->br()->device_mr() - )); - auto discount = table.column(2); - auto extendedprice = table.column(3); - auto supplycost = table.column(1); - auto quantity = table.column(4); - std::string udf = - R"***( -static __device__ void calculate_amount(double *amount, double discount, double extprice, double supplycost, double quantity) { - *amount = extprice * (1 - discount) - supplycost * quantity; -} - )***"; - result.push_back(cudf::transform( - {discount, extendedprice, supplycost, quantity}, - udf, - cudf::data_type(cudf::type_id::FLOAT64), - false, - std::nullopt, - cudf::null_aware::NO, - chunk_stream, - ctx->br()->device_mr() - )); + co_await ch_out->send(rapidsmpf::streaming::to_message( - sequence_number, + msg.sequence_number(), std::make_unique( - std::make_unique(std::move(result)), chunk_stream + std::move(filtered), chunk_stream ) )); } co_await ch_out->drain(ctx->executor()); } -rapidsmpf::streaming::Node chunkwise_groupby_agg( +rapidsmpf::streaming::Node write_parquet( std::shared_ptr ctx, std::shared_ptr ch_in, - std::shared_ptr ch_out + std::string output_path ) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - std::vector partial_results; - std::uint64_t sequence = 0; + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await 
ctx->executor()->schedule(); while (true) { auto msg = co_await ch_in->receive(); if (msg.empty()) { - break; + co_return; } - co_await ctx->executor()->schedule(); + ctx->comm()->logger().print("write parquet"); auto chunk = rapidsmpf::ndsh::to_device( ctx, msg.release() ); - auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); - auto grouper = cudf::groupby::groupby( - table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + auto sink = cudf::io::sink_info(output_path); + auto builder = + cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = cudf::io::table_input_metadata(chunk.table_view()); + // Q13 output: c_custkey, o_orderkey + metadata.column_metadata[0].set_name("c_custkey"); + metadata.column_metadata[1].set_name("o_orderkey"); + builder = builder.metadata(metadata); + auto options = builder.build(); + cudf::io::write_parquet(options, chunk.stream()); + ctx->comm()->logger().print( + "Wrote chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns to ", + output_path ); - auto requests = std::vector(); - std::vector> aggs; - aggs.push_back(cudf::make_sum_aggregation()); - requests.push_back( - cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) - ); - auto [keys, results] = - grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); - // Drop chunk, we don't need it. - std::ignore = std::move(chunk); - auto result = keys->release(); - for (auto&& r : results) { - std::ranges::move(r.results, std::back_inserter(result)); - } - co_await ch_out->send(rapidsmpf::streaming::to_message( - sequence++, - std::make_unique( - std::make_unique(std::move(result)), chunk_stream - ) - )); - } - co_await ch_out->drain(ctx->executor()); -} - -rapidsmpf::streaming::Node final_groupby_agg( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out, - rapidsmpf::OpID tag -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - co_await ctx->executor()->schedule(); - // TODO: requires concatenated input stream. - auto msg = co_await ch_in->receive(); - auto next = co_await ch_in->receive(); - ctx->comm()->logger().print("Final groupby"); - RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); - auto chunk = - rapidsmpf::ndsh::to_device(ctx, msg.release()); - auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); - std::unique_ptr local_result{nullptr}; - if (!table.is_empty()) { - auto grouper = cudf::groupby::groupby( - table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO - ); - auto requests = std::vector(); - std::vector> aggs; - aggs.push_back(cudf::make_sum_aggregation()); - requests.push_back( - cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) - ); - auto [keys, results] = - grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); - // Drop chunk, we don't need it. - std::ignore = std::move(chunk); - auto result = keys->release(); - for (auto&& r : results) { - std::ranges::move(r.results, std::back_inserter(result)); - } - local_result = std::make_unique(std::move(result)); - } - if (ctx->comm()->nranks() > 1) { - // Reduce across ranks... - // Need a reduce primitive in rapidsmpf, but let's just use an allgather and - // discard for now. 
- rapidsmpf::streaming::AllGather gatherer{ctx, tag}; - if (local_result) { - auto pack = - cudf::pack(local_result->view(), chunk_stream, ctx->br()->device_mr()); - gatherer.insert( - 0, - {rapidsmpf::PackedData( - std::move(pack.metadata), - ctx->br()->move(std::move(pack.gpu_data), chunk_stream) - )} - ); - } - gatherer.insert_finished(); - auto packed_data = - co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); - if (ctx->comm()->rank() == 0) { - auto global_result = rapidsmpf::unpack_and_concat( - rapidsmpf::unspill_partitions( - std::move(packed_data), ctx->br(), true, ctx->statistics() - ), - chunk_stream, - ctx->br(), - ctx->statistics() - ); - if (ctx->comm()->rank() == 0) { - // We will only actually bother to do this on rank zero. - auto result_view = global_result->view(); - auto grouper = cudf::groupby::groupby( - result_view.select({0, 1}), - cudf::null_policy::EXCLUDE, - cudf::sorted::NO - ); - auto requests = std::vector(); - std::vector> aggs; - aggs.push_back(cudf::make_sum_aggregation()); - requests.push_back(cudf::groupby::aggregation_request( - result_view.column(2), std::move(aggs) - )); - auto [keys, results] = - grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); - global_result.reset(); - auto result = keys->release(); - for (auto&& r : results) { - std::ranges::move(r.results, std::back_inserter(result)); - } - co_await ch_out->send(rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::make_unique(std::move(result)), chunk_stream - ) - )); - } - } else { - std::ignore = std::move(packed_data); - } - } else { - co_await ch_out->send(rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::move(local_result), chunk_stream - ) - )); } - co_await ch_out->drain(ctx->executor()); -} - -rapidsmpf::streaming::Node sort_by( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - co_await ctx->executor()->schedule(); - auto msg = co_await ch_in->receive(); - // We know we only have a single chunk from the groupby - if (msg.empty()) { - co_return; - } - ctx->comm()->logger().print("Sortby"); - auto chunk = - rapidsmpf::ndsh::to_device(ctx, msg.release()); - auto table = chunk.table_view(); - auto rounded = cudf::round( - table.column(2), - 2, - cudf::rounding_method::HALF_EVEN, - chunk.stream(), - ctx->br()->device_mr() - ); - auto result = rapidsmpf::streaming::to_message( - 0, - std::make_unique( - cudf::sort_by_key( - cudf::table_view({table.column(0), table.column(1), rounded->view()}), - table.select({0, 1}), - {cudf::order::ASCENDING, cudf::order::DESCENDING}, - {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, - chunk.stream(), - ctx->br()->device_mr() - ), - chunk.stream() - ) - ); - co_await ch_out->send(std::move(result)); - co_await ch_out->drain(ctx->executor()); -} - -rapidsmpf::streaming::Node write_parquet( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::string output_path -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in}; - co_await ctx->executor()->schedule(); - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - co_return; - } - ctx->comm()->logger().print("write parquet"); - auto chunk = - rapidsmpf::ndsh::to_device(ctx, msg.release()); - auto sink = cudf::io::sink_info(output_path); - auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); - auto metadata = cudf::io::table_input_metadata(chunk.table_view()); - metadata.column_metadata[0].set_name("nation"); - 
metadata.column_metadata[1].set_name("o_year"); - metadata.column_metadata[2].set_name("sum_profit"); - builder = builder.metadata(metadata); - auto options = builder.build(); - cudf::io::write_parquet(options, chunk.stream()); - ctx->comm()->logger().print( - "Wrote chunk with ", - chunk.table_view().num_rows(), - " rows and ", - chunk.table_view().num_columns(), - " columns to ", - output_path - ); + co_await ch_in->drain(ctx->executor()); } } // namespace @@ -541,42 +296,6 @@ ProgramOptions parse_options(int argc, char** argv) { * The SQL form of the query is: * @code{.sql} * select - * nation, - * o_year, - * round(sum(amount), 2) as sum_profit - * from - * ( - * select - * n_name as nation, - * year(o_orderdate) as o_year, - * l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount - * from - * part, - * supplier, - * lineitem, - * partsupp, - * orders, - * nation - * where - * s_suppkey = l_suppkey - * and ps_suppkey = l_suppkey - * and ps_partkey = l_partkey - * and p_partkey = l_partkey - * and o_orderkey = l_orderkey - * and s_nationkey = n_nationkey - * and p_name like '%green%' - * ) as profit - * group by - * nation, - * o_year - * order by - * nation, - * o_year desc - * @endcode{} - * - * - * @code{.sql} - * select * c_count, * COUNT(*) AS custdist * from @@ -645,17 +364,9 @@ int main(int argc, char** argv) { std::vector nodes; auto start = std::chrono::steady_clock::now(); { - RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q9 pipeline"); - auto orders = ctx->create_channel(); - auto filtered_orders = ctx->create_channel(); - auto customer = ctx->create_channel(); - - // auto part_x_partsupp = ctx->create_channel(); - // auto supplier = ctx->create_channel(); - // auto lineitem = ctx->create_channel(); - // auto supplier_x_part_x_partsupp = ctx->create_channel(); - // auto supplier_x_part_x_partsupp_x_lineitem = ctx->create_channel(); + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q13 pipeline"); + auto customer = ctx->create_channel(); nodes.push_back(read_customer( ctx, customer, @@ -664,6 +375,7 @@ int main(int argc, char** argv) { cmd_options.input_directory )); // c_custkey + auto orders = ctx->create_channel(); nodes.push_back(read_orders( ctx, orders, @@ -673,216 +385,83 @@ int main(int argc, char** argv) { )); // o_comment, o_custkey, o_orderkey - nodes.push_back(filter_orders(ctx, orders, filtered_orders)); - - - nodes.push_back(read_partsupp( - ctx, - partsupp, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); // ps_partkey, ps_suppkey, ps_supplycost + auto filtered_orders = ctx->create_channel(); nodes.push_back( - // p_partkey x ps_partkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - filtered_part, - partsupp, - part_x_partsupp, - {0}, - {0}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - ) // p_partkey/ps_partkey, ps_suppkey, ps_supplycost + filter_orders(ctx, orders, filtered_orders) // o_custkey, o_orderkey ); - nodes.push_back(read_supplier( + + std::uint32_t num_partitions = 128; + + auto orders_shuffled = ctx->create_channel(); + nodes.push_back(rapidsmpf::ndsh::shuffle( ctx, - supplier, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); // s_nationkey, s_suppkey - nodes.push_back( - // s_suppkey x ps_suppkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - supplier, - part_x_partsupp, - supplier_x_part_x_partsupp, - {1}, - {1}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - - ) // s_nationkey, s_suppkey/ps_suppkey, p_partkey/ps_partkey, - // 
ps_supplycost - ); - nodes.push_back(read_lineitem( + filtered_orders, + orders_shuffled, + {0}, + num_partitions, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); // o_custkey, o_orderkey + + auto customer_shuffled = ctx->create_channel(); + nodes.push_back(rapidsmpf::ndsh::shuffle( ctx, - lineitem, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); // l_discount, l_extendedprice, l_orderkey, l_partkey, l_quantity, - // l_suppkey + customer, + customer_shuffled, + {0}, + num_partitions, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); // c_custkey + + // left join customer_shuffled and orders_shuffled + auto customer_x_orders = ctx->create_channel(); + nodes.push_back(rapidsmpf::ndsh::left_join_shuffle( + ctx, customer_shuffled, orders_shuffled, customer_x_orders, {0}, {0} + )); // c_custkey, o_orderkey + nodes.push_back( - // [p_partkey, ps_suppkey] x [l_partkey, l_suppkey] - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - supplier_x_part_x_partsupp, - lineitem, - supplier_x_part_x_partsupp_x_lineitem, - {2, 1}, - {3, 5}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)}, - rapidsmpf::ndsh::KeepKeys::NO - ) // s_nationkey, ps_supplycost, - // l_discount, l_extendedprice, l_orderkey, l_quantity - ); - auto nation = ctx->create_channel(); - auto orders = ctx->create_channel(); - nodes.push_back(read_nation( - ctx, - nation, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - ) // n_name, n_nationkey - ); - nodes.push_back(read_orders( - ctx, - orders, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - ) // o_orderdate, o_orderkey + write_parquet(ctx, customer_x_orders, cmd_options.output_file) ); - auto all_joined = ctx->create_channel(); - auto supplier_x_part_x_partsupp_x_lineitem_x_orders = - ctx->create_channel(); - if (cmd_options.use_shuffle_join) { - auto supplier_x_part_x_partsupp_x_lineitem_shuffled = - ctx->create_channel(); - auto orders_shuffled = ctx->create_channel(); - // TODO: customisable - std::uint32_t num_partitions = 16; - nodes.push_back(rapidsmpf::ndsh::shuffle( - ctx, - supplier_x_part_x_partsupp_x_lineitem, - supplier_x_part_x_partsupp_x_lineitem_shuffled, - {4}, - num_partitions, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - )); - nodes.push_back(rapidsmpf::ndsh::shuffle( - ctx, - orders, - orders_shuffled, - {1}, - num_partitions, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - )); - nodes.push_back( - // l_orderkey x o_orderkey - rapidsmpf::ndsh::inner_join_shuffle( - ctx, - supplier_x_part_x_partsupp_x_lineitem_shuffled, - orders_shuffled, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - {4}, - {1}, - rapidsmpf::ndsh::KeepKeys::NO - ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, - // l_quantity, o_orderdate - ); - } else { - nodes.push_back( - // l_orderkey x o_orderkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - supplier_x_part_x_partsupp_x_lineitem, - orders, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - {4}, - {1}, - rapidsmpf::OpID{static_cast(10 * i + op_id++) - }, - rapidsmpf::ndsh::KeepKeys::NO - ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, - // l_quantity, o_orderdate - ); + + auto end = std::chrono::steady_clock::now(); + std::chrono::duration pipeline = end - start; + start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Q13 Iteration"); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); } - nodes.push_back( - // n_nationkey x 
s_nationkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - nation, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - all_joined, - {1}, - {0}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)}, - rapidsmpf::ndsh::KeepKeys::NO - ) // n_name, ps_supplycost, l_discount, l_extendedprice, - // l_quantity, o_orderdate - ); - auto groupby_input = ctx->create_channel(); - nodes.push_back(select_columns(ctx, all_joined, groupby_input)); - auto chunkwise_groupby_output = ctx->create_channel(); - nodes.push_back( - chunkwise_groupby_agg(ctx, groupby_input, chunkwise_groupby_output) - ); - auto concatenated_groupby_output = ctx->create_channel(); - nodes.push_back(rapidsmpf::ndsh::concatenate( - ctx, - chunkwise_groupby_output, - concatenated_groupby_output, - rapidsmpf::ndsh::ConcatOrder::DONT_CARE - )); - auto groupby_output = ctx->create_channel(); - nodes.push_back(final_groupby_agg( - ctx, - concatenated_groupby_output, - groupby_output, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - )); - auto sorted_output = ctx->create_channel(); - nodes.push_back(sort_by(ctx, groupby_output, sorted_output)); - nodes.push_back(write_parquet(ctx, sorted_output, output_path)); - } - auto end = std::chrono::steady_clock::now(); - std::chrono::duration pipeline = end - start; - start = std::chrono::steady_clock::now(); - { - RAPIDSMPF_NVTX_SCOPED_RANGE("Q9 Iteration"); - rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); - } - end = std::chrono::steady_clock::now(); - std::chrono::duration compute = end - start; - comm->logger().print( - "Iteration ", i, " pipeline construction time [s]: ", pipeline.count() - ); - comm->logger().print("Iteration ", i, " compute time [s]: ", compute.count()); - timings.push_back(pipeline.count()); - timings.push_back(compute.count()); - ctx->comm()->logger().print(stats->report()); - RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); - } - if (comm->rank() == 0) { - for (int i = 0; i < 2; i++) { + end = std::chrono::steady_clock::now(); + std::chrono::duration compute = end - start; comm->logger().print( - "Iteration ", - i, - " pipeline construction time [s]: ", - timings[size_t(2 * i)] + "Iteration ", i, " pipeline construction time [s]: ", pipeline.count() ); comm->logger().print( - "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)] + "Iteration ", i, " compute time [s]: ", compute.count() ); + timings.push_back(pipeline.count()); + timings.push_back(compute.count()); + ctx->comm()->logger().print(stats->report()); + RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); + } + + std::cout << "Rank " << comm->rank() << " has " << timings.size() + << " timings" << std::endl; + if (comm->rank() == 0) { + for (int i = 0; i < 2; i++) { + comm->logger().print( + "Iteration ", + i, + " pipeline construction time [s]: ", + timings[size_t(2 * i)] + ); + comm->logger().print( + "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)] + ); + } } } - } - RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); - RAPIDSMPF_MPI(MPI_Finalize()); - return 0; + RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); + RAPIDSMPF_MPI(MPI_Finalize()); + return 0; + } } diff --git a/cufile.log b/cufile.log new file mode 100644 index 000000000..432c61ece --- /dev/null +++ b/cufile.log @@ -0,0 +1,2 @@ + 02-12-2025 14:49:44:723 [pid=154298 tid=154557] NOTICE cufio-drv:851 running in compatible mode + 02-12-2025 14:53:49:0 [pid=181014 tid=181134] NOTICE cufio-drv:851 running in compatible mode From d8ba508faedf368a55de1d403bfb6650ec446171 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 3 Dec 2025 14:19:30 
-0800 Subject: [PATCH 3/6] adding the query impl Signed-off-by: niranda perera --- cpp/benchmarks/streaming/ndsh/q13.cpp | 216 +++++++++++++++++++++++++- cufile.log | 2 - 2 files changed, 210 insertions(+), 8 deletions(-) delete mode 100644 cufile.log diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp index 2fedee3ab..deb8a250c 100644 --- a/cpp/benchmarks/streaming/ndsh/q13.cpp +++ b/cpp/benchmarks/streaming/ndsh/q13.cpp @@ -153,6 +153,156 @@ rapidsmpf::streaming::Node filter_orders( co_await ch_out->drain(ctx->executor()); } +rapidsmpf::streaming::Node chunkwise_groupby_agg( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + cudf::size_type key_col_idx, + cudf::size_type value_col_idx, + auto&& agg_factory +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::vector partial_results; + std::uint64_t sequence = 0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.template release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto grouper = cudf::groupby::groupby( + table.select({key_col_idx}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + std::vector> aggs; + aggs.emplace_back(agg_factory()); + + auto requests = std::vector(); + requests.push_back(cudf::groupby::aggregation_request( + table.column(value_col_idx), std::move(aggs) + )); + + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send(rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + )); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node all_gather_groupby_sort( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("All gather"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto local_table = chunk.table_view(); + + if (ctx->comm()->nranks() > 1) { + // Reduce across ranks... + // Need a reduce primitive in rapidsmpf, but let's just use an allgather and + // discard for now. 
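+        // A reduce primitive might look roughly like this (hypothetical API,
+        // sketched only to record the intent; none of these names exist yet):
+        //   rapidsmpf::streaming::Reduce reducer{ctx, tag};
+        //   reducer.insert(pack(local_table));                // all ranks contribute
+        //   auto merged = co_await reducer.extract_on_root(); // root gets the result
+        // Until something like that exists, every rank gathers all partial results
+        // and the non-root ranks simply drop them.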
+        rapidsmpf::streaming::AllGather gatherer{ctx, tag};
+        if (!local_table.is_empty()) {
+            auto pack = cudf::pack(local_table, chunk_stream, ctx->br()->device_mr());
+            gatherer.insert(
+                0,
+                {rapidsmpf::PackedData(
+                    std::move(pack.metadata),
+                    ctx->br()->move(std::move(pack.gpu_data), chunk_stream)
+                )}
+            );
+        }
+        gatherer.insert_finished();
+        auto packed_data =
+            co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO);
+        if (ctx->comm()->rank() == 0) {
+            auto global_result = rapidsmpf::unpack_and_concat(
+                rapidsmpf::unspill_partitions(
+                    std::move(packed_data), ctx->br(), true, ctx->statistics()
+                ),
+                chunk_stream,
+                ctx->br(),
+                ctx->statistics()
+            );
+
+            auto grouper = cudf::groupby::groupby(
+                global_result->view().select({0}),
+                cudf::null_policy::EXCLUDE,
+                cudf::sorted::NO
+            );
+
+            std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
+            aggs.emplace_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
+
+            auto requests = std::vector<cudf::groupby::aggregation_request>();
+            requests.push_back(cudf::groupby::aggregation_request(
+                global_result->view().column(1), std::move(aggs)
+            ));
+
+            auto [keys, results] =
+                grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
+            global_result.reset();
+            auto result = keys->release();
+            for (auto&& r : results) {
+                std::ranges::move(r.results, std::back_inserter(result));
+            }
+            auto grouped = std::make_unique<cudf::table>(std::move(result));
+
+            // We will only actually bother to do this on rank zero.
+            auto grouped_view = grouped->view();
+            auto sorted = cudf::sort_by_key(
+                grouped_view,
+                grouped_view.select({1, 0}),
+                {cudf::order::DESCENDING, cudf::order::ASCENDING},
+                {},
+                chunk_stream,
+                ctx->br()->device_mr()
+            );
+            grouped.reset();
+
+            co_await ch_out->send(rapidsmpf::streaming::to_message(
+                0,
+                std::make_unique<rapidsmpf::streaming::TableChunk>(
+                    std::move(sorted), chunk_stream
+                )
+            ));
+
+        } else {
+            // Drop chunk, we don't need it.
+            std::ignore = std::move(packed_data);
+        }
+    } else {
+        co_await ch_out->send(rapidsmpf::streaming::to_message(
+            0, std::make_unique<rapidsmpf::streaming::TableChunk>(std::move(chunk))
+        ));
+    }
+}
+
 rapidsmpf::streaming::Node write_parquet(
     std::shared_ptr<rapidsmpf::streaming::Context> ctx,
     std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
@@ -169,13 +319,13 @@ rapidsmpf::streaming::Node write_parquet(
         auto chunk = rapidsmpf::ndsh::to_device(
             ctx, msg.release<rapidsmpf::streaming::TableChunk>()
         );
-        auto sink = cudf::io::sink_info(output_path);
+        auto sink = cudf::io::sink_info(output_path + ".parquet");
         auto builder =
             cudf::io::parquet_writer_options::builder(sink, chunk.table_view());
         auto metadata = cudf::io::table_input_metadata(chunk.table_view());
-        // Q13 output: c_custkey, o_orderkey
-        metadata.column_metadata[0].set_name("c_custkey");
-        metadata.column_metadata[1].set_name("o_orderkey");
+        // Q13 output: count, custdist
+        metadata.column_metadata[0].set_name("count");
+        metadata.column_metadata[1].set_name("custdist");
         builder = builder.metadata(metadata);
         auto options = builder.build();
         cudf::io::write_parquet(options, chunk.stream());
@@ -418,9 +568,63 @@ int main(int argc, char** argv) {
                 ctx, customer_shuffled, orders_shuffled, customer_x_orders, {0}, {0}
             ));  // c_custkey, o_orderkey
 
-            nodes.push_back(
-                write_parquet(ctx, customer_x_orders, cmd_options.output_file)
-            );
+            auto chunkwise_groupby_output = ctx->create_channel();
+            nodes.push_back(chunkwise_groupby_agg(
+                ctx,
+                customer_x_orders,
+                chunkwise_groupby_output,
+                0,
+                1,
+                [] {
+                    return cudf::make_count_aggregation<cudf::groupby_aggregation>();
+                }
+            ));  // c_custkey, count
+
+            auto concatenated_groupby_output = ctx->create_channel();
+            nodes.push_back(rapidsmpf::ndsh::concatenate(
+                ctx,
+                chunkwise_groupby_output,
+                concatenated_groupby_output,
+                rapidsmpf::ndsh::ConcatOrder::DONT_CARE
+            ));  // c_custkey, count
+
+            auto groupby_output = ctx->create_channel();
+            nodes.push_back(chunkwise_groupby_agg(
+                ctx,
+                concatenated_groupby_output,
+                groupby_output,
+                0,
+                1,
+                [] { return cudf::make_sum_aggregation<cudf::groupby_aggregation>(); }
+            ));  // c_custkey, count
+
+            auto groupby_count_output = ctx->create_channel();
+            nodes.push_back(chunkwise_groupby_agg(
+                ctx,
+                groupby_output,
+                groupby_count_output,
+                1,
+                0,
+                [] {
+                    return cudf::make_count_aggregation<cudf::groupby_aggregation>();
+                }
+            ));  // count, len
+
+
+            auto all_gather_and_sort_output = ctx->create_channel();
+            nodes.push_back(all_gather_groupby_sort(
+                ctx,
+                groupby_count_output,
+                all_gather_and_sort_output,
+                rapidsmpf::OpID{static_cast(10 * i + op_id++)}
+            ));  // count, custdist
+
+            nodes.push_back(write_parquet(
+                ctx,
+                all_gather_and_sort_output,
+                cmd_options.output_file + "_r" + std::to_string(ctx->comm()->rank())
+                    + "_i" + std::to_string(i)
+            ));
 
             auto end = std::chrono::steady_clock::now();
             std::chrono::duration<double> pipeline = end - start;
diff --git a/cufile.log b/cufile.log
deleted file mode 100644
index 432c61ece..000000000
--- a/cufile.log
+++ /dev/null
@@ -1,2 +0,0 @@
- 02-12-2025 14:49:44:723 [pid=154298 tid=154557] NOTICE cufio-drv:851 running in compatible mode
- 02-12-2025 14:53:49:0 [pid=181014 tid=181134] NOTICE cufio-drv:851 running in compatible mode

From d17800ccd9f199ee22ed95f2be180061c0f0da8e Mon Sep 17 00:00:00 2001
From: niranda perera
Date: Thu, 4 Dec 2025 15:09:48 -0800
Subject: [PATCH 4/6] apply pre-commit formatting

Signed-off-by: niranda perera
---
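Reviewer note (in the region `git am` ignores): besides the reformatting, this
commit removes --use-shuffle-join and renumbers the remaining options. That
renumbering is forced by how getopt_long() works: it returns the `val` field
of the matched `option` entry, so the `case` labels must shift down together
with the table. A standalone sketch of the pattern; the option names here are
illustrative, not this benchmark's full set:

    // Minimal getopt_long() skeleton showing the val <-> case coupling.
    #include <getopt.h>
    #include <cstdio>

    int main(int argc, char** argv) {
        static struct option long_options[] = {
            {"output-file", required_argument, nullptr, 1},
            {"help", no_argument, nullptr, 2},
            {nullptr, 0, nullptr, 0}
        };
        int c;
        while ((c = getopt_long(argc, argv, "", long_options, nullptr)) != -1) {
            switch (c) {
            case 1:  // returned because "output-file" has val == 1
                std::printf("output-file=%s\n", optarg);
                break;
            case 2:  // returned because "help" has val == 2
                std::printf("usage: %s --output-file <path>\n", argv[0]);
                return 0;
            default:  // '?' on unknown option
                return 1;
            }
        }
        return 0;
    }

Deleting an entry without renumbering would silently route the following
options into the wrong case, which is why the whole block changes below.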
 cpp/benchmarks/streaming/ndsh/CMakeLists.txt |   3 +-
 cpp/benchmarks/streaming/ndsh/join.cpp       |  26 +--
 cpp/benchmarks/streaming/ndsh/q13.cpp        | 168 ++++++++++---------
 3 files changed, 101 insertions(+), 96 deletions(-)

diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt
index 666072c7d..6e56330ac 100644
--- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt
+++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt
@@ -66,7 +66,6 @@ install(
   EXCLUDE_FROM_ALL
 )
 
-
 add_executable(q13 "q13.cpp")
 set_target_properties(
   q13
@@ -89,4 +88,4 @@ install(
   COMPONENT benchmarking
   DESTINATION bin/benchmarks/librapidsmpf
   EXCLUDE_FROM_ALL
-)
\ No newline at end of file
+)
diff --git a/cpp/benchmarks/streaming/ndsh/join.cpp b/cpp/benchmarks/streaming/ndsh/join.cpp
index 9a68d988e..ca058cc06 100644
--- a/cpp/benchmarks/streaming/ndsh/join.cpp
+++ b/cpp/benchmarks/streaming/ndsh/join.cpp
@@ -518,20 +518,22 @@ streaming::Node shuffle(
         auto packed_data = co_await shuffler.extract_async(pid);
         RAPIDSMPF_EXPECTS(packed_data.has_value(), "Partition already extracted");
         auto stream = ctx->br()->stream_pool().get_stream();
-        co_await ch_out->send(streaming::to_message(
-            pid,
-            std::make_unique<streaming::TableChunk>(
-                unpack_and_concat(
-                    unspill_partitions(
-                        std::move(*packed_data), ctx->br(), true, ctx->statistics()
+        co_await ch_out->send(
+            streaming::to_message(
+                pid,
+                std::make_unique<streaming::TableChunk>(
+                    unpack_and_concat(
+                        unspill_partitions(
+                            std::move(*packed_data), ctx->br(), true, ctx->statistics()
+                        ),
+                        stream,
+                        ctx->br(),
+                        ctx->statistics()
                     ),
-                    stream,
-                    ctx->br(),
-                    ctx->statistics()
-                ),
-                stream
+                    stream
+                )
             )
-        ));
+        );
     }
     co_await ch_out->drain(ctx->executor());
 }
diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp
index deb8a250c..30586a8ee 100644
--- a/cpp/benchmarks/streaming/ndsh/q13.cpp
+++ b/cpp/benchmarks/streaming/ndsh/q13.cpp
@@ -143,12 +143,14 @@ rapidsmpf::streaming::Node filter_orders(
             table.select({1, 2}), mask->view(), chunk_stream, mr
         );
 
-        co_await ch_out->send(rapidsmpf::streaming::to_message(
-            msg.sequence_number(),
-            std::make_unique<rapidsmpf::streaming::TableChunk>(
-                std::move(filtered), chunk_stream
+        co_await ch_out->send(
+            rapidsmpf::streaming::to_message(
+                msg.sequence_number(),
+                std::make_unique<rapidsmpf::streaming::TableChunk>(
+                    std::move(filtered), chunk_stream
+                )
             )
-        ));
+        );
     }
     co_await ch_out->drain(ctx->executor());
 }
@@ -182,9 +184,11 @@ rapidsmpf::streaming::Node chunkwise_groupby_agg(
         aggs.emplace_back(agg_factory());
 
         auto requests = std::vector<cudf::groupby::aggregation_request>();
-        requests.push_back(cudf::groupby::aggregation_request(
-            table.column(value_col_idx), std::move(aggs)
-        ));
+        requests.push_back(
+            cudf::groupby::aggregation_request(
+                table.column(value_col_idx), std::move(aggs)
+            )
+        );
 
         auto [keys, results] =
             grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
@@ -194,12 +198,14 @@ rapidsmpf::streaming::Node chunkwise_groupby_agg(
         for (auto&& r : results) {
             std::ranges::move(r.results, std::back_inserter(result));
         }
-        co_await ch_out->send(rapidsmpf::streaming::to_message(
-            sequence++,
-            std::make_unique<rapidsmpf::streaming::TableChunk>(
-                std::make_unique<cudf::table>(std::move(result)), chunk_stream
+        co_await ch_out->send(
+            rapidsmpf::streaming::to_message(
+                sequence++,
+                std::make_unique<rapidsmpf::streaming::TableChunk>(
+                    std::make_unique<cudf::table>(std::move(result)), chunk_stream
+                )
             )
-        ));
+        );
     }
     co_await ch_out->drain(ctx->executor());
 }
@@ -260,9 +266,11 @@ rapidsmpf::streaming::Node all_gather_groupby_sort(
             aggs.emplace_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
 
             auto requests = std::vector<cudf::groupby::aggregation_request>();
-            requests.push_back(cudf::groupby::aggregation_request(
-                global_result->view().column(1), std::move(aggs)
-            ));
+            requests.push_back(
+                cudf::groupby::aggregation_request(
+                    global_result->view().column(1), std::move(aggs)
+                )
+            );
 
             auto [keys, results] =
                 grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
@@ -285,21 +293,25 @@ rapidsmpf::streaming::Node all_gather_groupby_sort(
             );
             grouped.reset();
 
-            co_await ch_out->send(rapidsmpf::streaming::to_message(
-                0,
-                std::make_unique<rapidsmpf::streaming::TableChunk>(
-                    std::move(sorted), chunk_stream
+            co_await ch_out->send(
+                rapidsmpf::streaming::to_message(
+                    0,
+                    std::make_unique<rapidsmpf::streaming::TableChunk>(
+                        std::move(sorted), chunk_stream
+                    )
                 )
-            ));
+            );
 
         } else {
             // Drop chunk, we don't need it.
             std::ignore = std::move(packed_data);
         }
     } else {
-        co_await ch_out->send(rapidsmpf::streaming::to_message(
-            0, std::make_unique<rapidsmpf::streaming::TableChunk>(std::move(chunk))
-        ));
+        co_await ch_out->send(
+            rapidsmpf::streaming::to_message(
+                0, std::make_unique<rapidsmpf::streaming::TableChunk>(std::move(chunk))
+            )
+        );
     }
 }
@@ -347,7 +359,6 @@ struct ProgramOptions {
     int num_streaming_threads{1};
     cudf::size_type num_rows_per_chunk{100'000'000};
    std::optional<double> spill_device_limit{std::nullopt};
-    bool use_shuffle_join = false;
    std::string output_file;
    std::string input_directory;
 };
@@ -364,7 +375,6 @@ ProgramOptions parse_options(int argc, char** argv) {
                      "100000000)\n"
                   << "  --spill-device-limit    Fractional spill device limit (default: "
                      "None)\n"
-                  << "  --use-shuffle-join      Use shuffle join (default: false)\n"
                   << "  --output-file           Output file path (required)\n"
                   << "  --input-directory       Input directory path (required)\n"
                   << "  --help                  Show this help message\n";
@@ -373,11 +383,10 @@ ProgramOptions parse_options(int argc, char** argv) {
     static struct option long_options[] = {
         {"num-streaming-threads", required_argument, nullptr, 1},
         {"num-rows-per-chunk", required_argument, nullptr, 2},
-        {"use-shuffle-join", no_argument, nullptr, 3},
-        {"output-file", required_argument, nullptr, 4},
-        {"input-directory", required_argument, nullptr, 5},
-        {"help", no_argument, nullptr, 6},
-        {"spill-device-limit", required_argument, nullptr, 7},
+        {"output-file", required_argument, nullptr, 3},
+        {"input-directory", required_argument, nullptr, 4},
+        {"help", no_argument, nullptr, 5},
+        {"spill-device-limit", required_argument, nullptr, 6},
         {nullptr, 0, nullptr, 0}
     };
 
@@ -396,20 +405,17 @@ ProgramOptions parse_options(int argc, char** argv) {
             options.num_rows_per_chunk = std::atoi(optarg);
             break;
         case 3:
-            options.use_shuffle_join = true;
-            break;
-        case 4:
             options.output_file = optarg;
             saw_output_file = true;
             break;
-        case 5:
+        case 4:
             options.input_directory = optarg;
             saw_input_directory = true;
             break;
-        case 6:
+        case 5:
             print_usage();
             std::exit(0);
-        case 7:
+        case 6:
             options.spill_device_limit = std::stod(optarg);
             break;
         case '?':
@@ -543,69 +549,69 @@ int main(int argc, char** argv) {
             std::uint32_t num_partitions = 128;
 
             auto orders_shuffled = ctx->create_channel();
-            nodes.push_back(rapidsmpf::ndsh::shuffle(
-                ctx,
-                filtered_orders,
-                orders_shuffled,
-                {0},
-                num_partitions,
-                rapidsmpf::OpID{static_cast(10 * i + op_id++)}
-            ));  // o_custkey, o_orderkey
+            nodes.push_back(
+                rapidsmpf::ndsh::shuffle(
+                    ctx,
+                    filtered_orders,
+                    orders_shuffled,
+                    {0},
+                    num_partitions,
+                    rapidsmpf::OpID{static_cast(10 * i + op_id++)}
+                )
+            );  // o_custkey, o_orderkey
 
             auto customer_shuffled = ctx->create_channel();
-            nodes.push_back(rapidsmpf::ndsh::shuffle(
-                ctx,
-                customer,
-                customer_shuffled,
-                {0},
-                num_partitions,
-                rapidsmpf::OpID{static_cast(10 * i + op_id++)}
-            ));  // c_custkey
+            nodes.push_back(
+                rapidsmpf::ndsh::shuffle(
+                    ctx,
+                    customer,
+                    customer_shuffled,
+                    {0},
+                    num_partitions,
+                    rapidsmpf::OpID{static_cast(10 * i + op_id++)}
+                )
+            );  // c_custkey
 
             // left join customer_shuffled and orders_shuffled
             auto customer_x_orders = ctx->create_channel();
-            nodes.push_back(rapidsmpf::ndsh::left_join_shuffle(
-                ctx, customer_shuffled, orders_shuffled, customer_x_orders, {0}, {0}
-            ));  // c_custkey, o_orderkey
+            nodes.push_back(
+                rapidsmpf::ndsh::left_join_shuffle(
+                    ctx,
+                    customer_shuffled,
+                    orders_shuffled,
+                    customer_x_orders,
+                    {0},
+                    {0}
+                )
+            );  // c_custkey, o_orderkey
 
             auto chunkwise_groupby_output = ctx->create_channel();
             nodes.push_back(chunkwise_groupby_agg(
-                ctx,
-                customer_x_orders,
-                chunkwise_groupby_output,
-                0,
-                1,
-                [] {
+                ctx, customer_x_orders, chunkwise_groupby_output, 0, 1, [] {
                     return cudf::make_count_aggregation<cudf::groupby_aggregation>();
                 }
             ));  // c_custkey, count
 
             auto concatenated_groupby_output = ctx->create_channel();
-            nodes.push_back(rapidsmpf::ndsh::concatenate(
-                ctx,
-                chunkwise_groupby_output,
-                concatenated_groupby_output,
-                rapidsmpf::ndsh::ConcatOrder::DONT_CARE
-            ));  // c_custkey, count
+            nodes.push_back(
+                rapidsmpf::ndsh::concatenate(
+                    ctx,
+                    chunkwise_groupby_output,
+                    concatenated_groupby_output,
+                    rapidsmpf::ndsh::ConcatOrder::DONT_CARE
+                )
+            );  // c_custkey, count
 
             auto groupby_output = ctx->create_channel();
             nodes.push_back(chunkwise_groupby_agg(
-                ctx,
-                concatenated_groupby_output,
-                groupby_output,
-                0,
-                1,
-                [] { return cudf::make_sum_aggregation<cudf::groupby_aggregation>(); }
+                ctx, concatenated_groupby_output, groupby_output, 0, 1, [] {
+                    return cudf::make_sum_aggregation<cudf::groupby_aggregation>();
+                }
             ));  // c_custkey, count
 
             auto groupby_count_output = ctx->create_channel();
             nodes.push_back(chunkwise_groupby_agg(
-                ctx,
-                groupby_output,
-                groupby_count_output,
-                1,
-                0,
-                [] {
+                ctx, groupby_output, groupby_count_output, 1, 0, [] {
                     return cudf::make_count_aggregation<cudf::groupby_aggregation>();
                 }
             ));  // count, len
@@ -647,8 +653,6 @@ int main(int argc, char** argv) {
             RAPIDSMPF_MPI(MPI_Barrier(mpi_comm));
         }
 
-        std::cout << "Rank " << comm->rank() << " has " << timings.size()
-                  << " timings" << std::endl;
         if (comm->rank() == 0) {
             for (int i = 0; i < 2; i++) {
                 comm->logger().print(

From d253e8786035ec861744c82134cb590f3cd05757 Mon Sep 17 00:00:00 2001
From: niranda perera
Date: Thu, 4 Dec 2025 15:47:46 -0800
Subject: [PATCH 5/6] possible race fix

Signed-off-by: niranda perera
---
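Reviewer note (in the region `git am` ignores): this commit splits the old
all_gather_groupby_sort() into a communication-only node
(all_gather_concatenated) and a compute node (groupby_and_sort), and it adds
the previously missing drain of the output channel. Plausibly the race being
fixed: non-zero ranks could exit the node without sending or draining, letting
the downstream writer observe channel shutdown inconsistently. A schematic of
the invariant, reusing this file's own channel calls (illustrative skeleton
only, not compilable standalone):

    rapidsmpf::streaming::Node node_skeleton(
        std::shared_ptr<rapidsmpf::streaming::Context> ctx,
        std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
        std::shared_ptr<rapidsmpf::streaming::Channel> ch_out
    ) {
        rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out};
        while (true) {
            auto msg = co_await ch_in->receive();
            if (msg.empty()) {
                break;  // upstream is finished
            }
            co_await ctx->executor()->schedule();
            co_await ch_out->send(std::move(msg));  // possibly transformed
        }
        co_await ch_out->drain(ctx->executor());  // required on *every* exit path
    }

Every node in this file now follows that shape, including the early-return
paths that produce no output.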
 cpp/benchmarks/streaming/ndsh/join.cpp |   2 +-
 cpp/benchmarks/streaming/ndsh/q13.cpp  | 132 ++++++++++++++++---------
 2 files changed, 87 insertions(+), 47 deletions(-)

diff --git a/cpp/benchmarks/streaming/ndsh/join.cpp b/cpp/benchmarks/streaming/ndsh/join.cpp
index ca058cc06..bb60968a3 100644
--- a/cpp/benchmarks/streaming/ndsh/join.cpp
+++ b/cpp/benchmarks/streaming/ndsh/join.cpp
@@ -431,7 +431,7 @@ streaming::Node left_join_shuffle(
     std::vector<cudf::size_type> right_on
 ) {
     streaming::ShutdownAtExit c{left, right, ch_out};
-    ctx->comm()->logger().print("left shuffle join");
+    ctx->comm()->logger().print("Left shuffle join");
     co_await ctx->executor()->schedule();
     CudaEvent build_event;
     while (true) {
diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp
index 30586a8ee..812844541 100644
--- a/cpp/benchmarks/streaming/ndsh/q13.cpp
+++ b/cpp/benchmarks/streaming/ndsh/q13.cpp
@@ -210,7 +210,7 @@ rapidsmpf::streaming::Node chunkwise_groupby_agg(
     co_await ch_out->drain(ctx->executor());
 }
 
-rapidsmpf::streaming::Node all_gather_groupby_sort(
+rapidsmpf::streaming::Node all_gather_concatenated(
     std::shared_ptr<rapidsmpf::streaming::Context> ctx,
     std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
     std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
@@ -256,52 +256,14 @@ rapidsmpf::streaming::Node all_gather_groupby_sort(
                 ctx->statistics()
             );
 
-            auto grouper = cudf::groupby::groupby(
-                global_result->view().select({0}),
-                cudf::null_policy::EXCLUDE,
-                cudf::sorted::NO
-            );
-
-            std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
-            aggs.emplace_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
-
-            auto requests = std::vector<cudf::groupby::aggregation_request>();
-            requests.push_back(
-                cudf::groupby::aggregation_request(
-                    global_result->view().column(1), std::move(aggs)
-                )
-            );
-
-            auto [keys, results] =
-                grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
-            global_result.reset();
-            auto result = keys->release();
-            for (auto&& r : results) {
-                std::ranges::move(r.results, std::back_inserter(result));
-            }
-            auto grouped = std::make_unique<cudf::table>(std::move(result));
-
-            // We will only actually bother to do this on rank zero.
-            auto grouped_view = grouped->view();
-            auto sorted = cudf::sort_by_key(
-                grouped_view,
-                grouped_view.select({1, 0}),
-                {cudf::order::DESCENDING, cudf::order::ASCENDING},
-                {},
-                chunk_stream,
-                ctx->br()->device_mr()
-            );
-            grouped.reset();
-
             co_await ch_out->send(
                 rapidsmpf::streaming::to_message(
                     0,
                     std::make_unique<rapidsmpf::streaming::TableChunk>(
-                        std::move(sorted), chunk_stream
+                        std::move(global_result), chunk_stream
                     )
                 )
             );
-
         } else {
             // Drop chunk, we don't need it.
             std::ignore = std::move(packed_data);
@@ -313,6 +275,78 @@ rapidsmpf::streaming::Node all_gather_groupby_sort(
             )
         );
     }
+
+    co_await ch_out->drain(ctx->executor());
+}
+
+rapidsmpf::streaming::Node groupby_and_sort(
+    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
+    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
+    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out
+) {
+    rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out};
+    co_await ctx->executor()->schedule();
+    ctx->comm()->logger().print("Groupby and sort");
+
+    auto msg = co_await ch_in->receive();
+
+    // We know we only have a single chunk from the allgather in rank 0.
+    if (msg.empty()) {
+        co_return;
+    }
+
+    auto next = co_await ch_in->receive();
+    RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point");
+    auto chunk = rapidsmpf::ndsh::to_device(
+        ctx, msg.release<rapidsmpf::streaming::TableChunk>()
+    );
+    auto chunk_stream = chunk.stream();
+    auto all_gathered = chunk.table_view();
+
+    auto grouper = cudf::groupby::groupby(
+        all_gathered.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO
+    );
+
+    std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
+    aggs.emplace_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
+
+    auto requests = std::vector<cudf::groupby::aggregation_request>();
+    requests.push_back(
+        cudf::groupby::aggregation_request(all_gathered.column(1), std::move(aggs))
+    );
+
+    auto [keys, results] =
+        grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
+    // Drop chunk, we don't need it.
+    std::ignore = std::move(chunk);
+
+    auto result = keys->release();
+    for (auto&& r : results) {
+        std::ranges::move(r.results, std::back_inserter(result));
+    }
+    auto grouped = std::make_unique<cudf::table>(std::move(result));
+
+
+    // We will only actually bother to do this on rank zero.
+    auto sorted = cudf::sort_by_key(
+        grouped->view(),
+        grouped->view().select({1, 0}),
+        {cudf::order::DESCENDING, cudf::order::ASCENDING},
+        {},
+        chunk_stream,
+        ctx->br()->device_mr()
+    );
+    grouped.reset();
+
+    co_await ch_out->send(
+        rapidsmpf::streaming::to_message(
+            0,
+            std::make_unique<rapidsmpf::streaming::TableChunk>(
+                std::move(sorted), chunk_stream
+            )
+        )
+    );
+
+    co_await ch_out->drain(ctx->executor());
 }
 
 rapidsmpf::streaming::Node write_parquet(
@@ -327,7 +361,7 @@ rapidsmpf::streaming::Node write_parquet(
         if (msg.empty()) {
             co_return;
         }
-        ctx->comm()->logger().print("write parquet");
+        ctx->comm()->logger().print("Write parquet");
         auto chunk = rapidsmpf::ndsh::to_device(
             ctx, msg.release<rapidsmpf::streaming::TableChunk>()
         );
@@ -616,18 +650,24 @@ int main(int argc, char** argv) {
                 }
             ));  // count, len
 
-
-            auto all_gather_and_sort_output = ctx->create_channel();
-            nodes.push_back(all_gather_groupby_sort(
+            auto all_gather_concatenated_output = ctx->create_channel();
+            nodes.push_back(all_gather_concatenated(
                 ctx,
                 groupby_count_output,
-                all_gather_and_sort_output,
+                all_gather_concatenated_output,
                 rapidsmpf::OpID{static_cast(10 * i + op_id++)}
             ));  // count, custdist
 
+            auto groupby_and_sort_output = ctx->create_channel();
+            nodes.push_back(groupby_and_sort(
+                ctx,
+                all_gather_concatenated_output,
+                groupby_and_sort_output
+            ));  // count, custdist
+
             nodes.push_back(write_parquet(
                 ctx,
-                all_gather_and_sort_output,
+                groupby_and_sort_output,
                 cmd_options.output_file + "_r" + std::to_string(ctx->comm()->rank())
                     + "_i" + std::to_string(i)
             ));

From 4dccab572d8c61ba00468ad9836e8e66a2840f08 Mon Sep 17 00:00:00 2001
From: niranda perera
Date: Thu, 4 Dec 2025 16:41:04 -0800
Subject: [PATCH 6/6] skip re-grouping on a single rank

Signed-off-by: niranda perera
---
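Reviewer note (in the region `git am` ignores): with a single rank, the chunk
arriving at groupby_and_sort() is already fully grouped, so only the
multi-rank case needs the second SUM pass guarded below. For reference, a
host-side toy of what the whole pipeline computes (hypothetical values,
std::map standing in for the cudf groupbys): per-customer order counts, then
the distribution of those counts.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <utility>
    #include <vector>

    int main() {
        // Joined (c_custkey, o_orderkey) pairs after the comment filter;
        // -1 marks a customer whose orders were all filtered out (the null
        // right side of the left join in the real pipeline).
        std::vector<std::pair<int, int>> joined = {
            {1, 101}, {1, 102}, {2, 103}, {3, -1 /* no match */}
        };
        // Stages 1+2: orders per customer (nulls do not count, so customer 3
        // keeps a count of 0, exactly like the left join semantics).
        std::map<int, std::int64_t> c_count;
        for (auto [cust, order] : joined) {
            c_count[cust] += (order >= 0) ? 1 : 0;
        }
        // Stage 3: distribution of those counts (count -> custdist).
        std::map<std::int64_t, std::int64_t> custdist;
        for (auto const& kv : c_count) {
            custdist[kv.second]++;
        }
        for (auto const& kv : custdist) {
            std::cout << kv.first << " -> " << kv.second << "\n";
        }
        // Prints 0 -> 1, 1 -> 1, 2 -> 1; the real query then orders the
        // (count, custdist) rows before writing them out.
    }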
 cpp/benchmarks/streaming/ndsh/q13.cpp | 50 ++++++++++++++++-----------
 1 file changed, 29 insertions(+), 21 deletions(-)

diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp
index 812844541..c146cfe3f 100644
--- a/cpp/benchmarks/streaming/ndsh/q13.cpp
+++ b/cpp/benchmarks/streaming/ndsh/q13.cpp
@@ -302,34 +302,42 @@ rapidsmpf::streaming::Node groupby_and_sort(
     auto chunk_stream = chunk.stream();
     auto all_gathered = chunk.table_view();
 
-    auto grouper = cudf::groupby::groupby(
-        all_gathered.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO
-    );
+    cudf::table_view grouped_view;
+    std::unique_ptr<cudf::table> grouped;
+    // If there were multiple ranks, we have a concatenated table with the groupby
+    // results after the allgather. This needs to be grouped again.
+    if (ctx->comm()->nranks() > 1) {
+        auto grouper = cudf::groupby::groupby(
+            all_gathered.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO
+        );
 
-    std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
-    aggs.emplace_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
+        std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
+        aggs.emplace_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
 
-    auto requests = std::vector<cudf::groupby::aggregation_request>();
-    requests.push_back(
-        cudf::groupby::aggregation_request(all_gathered.column(1), std::move(aggs))
-    );
+        auto requests = std::vector<cudf::groupby::aggregation_request>();
+        requests.push_back(
+            cudf::groupby::aggregation_request(all_gathered.column(1), std::move(aggs))
+        );
 
-    auto [keys, results] =
-        grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
-    // Drop chunk, we don't need it.
-    std::ignore = std::move(chunk);
-    auto result = keys->release();
-    for (auto&& r : results) {
-        std::ranges::move(r.results, std::back_inserter(result));
+        auto [keys, results] =
+            grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr());
+        // Drop chunk, we don't need it.
+        std::ignore = std::move(chunk);
+        auto result = keys->release();
+        for (auto&& r : results) {
+            std::ranges::move(r.results, std::back_inserter(result));
+        }
+        grouped = std::make_unique<cudf::table>(std::move(result));
+        grouped_view = grouped->view();
+    } else {
+        grouped_view = all_gathered;
     }
-    auto grouped = std::make_unique<cudf::table>(std::move(result));
 
-    // We will only actually bother to do this on rank zero.
     auto sorted = cudf::sort_by_key(
-        grouped->view(),
-        grouped->view().select({1, 0}),
+        grouped_view,
+        grouped_view.select({1, 0}),
         {cudf::order::DESCENDING, cudf::order::ASCENDING},
         {},
         chunk_stream,
@@ -580,7 +588,7 @@ int main(int argc, char** argv) {
                 filter_orders(ctx, orders, filtered_orders)  // o_custkey, o_orderkey
             );
 
-            std::uint32_t num_partitions = 128;
+            std::uint32_t num_partitions = 128;  // should be configurable?
 
             auto orders_shuffled = ctx->create_channel();
             nodes.push_back(