diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp new file mode 100644 index 000000000..3bd4b01da --- /dev/null +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -0,0 +1,1088 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + + #include + #include + #include + #include + #include + #include + #include + + #include + #include + #include + + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include +#include + #include + #include + #include + #include + + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + + #include "concatenate.hpp" + #include "join.hpp" + #include "utilities.hpp" + + namespace { + + std::string get_table_path( + std::string const& input_directory, std::string const& table_name + ) { + auto dir = input_directory.empty() ? "." : input_directory; + auto file_path = dir + "/" + table_name + ".parquet"; + + if (std::filesystem::exists(file_path)) { + return file_path; + } + + return dir + "/" + table_name + "/"; + } + + rapidsmpf::streaming::Node read_customer( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "customer") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({ + "c_mktsegment", // 0 + "c_custkey" // 1 + }) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + + rapidsmpf::streaming::Node read_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory + ) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "lineitem") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns( + { + "l_orderkey", // 0 + "l_shipdate", // 1 -- Timestamp[ms] + "l_extendedprice", // 2 + "l_discount", // 3 + } + ) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); + } + + + rapidsmpf::streaming::Node read_orders( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory + ) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "orders") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({ + "o_orderkey", // 0 + "o_orderdate", // 1 -- Timestamp[ms] + "o_shippriority", // 2 + "o_custkey" // 3 + }) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); + } + +// customer.filter(pl.col("c_mktsegment") == var1) ## var1 = "BUILDING" + rapidsmpf::streaming::Node filter_customer( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = 
ctx->br()->device_mr(); + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto var1 = cudf::make_string_scalar("BUILDING", chunk_stream, mr); + + auto mask = cudf::binary_operation( + table.column(0), // c_mktsegment is col 0 + *var1, + cudf::binary_operator::EQUAL, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + + auto filtered = cudf::apply_boolean_mask( + table, + mask->view(), + chunk_stream, mr + ); + // std::cout << "Number of columns in filtered customer output: " << table.select({1}).num_columns() << std::endl; + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + std::move(filtered), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + +// # .filter(pl.col("o_orderdate") < var2) ## var2 = date(1995, 3, 15) + rapidsmpf::streaming::Node filter_orders( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + // auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; + // auto var2 = cudf::timestamp_scalar{ms_since_epoch}; + auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{9204}}; + auto var2 = cudf::timestamp_scalar{days_since_epoch}; + + auto mask = cudf::binary_operation( + table.column(1), // o_orderdate is col 1 + var2, + cudf::binary_operator::LESS, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + auto filtered = cudf::apply_boolean_mask( + table, + mask->view(), + chunk_stream, mr + ); + + // std::cout << "Number of columns in filtered orders output: " << table.num_columns() << std::endl; + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + std::move(filtered), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + +// .filter(pl.col("l_shipdate") > var2) ## var2 = date(1995, 3, 15) +rapidsmpf::streaming::Node filter_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + // auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; + // auto var2 = cudf::timestamp_scalar{ms_since_epoch}; + auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{9204}}; + auto var2 = cudf::timestamp_scalar{days_since_epoch}; + + auto mask = cudf::binary_operation( + table.column(1), // l_shipdate is col 1 + var2, + cudf::binary_operator::GREATER, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + auto filtered = cudf::apply_boolean_mask( + table, + mask->view(), + chunk_stream, mr + ); + // std::cout << "Number of columns in 
filtered lineitem output: " << table.select({0, 2, 3}).num_columns() << std::endl; + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + std::move(filtered), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + + [[maybe_unused]] rapidsmpf::streaming::Node with_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + // customer_x_orders_x_lineitem is the input to the with_column op + // "c_mktsegment", # 0 + // "c_custkey", # 1 (customers<-orders on o_custkey) + // "o_orderkey", # 2 (orders<-lineitem on o_orderkey) + // "o_orderdate", # 3 + // "o_shippriority", # 4 + // "l_shipdate", # 5 + // "l_extendedprice", # 6 + // "l_discount", # 7 + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + + // std::cout << "Number of columns in with columns output: " << table.num_columns() << std::endl; + + // for (int i = 0; i < table.num_columns(); ++i) { + // cudf::data_type dtype = table.column(i).type(); + // cudf::type_id tid = dtype.id(); + // std::cout << "Column " << i << " type_id: " << static_cast(tid) << std::endl; + // } + + std::vector> result; + result.reserve(4); + + // o_orderkey + result.push_back( + std::make_unique( + table.column(2), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_orderdate + result.push_back( + std::make_unique( + table.column(3), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_shippriority + result.push_back( + std::make_unique( + table.column(4), chunk_stream, ctx->br()->device_mr() + ) + ); + auto extendedprice = table.column(6); + auto discount = table.column(7); + + std::string udf = + R"***( + static __device__ void calculate_amount(double *amount, double discount, double extprice) { + *amount = extprice * (1 - discount); + } + )***"; + result.push_back( + cudf::transform( + {discount, extendedprice}, + udf, + cudf::data_type(cudf::type_id::FLOAT64), + false, + std::nullopt, + cudf::null_aware::NO, + chunk_stream, + ctx->br()->device_mr() + ) + ); + // Out: [o_orderkey, o_orderdate, o_shippriority, revenue] + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + +// In: [o_orderkey, o_orderdate, o_shippriority, revenue] + [[maybe_unused]] rapidsmpf::streaming::Node chunkwise_groupby_agg( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::vector partial_results; + std::uint64_t sequence = 0; + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + ctx->comm()->logger().print("Chunkwise groupby"); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + // std::cout << "Number of columns in groupby input: " << table.num_columns() << std::endl; + + // for (int i = 0; i < table.num_columns(); ++i) { + // cudf::data_type dtype = table.column(i).type(); + // cudf::type_id tid = dtype.id(); + 
// std::cout << "Column " << i << " type_id: " << static_cast(tid) << std::endl; + // } + + auto grouper = cudf::groupby::groupby( + // group by [o_orderkey, o_orderdate, o_shippriority] + table.select({0, 1, 2}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(3), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + + [[maybe_unused]] rapidsmpf::streaming::Node final_groupby_agg( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + // TODO: requires concatenated input stream. + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("Final groupby"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + std::unique_ptr local_result{nullptr}; + if (!table.is_empty()) { + auto grouper = cudf::groupby::groupby( + table.select({0, 1, 2}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(3), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + local_result = std::make_unique(std::move(result)); + } + if (ctx->comm()->nranks() > 1) { + // Reduce across ranks... + // Need a reduce primitive in rapidsmpf, but let's just use an allgather and + // discard for now. + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (local_result) { + auto pack = + cudf::pack(local_result->view(), chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + if (ctx->comm()->rank() == 0) { + std::vector chunks; + chunks.reserve(packed_data.size()); + std::ranges::transform( + packed_data, std::back_inserter(chunks), [](auto& chunk) { + return std::move(chunk.data); + } + ); + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(chunks), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + if (ctx->comm()->rank() == 0) { + // We will only actually bother to do this on rank zero. 
+ auto result_view = global_result->view(); + auto grouper = cudf::groupby::groupby( + result_view.select({0, 1, 2}), + cudf::null_policy::EXCLUDE, + cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request( + result_view.column(3), std::move(aggs) + ) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + global_result.reset(); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + } else { + std::ignore = std::move(packed_data); + } + } else { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(local_result), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + +// change the order of columns from o_orderkey, o_orderdate, o_shippriority, revenue +// to o_orderkey, revenue, o_orderdate, o_shippriority +[[maybe_unused]] rapidsmpf::streaming::Node select_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector> result; + result.reserve(4); + + // o_orderkey + result.push_back( + std::make_unique( + table.column(0), chunk_stream, ctx->br()->device_mr() + ) + ); + // revenue + result.push_back( + std::make_unique( + table.column(3), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_orderdate + result.push_back( + std::make_unique( + table.column(1), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_shippriority + result.push_back( + std::make_unique( + table.column(2), chunk_stream, ctx->br()->device_mr() + ) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + + + [[maybe_unused]] rapidsmpf::streaming::Node sort_by( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + // We know we only have a single chunk from the groupby + if (msg.empty()) { + co_return; + } + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto table = chunk.table_view(); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + cudf::sort_by_key( + table, + table.select({1, 2}), + {cudf::order::DESCENDING, cudf::order::ASCENDING}, + {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, + chunk.stream(), + ctx->br()->device_mr() + ), + chunk.stream() + ) + ) + ); + co_await ch_out->drain(ctx->executor()); + } + + + + +// take first 10 rows +[[maybe_unused]] rapidsmpf::streaming::Node head( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + co_await 
ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector head_indices{0, 10}; + auto first_10_rows = cudf::slice(table, head_indices); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(first_10_rows[0])), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + [[maybe_unused]] rapidsmpf::streaming::Node write_parquet( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::string output_path + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto sink = cudf::io::sink_info(output_path); + auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = cudf::io::table_input_metadata(chunk.table_view()); + metadata.column_metadata[0].set_name("l_orderkey"); + metadata.column_metadata[1].set_name("revenue"); + metadata.column_metadata[2].set_name("o_orderdate"); + metadata.column_metadata[3].set_name("o_shippriority"); + builder = builder.metadata(metadata); + auto options = builder.build(); + cudf::io::write_parquet(options, chunk.stream()); + ctx->comm()->logger().print( + "Wrote chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns to ", + output_path + ); + } + + [[maybe_unused]] rapidsmpf::streaming::Node consume( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + ctx->comm()->logger().print( + "Consumed chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns" + ); + } + } + } // namespace + + struct ProgramOptions { + int num_streaming_threads = 1; + cudf::size_type num_rows_per_chunk = 100'000'000; + bool use_shuffle_join = false; + std::string output_file; + std::string input_directory; + }; + + ProgramOptions parse_options(int argc, char** argv) { + ProgramOptions options; + + auto print_usage = [&argv]() { + std::cerr + << "Usage: " << argv[0] << " [options]\n" + << "Options:\n" + << " --num-streaming-threads Number of streaming threads (default: 1)\n" + << " --num-rows-per-chunk Number of rows per chunk (default: " + "100000000)\n" + << " --use-shuffle-join Use shuffle join (default: false)\n" + << " --output-file Output file path (required)\n" + << " --input-directory Input directory path (required)\n" + << " --help Show this help message\n"; + }; + + static struct option long_options[] = { + {"num-streaming-threads", required_argument, nullptr, 1}, + {"num-rows-per-chunk", required_argument, nullptr, 2}, + {"use-shuffle-join", no_argument, nullptr, 3}, + {"output-file", required_argument, nullptr, 4}, + {"input-directory", required_argument, nullptr, 5}, + {"help", no_argument, nullptr, 6}, + {nullptr, 0, nullptr, 0} + }; + + int opt; + int option_index = 0; + + bool saw_output_file = false; + bool saw_input_directory 
= false; + + while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { + switch (opt) { + case 1: + options.num_streaming_threads = std::atoi(optarg); + break; + case 2: + options.num_rows_per_chunk = std::atoi(optarg); + break; + case 3: + options.use_shuffle_join = true; + break; + case 4: + options.output_file = optarg; + saw_output_file = true; + break; + case 5: + options.input_directory = optarg; + saw_input_directory = true; + break; + case 6: + print_usage(); + std::exit(0); + case '?': + if (optopt == 0 && optind > 1) { + std::cerr << "Error: Unknown option '" << argv[optind - 1] << "'\n\n"; + } + print_usage(); + std::exit(1); + default: + print_usage(); + std::exit(1); + } + } + + // Check if required options were provided + if (!saw_output_file || !saw_input_directory) { + if (!saw_output_file) { + std::cerr << "Error: --output-file is required\n"; + } + if (!saw_input_directory) { + std::cerr << "Error: --input-directory is required\n"; + } + std::cerr << std::endl; + print_usage(); + std::exit(1); + } + + return options; + } + + int main(int argc, char** argv) { + cudaFree(nullptr); + rapidsmpf::mpi::init(&argc, &argv); + MPI_Comm mpi_comm; + RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); + auto cmd_options = parse_options(argc, argv); + auto limit_size = rmm::percent_of_free_device_memory(80); + rmm::mr::cuda_async_memory_resource mr{}; + rmm::mr::cuda_memory_resource base{}; + // rmm::mr::pool_memory_resource mr{&base, limit_size}; + auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); + rmm::device_async_resource_ref mr_ref(stats_mr); + rmm::mr::set_current_device_resource(&stats_mr); + rmm::mr::set_current_device_resource_ref(mr_ref); + std::unordered_map + memory_available{}; + memory_available[rapidsmpf::MemoryType::DEVICE] = + rapidsmpf::LimitAvailableMemory{&stats_mr, static_cast(limit_size)}; + rapidsmpf::BufferResource br(stats_mr); // , std::move(memory_available)); + auto envvars = rapidsmpf::config::get_environment_variables(); + envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); + auto options = rapidsmpf::config::Options(envvars); + auto stats = std::make_shared(&stats_mr); + { + auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); + // auto comm = std::make_shared(mpi_comm, options); + auto progress = + std::make_shared(comm->logger(), stats); + auto ctx = + std::make_shared(options, comm, &br, stats); + comm->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" + ); + comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + + std::string output_path = cmd_options.output_file; + std::vector timings; + for (int i = 0; i < 2; i++) { + rapidsmpf::OpID op_id{0}; + std::vector nodes; + auto start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q3 pipeline"); + + // Input data channels + auto customer = ctx->create_channel(); + auto lineitem = ctx->create_channel(); + auto orders = ctx->create_channel(); + + // filtered channels + auto filtered_customer = ctx->create_channel(); + auto filtered_orders = ctx->create_channel(); + auto filtered_lineitem = ctx->create_channel(); + + // join channels + auto customer_x_orders = ctx->create_channel(); + auto customer_x_orders_x_lineitem = ctx->create_channel(); + + // read and filter customer + // In: "c_mktsegment", "c_custkey" + // Out: "c_mktsegment", "c_custkey" + nodes.push_back(read_customer( + ctx, + customer, + /* num_tickets */ 2, + 
cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + nodes.push_back(filter_customer(ctx, customer, filtered_customer)); + + // read and filter orders + // In: "o_orderkey", "o_orderdate", "o_shippriority", "o_custkey" + // Out: "o_orderkey", "o_orderdate", "o_shippriority", "o_custkey" + nodes.push_back(read_orders( + ctx, + orders, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + nodes.push_back(filter_orders(ctx, orders, filtered_orders)); + + // read and filter lineitem + // In: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" + // Out: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" + nodes.push_back(read_lineitem( + ctx, + lineitem, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + nodes.push_back(filter_lineitem(ctx, lineitem, filtered_lineitem)); + + // join customers and orders + nodes.push_back( + // Keys c_custkey x o_custkey + // Left: "c_mktsegment", "c_custkey" + // Right: "o_orderkey", "o_orderdate", "o_shippriority", "o_custkey" + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + filtered_customer, + filtered_orders, + customer_x_orders, + {1}, + {3}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + + // join customer_x_orders and lineitem + // Keys o_orderkey x l_orderkey + // Left: "c_mktsegment", "c_custkey", "o_orderkey", "o_orderdate", "o_shippriority", + // Right: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" + nodes.push_back( + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + customer_x_orders, + filtered_lineitem, + customer_x_orders_x_lineitem, + {2}, + {0}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + + // with columns (and eliminating columns) + auto groupby_input = ctx->create_channel(); + + nodes.push_back(with_columns( + ctx, + customer_x_orders_x_lineitem, + groupby_input + )); + + // groupby aggregation (agg (per chunk) -> concat -> agg (global)) + auto chunkwise_groupby_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg( + ctx, + groupby_input, + chunkwise_groupby_output + )); + auto concatenated_groupby_output = ctx->create_channel(); + nodes.push_back(rapidsmpf::ndsh::concatenate( + ctx, + chunkwise_groupby_output, + concatenated_groupby_output, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + )); + auto groupby_output = ctx->create_channel(); + nodes.push_back(final_groupby_agg( + ctx, + concatenated_groupby_output, + groupby_output, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); + + // select columns + auto select_output = ctx->create_channel(); + nodes.push_back(select_columns( + ctx, + groupby_output, + select_output + )); + + // sort by + auto sorted_output = ctx->create_channel(); + nodes.push_back(sort_by( + ctx, + select_output, + sorted_output + )); + + // head + auto head_output = ctx->create_channel(); + nodes.push_back(head( + ctx, + sorted_output, + head_output + )); + + // write parquet + nodes.push_back(write_parquet( + ctx, + head_output, + output_path + )); + } + auto end = std::chrono::steady_clock::now(); + std::chrono::duration pipeline = end - start; + start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Q3 Iteration"); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + } + end = std::chrono::steady_clock::now(); + std::chrono::duration compute = end - start; + comm->logger().print( + "Iteration ", i, " pipeline construction time [s]: ", 
pipeline.count()
+            );
+            comm->logger().print("Iteration ", i, " compute time [s]: ", compute.count());
+            timings.push_back(pipeline.count());
+            timings.push_back(compute.count());
+            ctx->comm()->logger().print(stats->report());
+            RAPIDSMPF_MPI(MPI_Barrier(mpi_comm));
+        }
+        if (comm->rank() == 0) {
+            for (int i = 0; i < 2; i++) {
+                comm->logger().print(
+                    "Iteration ",
+                    i,
+                    " pipeline construction time [s]: ",
+                    timings[size_t(2 * i)]
+                );
+                comm->logger().print(
+                    "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)]
+                );
+            }
+        }
+    }
+
+    RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm));
+    RAPIDSMPF_MPI(MPI_Finalize());
+    return 0;
+}
\ No newline at end of file
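
For comparison, the revenue expression computed by the calculate_amount JIT UDF in with_columns (l_extendedprice * (1 - l_discount)) can also be produced with plain cudf column binary operations. The sketch below is illustrative only and is not part of this change; compute_revenue is a hypothetical helper name.

    #include <memory>
    #include <cudf/binaryop.hpp>
    #include <cudf/column/column.hpp>
    #include <cudf/column/column_view.hpp>
    #include <cudf/scalar/scalar_factories.hpp>
    #include <rmm/cuda_stream_view.hpp>
    #include <rmm/resource_ref.hpp>

    // Hypothetical alternative to the JIT transform: revenue = extprice * (1 - discount).
    std::unique_ptr<cudf::column> compute_revenue(
        cudf::column_view extendedprice,
        cudf::column_view discount,
        rmm::cuda_stream_view stream,
        rmm::device_async_resource_ref mr
    ) {
        auto one = cudf::make_fixed_width_scalar<double>(1.0, stream, mr);
        // (1 - l_discount)
        auto one_minus_discount = cudf::binary_operation(
            *one,
            discount,
            cudf::binary_operator::SUB,
            cudf::data_type{cudf::type_id::FLOAT64},
            stream,
            mr
        );
        // l_extendedprice * (1 - l_discount)
        return cudf::binary_operation(
            extendedprice,
            one_minus_discount->view(),
            cudf::binary_operator::MUL,
            cudf::data_type{cudf::type_id::FLOAT64},
            stream,
            mr
        );
    }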