diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index 6fa4bd27b..6e56330ac 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -65,3 +65,27 @@ install( DESTINATION bin/benchmarks/librapidsmpf EXCLUDE_FROM_ALL ) + +add_executable(q13 "q13.cpp") +set_target_properties( + q13 + PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CUDA_STANDARD 20 + CUDA_STANDARD_REQUIRED ON +) +target_compile_options( + q13 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" + "$<$:${RAPIDSMPF_CUDA_FLAGS}>" +) +target_link_libraries( + q13 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ + $ maybe_asan +) +install( + TARGETS q13 + COMPONENT benchmarking + DESTINATION bin/benchmarks/librapidsmpf + EXCLUDE_FROM_ALL +) diff --git a/cpp/benchmarks/streaming/ndsh/join.cpp b/cpp/benchmarks/streaming/ndsh/join.cpp index 271b6e5c0..bb60968a3 100644 --- a/cpp/benchmarks/streaming/ndsh/join.cpp +++ b/cpp/benchmarks/streaming/ndsh/join.cpp @@ -222,6 +222,81 @@ streaming::Message inner_join_chunk( ); } + +/** + * @brief Join a table chunk against a build hash table returning a message of the result. + * + * @param ctx Streaming context + * @param left_chunk Chunk to join + * @param sequence Sequence number of the output + * @param joiner hash_join object, representing the build table. + * @param build_carrier Columns from the build-side table to be included in the output. + * @param left_on Key column indices in `left_chunk`. + * @param build_stream Stream the `joiner` will be deallocated on. + * @param build_event Event recording the creation of the `joiner`. + * + * @return Message of `TableChunk` containing the result of the left join. 
+ */ +streaming::Message left_join_chunk( + std::shared_ptr ctx, + streaming::TableChunk&& left_chunk, + std::uint64_t sequence, + cudf::hash_join& joiner, + cudf::table_view build_carrier, + std::vector left_on, + rmm::cuda_stream_view build_stream, + CudaEvent* build_event +) { + CudaEvent event; + left_chunk = to_device(ctx, std::move(left_chunk)); + auto chunk_stream = left_chunk.stream(); + build_event->stream_wait(chunk_stream); + auto probe_table = left_chunk.table_view(); + auto probe_keys = probe_table.select(left_on); + auto [probe_match, build_match] = + joiner.left_join(probe_keys, std::nullopt, chunk_stream, ctx->br()->device_mr()); + + cudf::column_view build_indices = // right + cudf::device_span(*build_match); + cudf::column_view probe_indices = // left + cudf::device_span(*probe_match); + // build_carrier is valid on build_stream, but chunk_stream is + // waiting for build_stream work to be done, so running this on + // chunk_stream is fine. + + // For LEFT join, keep all columns from the probe (left) table including keys, + // since they're always valid (unlike right-side keys which may be NULL). + auto result_columns = cudf::gather( + probe_table, + probe_indices, + cudf::out_of_bounds_policy::DONT_CHECK, + chunk_stream, + ctx->br()->device_mr() + ) + ->release(); + + // left join build indices could have sentinel values (INT_MIN), so they will be OOB. + std::ranges::move( + cudf::gather( + build_carrier, + build_indices, + cudf::out_of_bounds_policy::NULLIFY, + chunk_stream, + ctx->br()->device_mr() + ) + ->release(), + std::back_inserter(result_columns) + ); + // Deallocation of the join indices will happen on build_stream, so add stream dep + // This also ensures deallocation of the hash_join object waits for completion. 
+ cuda_stream_join(build_stream, chunk_stream, &event); + return streaming::to_message( + sequence, + std::make_unique( + std::make_unique(std::move(result_columns)), chunk_stream + ) + ); +} + } // namespace streaming::Node inner_join_broadcast( @@ -347,6 +422,67 @@ streaming::Node inner_join_shuffle( co_await ch_out->drain(ctx->executor()); } +streaming::Node left_join_shuffle( + std::shared_ptr ctx, + std::shared_ptr left, + std::shared_ptr right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on +) { + streaming::ShutdownAtExit c{left, right, ch_out}; + ctx->comm()->logger().print("Left shuffle join"); + co_await ctx->executor()->schedule(); + CudaEvent build_event; + while (true) { + // Requirement: two shuffles kick out partitions in the same order + auto left_msg = co_await left->receive(); + auto right_msg = co_await right->receive(); + if (left_msg.empty()) { + RAPIDSMPF_EXPECTS( + right_msg.empty(), "Left does not have same number of partitions as right" + ); + break; + } + RAPIDSMPF_EXPECTS( + left_msg.sequence_number() == right_msg.sequence_number(), + "Mismatching sequence numbers" + ); + + // use right as build table + auto build_chunk = to_device(ctx, right_msg.release()); + auto build_stream = build_chunk.stream(); + auto joiner = cudf::hash_join( + build_chunk.table_view().select(right_on), + cudf::null_equality::UNEQUAL, + build_stream + ); + build_event.record(build_stream); + + // drop key columns from build table. 
+ std::vector to_keep; + std::ranges::copy_if( + std::ranges::iota_view(0, build_chunk.table_view().num_columns()), + std::back_inserter(to_keep), + [&](auto i) { return std::ranges::find(right_on, i) == right_on.end(); } + ); + auto build_carrier = build_chunk.table_view().select(to_keep); + + auto sequence = left_msg.sequence_number(); + co_await ch_out->send(left_join_chunk( + ctx, + left_msg.release(), + sequence, + joiner, + build_carrier, + left_on, + build_stream, + &build_event + )); + } + co_await ch_out->drain(ctx->executor()); +} + streaming::Node shuffle( + std::shared_ptr ctx, + std::shared_ptr ch_in, diff --git a/cpp/benchmarks/streaming/ndsh/join.hpp b/cpp/benchmarks/streaming/ndsh/join.hpp index ddd799112..15a2f0c19 100644 --- a/cpp/benchmarks/streaming/ndsh/join.hpp +++ b/cpp/benchmarks/streaming/ndsh/join.hpp @@ -77,6 +77,32 @@ streaming::Node inner_join_shuffle( KeepKeys keep_keys = KeepKeys::YES ); + +/** + * @brief Perform a streaming left join between two tables. + * + * @note This performs a shuffle join; the left and right channels are required to provide + * hash-partitioned data in-order. + * + * @param ctx Streaming context. + * @param left Channel of `TableChunk`s in hash-partitioned order. + * @param right Channel of `TableChunk`s in matching hash-partitioned order. + * @param ch_out Output channel of `TableChunk`s. + * @param left_on Column indices of the keys in the left table. + * @param right_on Column indices of the keys in the right table. + * @note The result keeps all left-table columns (including keys) and only the + * non-key "carrier" columns of the right table. + * + * @return Coroutine representing the completion of the join. + */ +streaming::Node left_join_shuffle( + std::shared_ptr ctx, + std::shared_ptr left, + std::shared_ptr right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on +); + /** + * @brief Shuffle the input channel by hash-partitioning on given key columns. 
* diff --git a/cpp/benchmarks/streaming/ndsh/q13.cpp b/cpp/benchmarks/streaming/ndsh/q13.cpp new file mode 100644 index 000000000..c146cfe3f --- /dev/null +++ b/cpp/benchmarks/streaming/ndsh/q13.cpp @@ -0,0 +1,723 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "concatenate.hpp" +#include "join.hpp" +#include "utils.hpp" + +namespace { + +std::string get_table_path( + std::string const& input_directory, std::string const& table_name +) { + auto dir = input_directory.empty() ? "." 
: input_directory; + auto file_path = dir + "/" + table_name + ".parquet"; + if (std::filesystem::exists(file_path)) { + return file_path; + } + return dir + "/" + table_name + "/"; +} + +rapidsmpf::streaming::Node read_customer( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "customer") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"c_custkey"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node read_orders( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "orders") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"o_comment", "o_custkey", "o_orderkey"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +// TODO: can we push this into the read_orders node? 
+rapidsmpf::streaming::Node filter_orders( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto o_comment = table.column(0); + // Match rows that contain "special.*requests" and negate to get rows that don't + auto regex_program = cudf::strings::regex_program::create("special.*requests"); + auto contains_mask = + cudf::strings::contains_re(o_comment, *regex_program, chunk_stream, mr); + // Negate: we want rows that do NOT match + auto mask = cudf::unary_operation( + contains_mask->view(), cudf::unary_operator::NOT, chunk_stream, mr + ); + + auto filtered = cudf::apply_boolean_mask( + table.select({1, 2}), mask->view(), chunk_stream, mr + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + std::move(filtered), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node chunkwise_groupby_agg( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + cudf::size_type key_col_idx, + cudf::size_type value_col_idx, + auto&& agg_factory +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::vector partial_results; + std::uint64_t sequence = 0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.template release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto grouper = cudf::groupby::groupby( + table.select({key_col_idx}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + 
std::vector> aggs; + aggs.emplace_back(agg_factory()); + + auto requests = std::vector(); + requests.push_back( + cudf::groupby::aggregation_request( + table.column(value_col_idx), std::move(aggs) + ) + ); + + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node all_gather_concatenated( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("All gather"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto local_table = chunk.table_view(); + + if (ctx->comm()->nranks() > 1) { + // Reduce across ranks... + // Need a reduce primitive in rapidsmpf, but let's just use an allgather and + // discard for now. 
+ rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (!local_table.is_empty()) { + auto pack = cudf::pack(local_table, chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + if (ctx->comm()->rank() == 0) { + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(global_result), chunk_stream + ) + ) + ); + } else { + // Drop chunk, we don't need it. + std::ignore = std::move(packed_data); + } + } else { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, std::make_unique(std::move(chunk)) + ) + ); + } + + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node groupby_and_sort( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + ctx->comm()->logger().print("Groupby and sort"); + + auto msg = co_await ch_in->receive(); + + // We know we only have a single chunk from the allgather in rank 0. + if (msg.empty()) { + co_return; + } + + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto all_gathered = chunk.table_view(); + + cudf::table_view grouped_view; + std::unique_ptr grouped; + // If there were multiple ranks, we have a concatenated table with the groupby + // results after allgather. This needs to be grouped again. 
+ if (ctx->comm()->nranks() > 1) { + auto grouper = cudf::groupby::groupby( + all_gathered.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + + std::vector> aggs; + aggs.emplace_back(cudf::make_sum_aggregation()); + + auto requests = std::vector(); + requests.push_back( + cudf::groupby::aggregation_request(all_gathered.column(1), std::move(aggs)) + ); + + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + grouped = std::make_unique(std::move(result)); + grouped_view = grouped->view(); + } else { + grouped_view = all_gathered; + } + + // We will only actually bother to do this on rank zero. + auto sorted = cudf::sort_by_key( + grouped_view, + grouped_view.select({1, 0}), + {cudf::order::DESCENDING, cudf::order::ASCENDING}, + {}, + chunk_stream, + ctx->br()->device_mr() + ); + grouped.reset(); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(sorted), chunk_stream + ) + ) + ); + + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node write_parquet( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::string output_path +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + ctx->comm()->logger().print("Write parquet"); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto sink = cudf::io::sink_info(output_path + ".parquet"); + auto builder = + cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = cudf::io::table_input_metadata(chunk.table_view()); + // Q13 output: c_count, custdist + metadata.column_metadata[0].set_name("count"); + 
metadata.column_metadata[1].set_name("custdist"); + builder = builder.metadata(metadata); + auto options = builder.build(); + cudf::io::write_parquet(options, chunk.stream()); + ctx->comm()->logger().print( + "Wrote chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns to ", + output_path + ); + } + co_await ch_in->drain(ctx->executor()); +} + +} // namespace + +struct ProgramOptions { + int num_streaming_threads{1}; + cudf::size_type num_rows_per_chunk{100'000'000}; + std::optional spill_device_limit{std::nullopt}; + std::string output_file; + std::string input_directory; +}; + +ProgramOptions parse_options(int argc, char** argv) { + ProgramOptions options; + + auto print_usage = [&argv]() { + std::cerr + << "Usage: " << argv[0] << " [options]\n" + << "Options:\n" + << " --num-streaming-threads Number of streaming threads (default: 1)\n" + << " --num-rows-per-chunk Number of rows per chunk (default: " + "100000000)\n" + << " --spill-device-limit Fractional spill device limit (default: " + "None)\n" + << " --output-file Output file path (required)\n" + << " --input-directory Input directory path (required)\n" + << " --help Show this help message\n"; + }; + + static struct option long_options[] = { + {"num-streaming-threads", required_argument, nullptr, 1}, + {"num-rows-per-chunk", required_argument, nullptr, 2}, + {"output-file", required_argument, nullptr, 3}, + {"input-directory", required_argument, nullptr, 4}, + {"help", no_argument, nullptr, 5}, + {"spill-device-limit", required_argument, nullptr, 6}, + {nullptr, 0, nullptr, 0} + }; + + int opt; + int option_index = 0; + + bool saw_output_file = false; + bool saw_input_directory = false; + + while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { + switch (opt) { + case 1: + options.num_streaming_threads = std::atoi(optarg); + break; + case 2: + options.num_rows_per_chunk = std::atoi(optarg); + break; + case 3: + 
options.output_file = optarg; + saw_output_file = true; + break; + case 4: + options.input_directory = optarg; + saw_input_directory = true; + break; + case 5: + print_usage(); + std::exit(0); + case 6: + options.spill_device_limit = std::stod(optarg); + break; + case '?': + if (optopt == 0 && optind > 1) { + std::cerr << "Error: Unknown option '" << argv[optind - 1] << "'\n\n"; + } + print_usage(); + std::exit(1); + default: + print_usage(); + std::exit(1); + } + } + + // Check if required options were provided + if (!saw_output_file || !saw_input_directory) { + if (!saw_output_file) { + std::cerr << "Error: --output-file is required\n"; + } + if (!saw_input_directory) { + std::cerr << "Error: --input-directory is required\n"; + } + std::cerr << std::endl; + print_usage(); + std::exit(1); + } + + return options; +} + +/** + * @brief Run a derived version of TPC-H query 13. + * + * The SQL form of the query is: + * @code{.sql} + * select + * c_count, + * COUNT(*) AS custdist + * from + * ( + * select + * c_custkey, + * COUNT(o_orderkey) AS c_count + * from + * customer + * left outer join orders on + * c_custkey = o_custkey + * and o_comment not like '%special%requests%' + * group by + * c_custkey + * ) as c_orders + * group by + * c_count + * order by + * custdist DESC, + * c_count DESC; + * @endcode{} + */ +int main(int argc, char** argv) { + cudaFree(nullptr); + rapidsmpf::mpi::init(&argc, &argv); + MPI_Comm mpi_comm; + RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); + auto cmd_options = parse_options(argc, argv); + auto limit_size = rmm::percent_of_free_device_memory( + static_cast(cmd_options.spill_device_limit.value_or(1) * 100) + ); + rmm::mr::cuda_async_memory_resource mr{}; + auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); + rmm::device_async_resource_ref mr_ref(stats_mr); + rmm::mr::set_current_device_resource(&stats_mr); + rmm::mr::set_current_device_resource_ref(mr_ref); + std::unordered_map + memory_available{}; + if 
(cmd_options.spill_device_limit.has_value()) { + memory_available[rapidsmpf::MemoryType::DEVICE] = rapidsmpf::LimitAvailableMemory{ + &stats_mr, static_cast(limit_size) + }; + } + auto br = std::make_shared( + stats_mr, std::move(memory_available) + ); + auto envvars = rapidsmpf::config::get_environment_variables(); + envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); + auto options = rapidsmpf::config::Options(envvars); + auto stats = std::make_shared(&stats_mr); + { + auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); + auto progress = + std::make_shared(comm->logger(), stats); + auto ctx = + std::make_shared(options, comm, br, stats); + comm->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" + ); + comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + + std::string output_path = cmd_options.output_file; + std::vector timings; + for (int i = 0; i < 2; i++) { + rapidsmpf::OpID op_id{0}; + std::vector nodes; + auto start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q13 pipeline"); + + auto customer = ctx->create_channel(); + nodes.push_back(read_customer( + ctx, + customer, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // c_custkey + + auto orders = ctx->create_channel(); + nodes.push_back(read_orders( + ctx, + orders, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // o_comment, o_custkey, o_orderkey + + + auto filtered_orders = ctx->create_channel(); + nodes.push_back( + filter_orders(ctx, orders, filtered_orders) // o_custkey, o_orderkey + ); + + std::uint32_t num_partitions = 128; // should be configurable? 
+ + auto orders_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + filtered_orders, + orders_shuffled, + {0}, + num_partitions, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + ) + ); // o_custkey, o_orderkey + + auto customer_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + customer, + customer_shuffled, + {0}, + num_partitions, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + ) + ); // c_custkey + + // left join customer_shuffled and orders_shuffled + auto customer_x_orders = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::left_join_shuffle( + ctx, + customer_shuffled, + orders_shuffled, + customer_x_orders, + {0}, + {0} + ) + ); // c_custkey, o_orderkey + + auto chunkwise_groupby_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg( + ctx, customer_x_orders, chunkwise_groupby_output, 0, 1, [] { + return cudf::make_count_aggregation(); + } + )); // c_custkey, count + + auto concatenated_groupby_output = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + chunkwise_groupby_output, + concatenated_groupby_output, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); // c_custkey, count + + auto groupby_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg( + ctx, concatenated_groupby_output, groupby_output, 0, 1, [] { + return cudf::make_sum_aggregation(); + } + )); // c_custkey, count + + auto groupby_count_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg( + ctx, groupby_output, groupby_count_output, 1, 0, [] { + return cudf::make_count_aggregation(); + } + )); // count, len + + auto all_gather_concatenated_output = ctx->create_channel(); + nodes.push_back(all_gather_concatenated( + ctx, + groupby_count_output, + all_gather_concatenated_output, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); // count, custdist + + auto groupby_and_sort_output = ctx->create_channel(); + 
nodes.push_back(groupby_and_sort( + ctx, + all_gather_concatenated_output, + groupby_and_sort_output + )); // count, custdist + + nodes.push_back(write_parquet( + ctx, + groupby_and_sort_output, + cmd_options.output_file + "_r" + std::to_string(ctx->comm()->rank()) + + "_i" + std::to_string(i) + )); + + auto end = std::chrono::steady_clock::now(); + std::chrono::duration pipeline = end - start; + start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Q13 Iteration"); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + } + end = std::chrono::steady_clock::now(); + std::chrono::duration compute = end - start; + comm->logger().print( + "Iteration ", i, " pipeline construction time [s]: ", pipeline.count() + ); + comm->logger().print( + "Iteration ", i, " compute time [s]: ", compute.count() + ); + timings.push_back(pipeline.count()); + timings.push_back(compute.count()); + ctx->comm()->logger().print(stats->report()); + RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); + } + + if (comm->rank() == 0) { + for (int i = 0; i < 2; i++) { + comm->logger().print( + "Iteration ", + i, + " pipeline construction time [s]: ", + timings[size_t(2 * i)] + ); + comm->logger().print( + "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)] + ); + } + } + } + + RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); + RAPIDSMPF_MPI(MPI_Finalize()); + return 0; + } +}