From 3dcb2cd6acb0c035f3010de0911c0b133bfe4c5d Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Sun, 9 Nov 2025 10:42:00 -0800 Subject: [PATCH 1/7] WIP q3 cpp --- cpp/benchmarks/streaming/ndsh/q03.cpp | 1079 +++++++++++++++++++++++++ 1 file changed, 1079 insertions(+) create mode 100644 cpp/benchmarks/streaming/ndsh/q03.cpp diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp new file mode 100644 index 000000000..93dff5a24 --- /dev/null +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -0,0 +1,1079 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + + #include + #include + #include + #include + #include + #include + #include + + #include + #include + #include + + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include +#include + #include + #include + #include + #include + + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + + #include "concatenate.hpp" + #include "join.hpp" + #include "utilities.hpp" + + namespace { + + std::string get_table_path( + std::string const& input_directory, std::string const& table_name + ) { + auto dir = input_directory.empty() ? "." : input_directory; + auto file_path = dir + "/" + table_name + ".parquet"; + + if (std::filesystem::exists(file_path)) { + return file_path; + } + + return dir + "/" + table_name + "/"; + } + + rapidsmpf::streaming::Node read_customer( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "customer") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"c_mktsegment", "c_custkey"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + + rapidsmpf::streaming::Node read_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory + ) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "lineitem") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns( + {"l_discount", + "l_extendedprice", + "l_orderkey", + "l_partkey", + "l_quantity", + "l_suppkey"} + ) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); + } + + + rapidsmpf::streaming::Node read_orders( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory + ) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "orders") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"o_orderdate", "o_orderkey"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); + } + +// customer.filter(pl.col("c_mktsegment") == var1) ## var1 = "BUILDING" + rapidsmpf::streaming::Node filter_customer( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto p_name = table.column(0); + auto target = cudf::make_string_scalar("green", chunk_stream, mr); + auto mask = cudf::strings::contains( + p_name, *static_cast(target.get()), chunk_stream, mr + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + cudf::apply_boolean_mask( + table.select({1}), mask->view(), chunk_stream, mr + ), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + + +// # .filter(pl.col("o_orderdate") < var2) ## var2 = date(1995, 3, 15) + rapidsmpf::streaming::Node filter_orders( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto p_name = table.column(0); + + + std::tm timeinfo = {}; + timeinfo.tm_year = 1993 - 1900; // years since 1900 + timeinfo.tm_mon = 3 - 1; // months since January + timeinfo.tm_mday = 15; + time_t epoch_secs = std::mktime(&timeinfo); + int64_t epoch_ms = static_cast(epoch_secs) * 1000; + auto var2 = cudf::make_fixed_width_scalar( + epoch_ms, + chunk_stream, + mr + ); + + auto mask = cudf::binary_operation( + table.column(1), // o_orderdate is col 1 + *var2.get(), + cudf::binary_operator::LESS, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + cudf::apply_boolean_mask( + table.select({1}), mask->view(), chunk_stream, mr + ), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + +// .filter(pl.col("l_shipdate") > var2) ## var2 = date(1995, 3, 15) +// # .filter(pl.col("o_orderdate") < var2) ## var2 = date(1995, 3, 15) +rapidsmpf::streaming::Node filter_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto p_name = table.column(0); + + + std::tm timeinfo = {}; + timeinfo.tm_year = 1993 - 1900; // years since 1900 + timeinfo.tm_mon = 3 - 1; // months since January + timeinfo.tm_mday = 15; + time_t epoch_secs = std::mktime(&timeinfo); + int64_t epoch_ms = static_cast(epoch_secs) * 1000; + auto var2 = cudf::make_fixed_width_scalar( + epoch_ms, + chunk_stream, + mr + ); + + auto mask = cudf::binary_operation( + table.column(1), // l_shipdate is col 1 + *var2.get(), + cudf::binary_operator::GREATER, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + cudf::apply_boolean_mask( + // no longer need l_shipdate + table.select({0, 2, 3}), mask->view(), chunk_stream, mr + ), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + + [[maybe_unused]] rapidsmpf::streaming::Node with_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + // customer_x_orders_x_lineitem is the input to the with_column op + // "c_custkey", # 0 (customers<-orders on o_custkey) + // "o_orderkey", # 1 (orders<-lineitem on o_orderkey) + // "o_orderdate", # 2 + // "o_shippriority", # 3 + // "l_shipdate", # 4 + // "l_extendedprice", # 5 + // "l_discount", # 6 + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector> result; + result.reserve(4); + + // o_orderkey + result.push_back( + std::make_unique( + table.column(1), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_orderdate + result.push_back( + std::make_unique( + table.column(2), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_shippriority + result.push_back( + std::make_unique( + table.column(3), chunk_stream, ctx->br()->device_mr() + ) + ); + auto extendedprice = table.column(5); + auto discount = table.column(6); + + std::string udf = + R"***( + static __device__ void calculate_amount(double *amount, double discount, double extprice) { + *amount = extprice * (1 - discount); + } + )***"; + result.push_back( + cudf::transform( + {discount, extendedprice}, + udf, + cudf::data_type(cudf::type_id::FLOAT64), + false, + std::nullopt, + cudf::null_aware::NO, + chunk_stream, + ctx->br()->device_mr() + ) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + +// change the order of columns from o_orderkey, o_orderdate, o_shippriority, revenue +// to o_orderkey, revenue, o_orderdate, o_shippriority + [[maybe_unused]] rapidsmpf::streaming::Node select_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector> result; + result.reserve(4); + + // o_orderkey + result.push_back( + std::make_unique( + table.column(0), chunk_stream, ctx->br()->device_mr() + ) + ); + // revenue + result.push_back( + std::make_unique( + table.column(3), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_orderdate + result.push_back( + std::make_unique( + table.column(1), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_shippriority + result.push_back( + std::make_unique( + table.column(2), chunk_stream, ctx->br()->device_mr() + ) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + + [[maybe_unused]] rapidsmpf::streaming::Node chunkwise_groupby_agg( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::vector partial_results; + std::uint64_t sequence = 0; + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + ctx->comm()->logger().print("Chunkwise groupby"); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto grouper = cudf::groupby::groupby( + table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + + [[maybe_unused]] rapidsmpf::streaming::Node final_groupby_agg( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + // TODO: requires concatenated input stream. + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("Final groupby"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + std::unique_ptr local_result{nullptr}; + if (!table.is_empty()) { + auto grouper = cudf::groupby::groupby( + table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + // Drop chunk, we don't need it. + std::ignore = std::move(chunk); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + local_result = std::make_unique(std::move(result)); + } + if (ctx->comm()->nranks() > 1) { + // Reduce across ranks... + // Need a reduce primitive in rapidsmpf, but let's just use an allgather and + // discard for now. + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (local_result) { + auto pack = + cudf::pack(local_result->view(), chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + if (ctx->comm()->rank() == 0) { + std::vector chunks; + chunks.reserve(packed_data.size()); + std::ranges::transform( + packed_data, std::back_inserter(chunks), [](auto& chunk) { + return std::move(chunk.data); + } + ); + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(chunks), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + if (ctx->comm()->rank() == 0) { + // We will only actually bother to do this on rank zero. + auto result_view = global_result->view(); + auto grouper = cudf::groupby::groupby( + result_view.select({0, 1}), + cudf::null_policy::EXCLUDE, + cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request( + result_view.column(2), std::move(aggs) + ) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + global_result.reset(); + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + } else { + std::ignore = std::move(packed_data); + } + } else { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(local_result), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + } + + [[maybe_unused]] rapidsmpf::streaming::Node sort_by( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + // We know we only have a single chunk from the groupby + if (msg.empty()) { + co_return; + } + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto table = chunk.table_view(); + auto rounded = cudf::round( + table.column(2), + 2, + cudf::rounding_method::HALF_EVEN, + chunk.stream(), + ctx->br()->device_mr() + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + cudf::sort_by_key( + cudf::table_view({table.column(0), table.column(1), rounded->view()}), + table.select({0, 1}), + {cudf::order::ASCENDING, cudf::order::DESCENDING}, + {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, + chunk.stream(), + ctx->br()->device_mr() + ), + chunk.stream() + ) + ) + ); + co_await ch_out->drain(ctx->executor()); + } + + [[maybe_unused]] rapidsmpf::streaming::Node write_parquet( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::string output_path + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto sink = cudf::io::sink_info(output_path); + auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = cudf::io::table_input_metadata(chunk.table_view()); + metadata.column_metadata[0].set_name("nation"); + metadata.column_metadata[1].set_name("o_year"); + metadata.column_metadata[2].set_name("sum_profit"); + builder = builder.metadata(metadata); + auto options = builder.build(); + cudf::io::write_parquet(options, chunk.stream()); + ctx->comm()->logger().print( + "Wrote chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns to ", + output_path + ); + } + + [[maybe_unused]] rapidsmpf::streaming::Node consume( + [[maybe_unused]] std::shared_ptr ctx, + std::shared_ptr ch_in + ) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + ctx->comm()->logger().print( + "Consumed chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns" + ); + } + } + } // namespace + + struct ProgramOptions { + int num_streaming_threads = 1; + cudf::size_type num_rows_per_chunk = 100'000'000; + bool use_shuffle_join = false; + std::string output_file; + std::string input_directory; + }; + + ProgramOptions parse_options(int argc, char** argv) { + ProgramOptions options; + + auto print_usage = [&argv]() { + std::cerr + << "Usage: " << argv[0] << " [options]\n" + << "Options:\n" + << " --num-streaming-threads Number of streaming threads (default: 1)\n" + << " --num-rows-per-chunk Number of rows per chunk (default: " + "100000000)\n" + << " --use-shuffle-join Use shuffle join (default: false)\n" + << " --output-file Output file path (required)\n" + << " --input-directory Input directory path (required)\n" + << " --help Show this help message\n"; + }; + + static struct option long_options[] = { + {"num-streaming-threads", required_argument, nullptr, 1}, + {"num-rows-per-chunk", required_argument, nullptr, 2}, + {"use-shuffle-join", no_argument, nullptr, 3}, + {"output-file", required_argument, nullptr, 4}, + {"input-directory", required_argument, nullptr, 5}, + {"help", no_argument, nullptr, 6}, + {nullptr, 0, nullptr, 0} + }; + + int opt; + int option_index = 0; + + bool saw_output_file = false; + bool saw_input_directory = false; + + while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { + switch (opt) { + case 1: + options.num_streaming_threads = std::atoi(optarg); + break; + case 2: + options.num_rows_per_chunk = std::atoi(optarg); + break; + case 3: + options.use_shuffle_join = true; + break; + case 4: + options.output_file = optarg; + saw_output_file = true; + break; + case 5: + options.input_directory = optarg; + saw_input_directory = true; + break; + case 6: + print_usage(); + std::exit(0); + case '?': + if (optopt == 0 && optind > 1) { + std::cerr << "Error: Unknown option '" << argv[optind - 1] << "'\n\n"; + } + print_usage(); + std::exit(1); + default: + print_usage(); + std::exit(1); + } + } + + // Check if required options were provided + if (!saw_output_file || !saw_input_directory) { + if (!saw_output_file) { + std::cerr << "Error: --output-file is required\n"; + } + if (!saw_input_directory) { + std::cerr << "Error: --input-directory is required\n"; + } + std::cerr << std::endl; + print_usage(); + std::exit(1); + } + + return options; + } + + int main(int argc, char** argv) { + cudaFree(nullptr); + rapidsmpf::mpi::init(&argc, &argv); + MPI_Comm mpi_comm; + RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); + auto cmd_options = parse_options(argc, argv); + auto limit_size = rmm::percent_of_free_device_memory(80); + rmm::mr::cuda_async_memory_resource mr{}; + rmm::mr::cuda_memory_resource base{}; + // rmm::mr::pool_memory_resource mr{&base, limit_size}; + auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); + rmm::device_async_resource_ref mr_ref(stats_mr); + rmm::mr::set_current_device_resource(&stats_mr); + rmm::mr::set_current_device_resource_ref(mr_ref); + std::unordered_map + memory_available{}; + memory_available[rapidsmpf::MemoryType::DEVICE] = + rapidsmpf::LimitAvailableMemory{&stats_mr, static_cast(limit_size)}; + rapidsmpf::BufferResource br(stats_mr); // , std::move(memory_available)); + auto envvars = rapidsmpf::config::get_environment_variables(); + envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); + auto options = rapidsmpf::config::Options(envvars); + auto stats = std::make_shared(&stats_mr); + { + auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); + // auto comm = std::make_shared(mpi_comm, options); + auto progress = + std::make_shared(comm->logger(), stats); + auto ctx = + std::make_shared(options, comm, &br, stats); + comm->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" + ); + comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + + std::string output_path = cmd_options.output_file; + std::vector timings; + for (int i = 0; i < 2; i++) { + rapidsmpf::OpID op_id{0}; + std::vector nodes; + auto start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q9 pipeline"); + auto part = ctx->create_channel(); + auto filtered_part = ctx->create_channel(); + auto partsupp = ctx->create_channel(); + auto part_x_partsupp = ctx->create_channel(); + auto supplier = ctx->create_channel(); + auto lineitem = ctx->create_channel(); + auto supplier_x_part_x_partsupp = ctx->create_channel(); + auto supplier_x_part_x_partsupp_x_lineitem = ctx->create_channel(); + nodes.push_back(read_part( + ctx, + part, + /* num_tickets */ 2, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // p_partkey, p_name + nodes.push_back(filter_part(ctx, part, filtered_part)); // p_partkey + nodes.push_back(read_partsupp( + ctx, + partsupp, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // ps_partkey, ps_suppkey, ps_supplycost + nodes.push_back( + // p_partkey x ps_partkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + filtered_part, + partsupp, + part_x_partsupp, + {0}, + {0}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + ) // p_partkey/ps_partkey, ps_suppkey, ps_supplycost + ); + nodes.push_back(read_supplier( + ctx, + supplier, + /* num_tickets */ 2, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // s_nationkey, s_suppkey + nodes.push_back( + // s_suppkey x ps_suppkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + supplier, + part_x_partsupp, + supplier_x_part_x_partsupp, + {1}, + {1}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + + ) // s_nationkey, s_suppkey/ps_suppkey, p_partkey/ps_partkey, + // ps_supplycost + ); + nodes.push_back(read_lineitem( + ctx, + lineitem, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // l_discount, l_extendedprice, l_orderkey, l_partkey, l_quantity, + // l_suppkey + nodes.push_back( + // [p_partkey, ps_suppkey] x [l_partkey, l_suppkey] + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + supplier_x_part_x_partsupp, + lineitem, + supplier_x_part_x_partsupp_x_lineitem, + {2, 1}, + {3, 5}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::NO + ) // s_nationkey, ps_supplycost, + // l_discount, l_extendedprice, l_orderkey, l_quantity + ); + auto nation = ctx->create_channel(); + auto orders = ctx->create_channel(); + nodes.push_back( + read_nation( + ctx, + nation, + /* num_tickets */ 1, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + ) // n_name, n_nationkey + ); + nodes.push_back( + read_orders( + ctx, + orders, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + ) // o_orderdate, o_orderkey + ); + auto all_joined = ctx->create_channel(); + auto supplier_x_part_x_partsupp_x_lineitem_x_orders = + ctx->create_channel(); + if (cmd_options.use_shuffle_join) { + auto supplier_x_part_x_partsupp_x_lineitem_shuffled = + ctx->create_channel(); + auto orders_shuffled = ctx->create_channel(); + std::uint32_t num_partitions = 8; + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + supplier_x_part_x_partsupp_x_lineitem, + supplier_x_part_x_partsupp_x_lineitem_shuffled, + {4}, + num_partitions, + rapidsmpf::OpID{ + static_cast(10 * i + op_id++) + } + ) + ); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + orders, + orders_shuffled, + {1}, + num_partitions, + rapidsmpf::OpID{ + static_cast(10 * i + op_id++) + } + ) + ); + nodes.push_back( + // l_orderkey x o_orderkey + rapidsmpf::ndsh::inner_join_shuffle( + ctx, + supplier_x_part_x_partsupp_x_lineitem_shuffled, + orders_shuffled, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + {4}, + {1}, + rapidsmpf::ndsh::KeepKeys::NO + ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, + // l_quantity, o_orderdate + ); + } else { + nodes.push_back( + // l_orderkey x o_orderkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + supplier_x_part_x_partsupp_x_lineitem, + orders, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + {4}, + {1}, + rapidsmpf::OpID{ + static_cast(10 * i + op_id++) + }, + rapidsmpf::ndsh::KeepKeys::NO + ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, + // l_quantity, o_orderdate + ); + } + nodes.push_back( + // n_nationkey x s_nationkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + nation, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + all_joined, + {1}, + {0}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::NO + ) // n_name, ps_supplycost, l_discount, l_extendedprice, + // l_quantity, o_orderdate + ); + auto groupby_input = ctx->create_channel(); + nodes.push_back(select_columns(ctx, all_joined, groupby_input)); + auto chunkwise_groupby_output = ctx->create_channel(); + nodes.push_back( + chunkwise_groupby_agg(ctx, groupby_input, chunkwise_groupby_output) + ); + auto concatenated_groupby_output = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + chunkwise_groupby_output, + concatenated_groupby_output, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + auto groupby_output = ctx->create_channel(); + nodes.push_back(final_groupby_agg( + ctx, + concatenated_groupby_output, + groupby_output, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); + auto sorted_output = ctx->create_channel(); + nodes.push_back(sort_by(ctx, groupby_output, sorted_output)); + nodes.push_back(write_parquet(ctx, sorted_output, output_path)); + } + auto end = std::chrono::steady_clock::now(); + std::chrono::duration pipeline = end - start; + start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Q9 Iteration"); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + } + end = std::chrono::steady_clock::now(); + std::chrono::duration compute = end - start; + comm->logger().print( + "Iteration ", i, " pipeline construction time [s]: ", pipeline.count() + ); + comm->logger().print("Iteration ", i, " compute time [s]: ", compute.count()); + timings.push_back(pipeline.count()); + timings.push_back(compute.count()); + ctx->comm()->logger().print(stats->report()); + RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); + } + if (comm->rank() == 0) { + for (int i = 0; i < 2; i++) { + comm->logger().print( + "Iteration ", + i, + " pipeline construction time [s]: ", + timings[size_t(2 * i)] + ); + comm->logger().print( + "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)] + ); + } + } + } + + RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); + RAPIDSMPF_MPI(MPI_Finalize()); + return 0; + } + \ No newline at end of file From 899a5626a7a424d848bc3de30dfd1222471cd21d Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Sun, 9 Nov 2025 18:32:22 -0800 Subject: [PATCH 2/7] more wip, mostly working components --- cpp/benchmarks/streaming/ndsh/q03.cpp | 72 ++++++++++++++++++++------- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp index 93dff5a24..a1f379090 100644 --- a/cpp/benchmarks/streaming/ndsh/q03.cpp +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -17,6 +17,7 @@ #include #include + #include #include #include #include @@ -458,13 +459,14 @@ rapidsmpf::streaming::Node filter_lineitem( auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); auto grouper = cudf::groupby::groupby( - table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + // grup by [o_orderkey, o_orderdate, o_shippriority] + table.select({0, 1, 2}), cudf::null_policy::EXCLUDE, cudf::sorted::NO ); auto requests = std::vector(); std::vector> aggs; aggs.push_back(cudf::make_sum_aggregation()); requests.push_back( - cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) + cudf::groupby::aggregation_request(table.column(3), std::move(aggs)) ); auto [keys, results] = grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); @@ -506,13 +508,13 @@ rapidsmpf::streaming::Node filter_lineitem( std::unique_ptr local_result{nullptr}; if (!table.is_empty()) { auto grouper = cudf::groupby::groupby( - table.select({0, 1}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + table.select({0, 1, 2}), cudf::null_policy::EXCLUDE, cudf::sorted::NO ); auto requests = std::vector(); std::vector> aggs; aggs.push_back(cudf::make_sum_aggregation()); requests.push_back( - cudf::groupby::aggregation_request(table.column(2), std::move(aggs)) + cudf::groupby::aggregation_request(table.column(3), std::move(aggs)) ); auto [keys, results] = grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); @@ -563,7 +565,7 @@ rapidsmpf::streaming::Node filter_lineitem( // We will only actually bother to do this on rank zero. auto result_view = global_result->view(); auto grouper = cudf::groupby::groupby( - result_view.select({0, 1}), + result_view.select({0, 1, 2}), cudf::null_policy::EXCLUDE, cudf::sorted::NO ); @@ -572,7 +574,7 @@ rapidsmpf::streaming::Node filter_lineitem( aggs.push_back(cudf::make_sum_aggregation()); requests.push_back( cudf::groupby::aggregation_request( - result_view.column(2), std::move(aggs) + result_view.column(3), std::move(aggs) ) ); auto [keys, results] = @@ -622,21 +624,14 @@ rapidsmpf::streaming::Node filter_lineitem( auto chunk = rapidsmpf::ndsh::to_device(ctx, msg.release()); auto table = chunk.table_view(); - auto rounded = cudf::round( - table.column(2), - 2, - cudf::rounding_method::HALF_EVEN, - chunk.stream(), - ctx->br()->device_mr() - ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, std::make_unique( cudf::sort_by_key( - cudf::table_view({table.column(0), table.column(1), rounded->view()}), - table.select({0, 1}), - {cudf::order::ASCENDING, cudf::order::DESCENDING}, + table, + table.select({1, 2}), + {cudf::order::DESCENDING, cudf::order::ASCENDING}, {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, chunk.stream(), ctx->br()->device_mr() @@ -647,6 +642,44 @@ rapidsmpf::streaming::Node filter_lineitem( ); co_await ch_out->drain(ctx->executor()); } + + + + +// take first 10 rows +[[maybe_unused]] rapidsmpf::streaming::Node head( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector head_indices{0, 10}; + auto sliced_table = cudf::slice(table, head_indices); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(sliced_table)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} [[maybe_unused]] rapidsmpf::streaming::Node write_parquet( std::shared_ptr ctx, @@ -664,9 +697,10 @@ rapidsmpf::streaming::Node filter_lineitem( auto sink = cudf::io::sink_info(output_path); auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); auto metadata = cudf::io::table_input_metadata(chunk.table_view()); - metadata.column_metadata[0].set_name("nation"); - metadata.column_metadata[1].set_name("o_year"); - metadata.column_metadata[2].set_name("sum_profit"); + metadata.column_metadata[0].set_name("l_orderkey"); + metadata.column_metadata[1].set_name("revenue"); + metadata.column_metadata[2].set_name("o_orderdate"); + metadata.column_metadata[3].set_name("o_shippriority"); builder = builder.metadata(metadata); auto options = builder.build(); cudf::io::write_parquet(options, chunk.stream()); From bc1a8a4aab3c4ba7fb66f3a44d88d4b2345d1aef Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Sun, 9 Nov 2025 20:31:12 -0800 Subject: [PATCH 3/7] more WIP, columns mismatch in join --- cpp/benchmarks/streaming/ndsh/q03.cpp | 444 +++++++++++++------------- 1 file changed, 220 insertions(+), 224 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp index a1f379090..ffd315441 100644 --- a/cpp/benchmarks/streaming/ndsh/q03.cpp +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -32,6 +32,7 @@ #include #include #include + #include #include #include #include @@ -87,7 +88,10 @@ get_table_path(input_directory, "customer") ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) - .columns({"c_mktsegment", "c_custkey"}) + .columns({ + "c_mktsegment", // 0 + "c_custkey" // 1 + }) .build(); return rapidsmpf::streaming::node::read_parquet( ctx, ch_out, num_producers, options, num_rows_per_chunk @@ -106,12 +110,12 @@ ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) .columns( - {"l_discount", - "l_extendedprice", - "l_orderkey", - "l_partkey", - "l_quantity", - "l_suppkey"} + { + "l_orderkey", // 0 + "l_shipdate", // 1 + "l_extendedprice", // 2 + "l_discount", // 3 + } ) .build(); return rapidsmpf::streaming::node::read_parquet( @@ -131,7 +135,12 @@ get_table_path(input_directory, "orders") ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) - .columns({"o_orderdate", "o_orderkey"}) + .columns({ + "o_orderkey", // 0 + "o_orderdate", // 1 + "o_shippriority", // 2 + "o_custkey" // 3 + }) .build(); return rapidsmpf::streaming::node::read_parquet( ctx, ch_out, num_producers, options, num_rows_per_chunk @@ -157,11 +166,16 @@ ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto p_name = table.column(0); - auto target = cudf::make_string_scalar("green", chunk_stream, mr); - auto mask = cudf::strings::contains( - p_name, *static_cast(target.get()), chunk_stream, mr - ); + auto c_mktsegment = table.column(0); + auto var1 = cudf::make_string_scalar("BUILDING", chunk_stream, mr); + auto mask = cudf::binary_operation( + table.column(0), // c_mktsegment is col 0 + *var1.get(), + cudf::binary_operator::EQUAL, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), @@ -177,6 +191,25 @@ co_await ch_out->drain(ctx->executor()); } +// std::tm make_tm(int year, int month, int day) +// { +// std::tm tm{}; +// tm.tm_year = year - 1900; +// tm.tm_mon = month - 1; +// tm.tm_mday = day; +// return tm; +// } + +// int32_t days_since_epoch(int year, int month, int day) +// { +// std::tm tm = make_tm(year, month, day); +// std::tm epoch = make_tm(1970, 1, 1); +// std::time_t time = std::mktime(&tm); +// std::time_t epoch_time = std::mktime(&epoch); +// double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); +// return static_cast(diff); +// } + // # .filter(pl.col("o_orderdate") < var2) ## var2 = date(1995, 3, 15) rapidsmpf::streaming::Node filter_orders( @@ -197,24 +230,43 @@ ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto p_name = table.column(0); - std::tm timeinfo = {}; - timeinfo.tm_year = 1993 - 1900; // years since 1900 - timeinfo.tm_mon = 3 - 1; // months since January - timeinfo.tm_mday = 15; - time_t epoch_secs = std::mktime(&timeinfo); - int64_t epoch_ms = static_cast(epoch_secs) * 1000; - auto var2 = cudf::make_fixed_width_scalar( - epoch_ms, - chunk_stream, - mr - ); + // std::tm timeinfo = {}; + // timeinfo.tm_year = 1993 - 1900; // years since 1900 + // timeinfo.tm_mon = 3 - 1; // months since January + // timeinfo.tm_mday = 15; + // time_t epoch_secs = std::mktime(&timeinfo); + // int64_t epoch_ms = static_cast(epoch_secs) * 1000; + // auto var2 = cudf::make_fixed_width_scalar( + // epoch_ms, + // chunk_stream, + // mr + // ); + + // auto o_orderdate_int64 = cudf::cast( + // table.column(1), + // cudf::data_type{cudf::type_id::INT64} + // ); + + cudf::data_type dtype = table.column(1).type(); + cudf::type_id type_id = dtype.id(); + std::cout << "Column type_id: " << static_cast(type_id) << std::endl; + + auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{8440}}; + auto var2 = cudf::timestamp_scalar{days_since_epoch}; + // auto cv = table->get_column(6).view(); + + // auto mask = + // cudf::binary_operation(cv, date, cudf::binary_operator::LESS_EQUAL, + // cudf::data_type{cudf::type_id::BOOL8}); + + // auto var2 = cudf::timestamp_scalar(days_since_epoch(1993, 3, 15), true); auto mask = cudf::binary_operation( table.column(1), // o_orderdate is col 1 - *var2.get(), + // *o_orderdate_int64, + var2, cudf::binary_operator::LESS, cudf::data_type(cudf::type_id::BOOL8), chunk_stream, @@ -225,7 +277,10 @@ msg.sequence_number(), std::make_unique( cudf::apply_boolean_mask( - table.select({1}), mask->view(), chunk_stream, mr + table, // still need all columns + mask->view(), + chunk_stream, + mr ), chunk_stream ) @@ -256,24 +311,27 @@ rapidsmpf::streaming::Node filter_lineitem( ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto p_name = table.column(0); - std::tm timeinfo = {}; - timeinfo.tm_year = 1993 - 1900; // years since 1900 - timeinfo.tm_mon = 3 - 1; // months since January - timeinfo.tm_mday = 15; - time_t epoch_secs = std::mktime(&timeinfo); - int64_t epoch_ms = static_cast(epoch_secs) * 1000; - auto var2 = cudf::make_fixed_width_scalar( - epoch_ms, - chunk_stream, - mr - ); + // std::tm timeinfo = {}; + // timeinfo.tm_year = 1993 - 1900; // years since 1900 + // timeinfo.tm_mon = 3 - 1; // months since January + // timeinfo.tm_mday = 15; + // time_t epoch_secs = std::mktime(&timeinfo); + // int64_t epoch_ms = static_cast(epoch_secs) * 1000; + // auto var2 = cudf::make_fixed_width_scalar( + // epoch_ms, + // chunk_stream, + // mr + // ); + + auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{8440}}; + auto var2 = cudf::timestamp_scalar{days_since_epoch}; auto mask = cudf::binary_operation( table.column(1), // l_shipdate is col 1 - *var2.get(), + // *var2.get(), + var2, cudf::binary_operator::GREATER, cudf::data_type(cudf::type_id::BOOL8), chunk_stream, @@ -871,213 +929,151 @@ rapidsmpf::streaming::Node filter_lineitem( std::vector nodes; auto start = std::chrono::steady_clock::now(); { - RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q9 pipeline"); - auto part = ctx->create_channel(); - auto filtered_part = ctx->create_channel(); - auto partsupp = ctx->create_channel(); - auto part_x_partsupp = ctx->create_channel(); - auto supplier = ctx->create_channel(); + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q3 pipeline"); + + // Input data channels + auto customer = ctx->create_channel(); auto lineitem = ctx->create_channel(); - auto supplier_x_part_x_partsupp = ctx->create_channel(); - auto supplier_x_part_x_partsupp_x_lineitem = ctx->create_channel(); - nodes.push_back(read_part( + auto orders = ctx->create_channel(); + + // filtered channels + auto filtered_customer = ctx->create_channel(); + auto filtered_orders = ctx->create_channel(); + auto filtered_lineitem = ctx->create_channel(); + + // join channels + auto customer_x_orders = ctx->create_channel(); + auto customer_x_orders_x_lineitem = ctx->create_channel(); + auto all_joined = ctx->create_channel(); + + + // read and filter customer + nodes.push_back(read_customer( ctx, - part, + customer, /* num_tickets */ 2, cmd_options.num_rows_per_chunk, cmd_options.input_directory - )); // p_partkey, p_name - nodes.push_back(filter_part(ctx, part, filtered_part)); // p_partkey - nodes.push_back(read_partsupp( + )); + nodes.push_back(filter_customer(ctx, customer, filtered_customer)); + + // read and filter orders + nodes.push_back(read_orders( ctx, - partsupp, + orders, /* num_tickets */ 4, cmd_options.num_rows_per_chunk, cmd_options.input_directory - )); // ps_partkey, ps_suppkey, ps_supplycost - nodes.push_back( - // p_partkey x ps_partkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - filtered_part, - partsupp, - part_x_partsupp, - {0}, - {0}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - ) // p_partkey/ps_partkey, ps_suppkey, ps_supplycost - ); - nodes.push_back(read_supplier( - ctx, - supplier, - /* num_tickets */ 2, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); // s_nationkey, s_suppkey - nodes.push_back( - // s_suppkey x ps_suppkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - supplier, - part_x_partsupp, - supplier_x_part_x_partsupp, - {1}, - {1}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - - ) // s_nationkey, s_suppkey/ps_suppkey, p_partkey/ps_partkey, - // ps_supplycost - ); + )); + nodes.push_back(filter_orders(ctx, orders, filtered_orders)); + + // read and filter lineitem nodes.push_back(read_lineitem( - ctx, - lineitem, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); // l_discount, l_extendedprice, l_orderkey, l_partkey, l_quantity, - // l_suppkey - nodes.push_back( - // [p_partkey, ps_suppkey] x [l_partkey, l_suppkey] - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - supplier_x_part_x_partsupp, - lineitem, - supplier_x_part_x_partsupp_x_lineitem, - {2, 1}, - {3, 5}, - rapidsmpf::OpID{static_cast(10 * i + op_id++)}, - rapidsmpf::ndsh::KeepKeys::NO - ) // s_nationkey, ps_supplycost, - // l_discount, l_extendedprice, l_orderkey, l_quantity - ); - auto nation = ctx->create_channel(); - auto orders = ctx->create_channel(); - nodes.push_back( - read_nation( - ctx, - nation, - /* num_tickets */ 1, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - ) // n_name, n_nationkey - ); - nodes.push_back( - read_orders( - ctx, - orders, - /* num_tickets */ 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - ) // o_orderdate, o_orderkey - ); - auto all_joined = ctx->create_channel(); - auto supplier_x_part_x_partsupp_x_lineitem_x_orders = - ctx->create_channel(); - if (cmd_options.use_shuffle_join) { - auto supplier_x_part_x_partsupp_x_lineitem_shuffled = - ctx->create_channel(); - auto orders_shuffled = ctx->create_channel(); - std::uint32_t num_partitions = 8; - nodes.push_back( - rapidsmpf::ndsh::shuffle( - ctx, - supplier_x_part_x_partsupp_x_lineitem, - supplier_x_part_x_partsupp_x_lineitem_shuffled, - {4}, - num_partitions, - rapidsmpf::OpID{ - static_cast(10 * i + op_id++) - } - ) - ); - nodes.push_back( - rapidsmpf::ndsh::shuffle( - ctx, - orders, - orders_shuffled, - {1}, - num_partitions, - rapidsmpf::OpID{ - static_cast(10 * i + op_id++) - } - ) - ); - nodes.push_back( - // l_orderkey x o_orderkey - rapidsmpf::ndsh::inner_join_shuffle( - ctx, - supplier_x_part_x_partsupp_x_lineitem_shuffled, - orders_shuffled, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - {4}, - {1}, - rapidsmpf::ndsh::KeepKeys::NO - ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, - // l_quantity, o_orderdate - ); - } else { - nodes.push_back( - // l_orderkey x o_orderkey - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - supplier_x_part_x_partsupp_x_lineitem, - orders, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - {4}, - {1}, - rapidsmpf::OpID{ - static_cast(10 * i + op_id++) - }, - rapidsmpf::ndsh::KeepKeys::NO - ) // s_nationkey, ps_supplycost, l_discount, l_extendedprice, - // l_quantity, o_orderdate - ); - } + ctx, + lineitem, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + nodes.push_back(filter_lineitem(ctx, lineitem, filtered_lineitem)); + + // join orders into customer nodes.push_back( - // n_nationkey x s_nationkey + // c_custkey x o_orderkey rapidsmpf::ndsh::inner_join_broadcast( ctx, - nation, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - all_joined, - {1}, + filtered_customer, + filtered_orders, + customer_x_orders, {0}, + {3}, rapidsmpf::OpID{static_cast(10 * i + op_id++)}, - rapidsmpf::ndsh::KeepKeys::NO - ) // n_name, ps_supplycost, l_discount, l_extendedprice, - // l_quantity, o_orderdate - ); - auto groupby_input = ctx->create_channel(); - nodes.push_back(select_columns(ctx, all_joined, groupby_input)); - auto chunkwise_groupby_output = ctx->create_channel(); - nodes.push_back( - chunkwise_groupby_agg(ctx, groupby_input, chunkwise_groupby_output) - ); - auto concatenated_groupby_output = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::concatenate( - ctx, - chunkwise_groupby_output, - concatenated_groupby_output, - rapidsmpf::ndsh::ConcatOrder::DONT_CARE + rapidsmpf::ndsh::KeepKeys::YES ) ); - auto groupby_output = ctx->create_channel(); - nodes.push_back(final_groupby_agg( - ctx, - concatenated_groupby_output, - groupby_output, - rapidsmpf::OpID{static_cast(10 * i + op_id++)} - )); + + // join lineitem into customer_x_orders + nodes.push_back( + // o_orderkey x l_orderkey + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + customer_x_orders, + filtered_lineitem, + customer_x_orders_x_lineitem, + {1}, + {0}, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + + // with columns + auto groupby_input = ctx->create_channel(); + + nodes.push_back(with_columns( + ctx, + customer_x_orders_x_lineitem, + groupby_input + )); + + // groupby aggregation (agg (per chunk) -> concat -> agg (global)) + auto chunkwise_groupby_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg( + ctx, + groupby_input, + chunkwise_groupby_output + )); + auto concatenated_groupby_output = ctx->create_channel(); + nodes.push_back(rapidsmpf::ndsh::concatenate( + ctx, + chunkwise_groupby_output, + concatenated_groupby_output, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + )); + auto groupby_output = ctx->create_channel(); + nodes.push_back(final_groupby_agg( + ctx, + concatenated_groupby_output, + groupby_output, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); + + // select columns + auto select_output = ctx->create_channel(); + nodes.push_back(select_columns( + ctx, + groupby_output, + select_output + )); + + // sort by auto sorted_output = ctx->create_channel(); - nodes.push_back(sort_by(ctx, groupby_output, sorted_output)); - nodes.push_back(write_parquet(ctx, sorted_output, output_path)); + nodes.push_back(sort_by( + ctx, + select_output, + sorted_output + )); + + // head + auto head_output = ctx->create_channel(); + nodes.push_back(head( + ctx, + sorted_output, + head_output + )); + + // write parquet + nodes.push_back(write_parquet( + ctx, + head_output, + output_path + )); } auto end = std::chrono::steady_clock::now(); std::chrono::duration pipeline = end - start; start = std::chrono::steady_clock::now(); { - RAPIDSMPF_NVTX_SCOPED_RANGE("Q9 Iteration"); + RAPIDSMPF_NVTX_SCOPED_RANGE("Q3 Iteration"); rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); } end = std::chrono::steady_clock::now(); From 83e392a2e8871ee8ae5d3a2c04fc26343555ae92 Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Mon, 10 Nov 2025 13:01:22 -0800 Subject: [PATCH 4/7] more WIP, updated head, join still failing after tag 1 finishes --- cpp/benchmarks/streaming/ndsh/q03.cpp | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp index ffd315441..e0be55f64 100644 --- a/cpp/benchmarks/streaming/ndsh/q03.cpp +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -176,6 +176,7 @@ chunk_stream, mr ); + // std::cout << "Number of columns in filtered customer output: " << table.select({1}).num_columns() << std::endl; co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), @@ -272,6 +273,7 @@ chunk_stream, mr ); + // std::cout << "Number of columns in filtered orders output: " << table.num_columns() << std::endl; co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), @@ -292,7 +294,6 @@ // .filter(pl.col("l_shipdate") > var2) ## var2 = date(1995, 3, 15) -// # .filter(pl.col("o_orderdate") < var2) ## var2 = date(1995, 3, 15) rapidsmpf::streaming::Node filter_lineitem( std::shared_ptr ctx, std::shared_ptr ch_in, @@ -337,6 +338,7 @@ rapidsmpf::streaming::Node filter_lineitem( chunk_stream, mr ); + // std::cout << "Number of columns in filtered lineitem output: " << table.select({0, 2, 3}).num_columns() << std::endl; co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), @@ -366,9 +368,8 @@ rapidsmpf::streaming::Node filter_lineitem( // "o_orderkey", # 1 (orders<-lineitem on o_orderkey) // "o_orderdate", # 2 // "o_shippriority", # 3 - // "l_shipdate", # 4 - // "l_extendedprice", # 5 - // "l_discount", # 6 + // "l_extendedprice", # 4 (l_shipdate was filtered out) + // "l_discount", # 5 co_await ctx->executor()->schedule(); while (true) { auto msg = co_await ch_in->receive(); @@ -402,8 +403,8 @@ rapidsmpf::streaming::Node filter_lineitem( table.column(3), chunk_stream, ctx->br()->device_mr() ) ); - auto extendedprice = table.column(5); - auto discount = table.column(6); + auto extendedprice = table.column(4); + auto discount = table.column(5); std::string udf = R"***( @@ -725,13 +726,13 @@ rapidsmpf::streaming::Node filter_lineitem( auto sequence_number = msg.sequence_number(); auto table = chunk.table_view(); std::vector head_indices{0, 10}; - auto sliced_table = cudf::slice(table, head_indices); + auto first_10_rows = cudf::slice(table, head_indices); co_await ch_out->send( rapidsmpf::streaming::to_message( sequence_number, std::make_unique( - std::make_unique(std::move(sliced_table)), chunk_stream + std::make_unique(std::move(first_10_rows[0])), chunk_stream ) ) ); @@ -948,6 +949,8 @@ rapidsmpf::streaming::Node filter_lineitem( // read and filter customer + // In: "c_mktsegment", "c_custkey" + // Out: "c_custkey" nodes.push_back(read_customer( ctx, customer, @@ -958,6 +961,8 @@ rapidsmpf::streaming::Node filter_lineitem( nodes.push_back(filter_customer(ctx, customer, filtered_customer)); // read and filter orders + // In: "o_orderkey", "o_orderdate", "o_shippriority", "o_custkey" + // Out: "o_orderkey", "o_orderdate", "o_shippriority", "o_custkey" nodes.push_back(read_orders( ctx, orders, @@ -968,6 +973,8 @@ rapidsmpf::streaming::Node filter_lineitem( nodes.push_back(filter_orders(ctx, orders, filtered_orders)); // read and filter lineitem + // In: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" + // Out: "l_orderkey", "l_extendedprice", "l_discount" nodes.push_back(read_lineitem( ctx, lineitem, From 220f4b384d7de6fafd63cedecdc773488b67e08b Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Mon, 10 Nov 2025 19:17:17 -0800 Subject: [PATCH 5/7] getting incorrect results with one MPI rank, but query finishes. getting msg not ready errors with two ranks --- cpp/benchmarks/streaming/ndsh/q03.cpp | 97 ++++++--------------------- 1 file changed, 19 insertions(+), 78 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp index e0be55f64..7dc456c54 100644 --- a/cpp/benchmarks/streaming/ndsh/q03.cpp +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -112,7 +112,7 @@ .columns( { "l_orderkey", // 0 - "l_shipdate", // 1 + "l_shipdate", // 1 -- Timestamp[ms] "l_extendedprice", // 2 "l_discount", // 3 } @@ -137,7 +137,7 @@ auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) .columns({ "o_orderkey", // 0 - "o_orderdate", // 1 + "o_orderdate", // 1 -- Timestamp[ms] "o_shippriority", // 2 "o_custkey" // 3 }) @@ -166,11 +166,10 @@ ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto c_mktsegment = table.column(0); auto var1 = cudf::make_string_scalar("BUILDING", chunk_stream, mr); auto mask = cudf::binary_operation( table.column(0), // c_mktsegment is col 0 - *var1.get(), + *static_cast(var1.get()), cudf::binary_operator::EQUAL, cudf::data_type(cudf::type_id::BOOL8), chunk_stream, @@ -192,26 +191,6 @@ co_await ch_out->drain(ctx->executor()); } -// std::tm make_tm(int year, int month, int day) -// { -// std::tm tm{}; -// tm.tm_year = year - 1900; -// tm.tm_mon = month - 1; -// tm.tm_mday = day; -// return tm; -// } - -// int32_t days_since_epoch(int year, int month, int day) -// { -// std::tm tm = make_tm(year, month, day); -// std::tm epoch = make_tm(1970, 1, 1); -// std::time_t time = std::mktime(&tm); -// std::time_t epoch_time = std::mktime(&epoch); -// double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); -// return static_cast(diff); -// } - - // # .filter(pl.col("o_orderdate") < var2) ## var2 = date(1995, 3, 15) rapidsmpf::streaming::Node filter_orders( std::shared_ptr ctx, @@ -231,42 +210,11 @@ ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - - - // std::tm timeinfo = {}; - // timeinfo.tm_year = 1993 - 1900; // years since 1900 - // timeinfo.tm_mon = 3 - 1; // months since January - // timeinfo.tm_mday = 15; - // time_t epoch_secs = std::mktime(&timeinfo); - // int64_t epoch_ms = static_cast(epoch_secs) * 1000; - // auto var2 = cudf::make_fixed_width_scalar( - // epoch_ms, - // chunk_stream, - // mr - // ); - - // auto o_orderdate_int64 = cudf::cast( - // table.column(1), - // cudf::data_type{cudf::type_id::INT64} - // ); - - cudf::data_type dtype = table.column(1).type(); - cudf::type_id type_id = dtype.id(); - std::cout << "Column type_id: " << static_cast(type_id) << std::endl; - - auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{8440}}; - auto var2 = cudf::timestamp_scalar{days_since_epoch}; - // auto cv = table->get_column(6).view(); - - // auto mask = - // cudf::binary_operation(cv, date, cudf::binary_operator::LESS_EQUAL, - // cudf::data_type{cudf::type_id::BOOL8}); - - // auto var2 = cudf::timestamp_scalar(days_since_epoch(1993, 3, 15), true); + auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; + auto var2 = cudf::timestamp_scalar{ms_since_epoch}; auto mask = cudf::binary_operation( table.column(1), // o_orderdate is col 1 - // *o_orderdate_int64, var2, cudf::binary_operator::LESS, cudf::data_type(cudf::type_id::BOOL8), @@ -312,26 +260,12 @@ rapidsmpf::streaming::Node filter_lineitem( ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - - // std::tm timeinfo = {}; - // timeinfo.tm_year = 1993 - 1900; // years since 1900 - // timeinfo.tm_mon = 3 - 1; // months since January - // timeinfo.tm_mday = 15; - // time_t epoch_secs = std::mktime(&timeinfo); - // int64_t epoch_ms = static_cast(epoch_secs) * 1000; - // auto var2 = cudf::make_fixed_width_scalar( - // epoch_ms, - // chunk_stream, - // mr - // ); - - auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{8440}}; - auto var2 = cudf::timestamp_scalar{days_since_epoch}; + auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; + auto var2 = cudf::timestamp_scalar{ms_since_epoch}; auto mask = cudf::binary_operation( table.column(1), // l_shipdate is col 1 - // *var2.get(), var2, cudf::binary_operator::GREATER, cudf::data_type(cudf::type_id::BOOL8), @@ -382,6 +316,15 @@ rapidsmpf::streaming::Node filter_lineitem( auto chunk_stream = chunk.stream(); auto sequence_number = msg.sequence_number(); auto table = chunk.table_view(); + + // std::cout << "Number of columns in with columns output: " << table.num_columns() << std::endl; + + // for (int i = 0; i < table.num_columns(); ++i) { + // cudf::data_type dtype = table.column(i).type(); + // cudf::type_id tid = dtype.id(); + // std::cout << "Column " << i << " type_id: " << static_cast(tid) << std::endl; + // } + std::vector> result; result.reserve(4); @@ -945,8 +888,6 @@ rapidsmpf::streaming::Node filter_lineitem( // join channels auto customer_x_orders = ctx->create_channel(); auto customer_x_orders_x_lineitem = ctx->create_channel(); - auto all_joined = ctx->create_channel(); - // read and filter customer // In: "c_mktsegment", "c_custkey" @@ -984,7 +925,7 @@ rapidsmpf::streaming::Node filter_lineitem( )); nodes.push_back(filter_lineitem(ctx, lineitem, filtered_lineitem)); - // join orders into customer + // join customers and orders nodes.push_back( // c_custkey x o_orderkey rapidsmpf::ndsh::inner_join_broadcast( @@ -999,7 +940,7 @@ rapidsmpf::streaming::Node filter_lineitem( ) ); - // join lineitem into customer_x_orders + // join customer_x_orders and lineitem nodes.push_back( // o_orderkey x l_orderkey rapidsmpf::ndsh::inner_join_broadcast( @@ -1007,7 +948,7 @@ rapidsmpf::streaming::Node filter_lineitem( customer_x_orders, filtered_lineitem, customer_x_orders_x_lineitem, - {1}, + {0}, {0}, rapidsmpf::OpID{static_cast(10 * i + op_id++)}, rapidsmpf::ndsh::KeepKeys::YES From 25a493f7efa8fe3eddcea6a65fd5ff7bea13a787 Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Mon, 10 Nov 2025 19:27:59 -0800 Subject: [PATCH 6/7] fix customer/orders key comment --- cpp/benchmarks/streaming/ndsh/q03.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp index 7dc456c54..6e09d56d2 100644 --- a/cpp/benchmarks/streaming/ndsh/q03.cpp +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -927,7 +927,7 @@ rapidsmpf::streaming::Node filter_lineitem( // join customers and orders nodes.push_back( - // c_custkey x o_orderkey + // c_custkey x o_custkey rapidsmpf::ndsh::inner_join_broadcast( ctx, filtered_customer, From 9eca94e1a2d46621fb9734a2ea4f896d5fc92698 Mon Sep 17 00:00:00 2001 From: Nick Becker Date: Wed, 12 Nov 2025 20:13:55 -0800 Subject: [PATCH 7/7] still not working, but correct group by keys in the final output --- cpp/benchmarks/streaming/ndsh/q03.cpp | 227 +++++++++++++++----------- 1 file changed, 129 insertions(+), 98 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q03.cpp b/cpp/benchmarks/streaming/ndsh/q03.cpp index 6e09d56d2..3bd4b01da 100644 --- a/cpp/benchmarks/streaming/ndsh/q03.cpp +++ b/cpp/benchmarks/streaming/ndsh/q03.cpp @@ -167,22 +167,27 @@ auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); auto var1 = cudf::make_string_scalar("BUILDING", chunk_stream, mr); + auto mask = cudf::binary_operation( table.column(0), // c_mktsegment is col 0 - *static_cast(var1.get()), + *var1, cudf::binary_operator::EQUAL, cudf::data_type(cudf::type_id::BOOL8), chunk_stream, mr ); + + auto filtered = cudf::apply_boolean_mask( + table, + mask->view(), + chunk_stream, mr + ); // std::cout << "Number of columns in filtered customer output: " << table.select({1}).num_columns() << std::endl; co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), std::make_unique( - cudf::apply_boolean_mask( - table.select({1}), mask->view(), chunk_stream, mr - ), + std::move(filtered), chunk_stream ) ) @@ -211,8 +216,11 @@ auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; - auto var2 = cudf::timestamp_scalar{ms_since_epoch}; + // auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; + // auto var2 = cudf::timestamp_scalar{ms_since_epoch}; + auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{9204}}; + auto var2 = cudf::timestamp_scalar{days_since_epoch}; + auto mask = cudf::binary_operation( table.column(1), // o_orderdate is col 1 var2, @@ -221,17 +229,18 @@ chunk_stream, mr ); + auto filtered = cudf::apply_boolean_mask( + table, + mask->view(), + chunk_stream, mr + ); + // std::cout << "Number of columns in filtered orders output: " << table.num_columns() << std::endl; co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), std::make_unique( - cudf::apply_boolean_mask( - table, // still need all columns - mask->view(), - chunk_stream, - mr - ), + std::move(filtered), chunk_stream ) ) @@ -261,8 +270,10 @@ rapidsmpf::streaming::Node filter_lineitem( auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); - auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; - auto var2 = cudf::timestamp_scalar{ms_since_epoch}; + // auto ms_since_epoch = cudf::timestamp_ms{cudf::duration_ms{795225600000}}; + // auto var2 = cudf::timestamp_scalar{ms_since_epoch}; + auto days_since_epoch = cudf::timestamp_D{cudf::duration_D{9204}}; + auto var2 = cudf::timestamp_scalar{days_since_epoch}; auto mask = cudf::binary_operation( table.column(1), // l_shipdate is col 1 @@ -272,15 +283,17 @@ rapidsmpf::streaming::Node filter_lineitem( chunk_stream, mr ); + auto filtered = cudf::apply_boolean_mask( + table, + mask->view(), + chunk_stream, mr + ); // std::cout << "Number of columns in filtered lineitem output: " << table.select({0, 2, 3}).num_columns() << std::endl; co_await ch_out->send( rapidsmpf::streaming::to_message( msg.sequence_number(), std::make_unique( - cudf::apply_boolean_mask( - // no longer need l_shipdate - table.select({0, 2, 3}), mask->view(), chunk_stream, mr - ), + std::move(filtered), chunk_stream ) ) @@ -298,12 +311,14 @@ rapidsmpf::streaming::Node filter_lineitem( rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; // customer_x_orders_x_lineitem is the input to the with_column op - // "c_custkey", # 0 (customers<-orders on o_custkey) - // "o_orderkey", # 1 (orders<-lineitem on o_orderkey) - // "o_orderdate", # 2 - // "o_shippriority", # 3 - // "l_extendedprice", # 4 (l_shipdate was filtered out) - // "l_discount", # 5 + // "c_mktsegment", # 0 + // "c_custkey", # 1 (customers<-orders on o_custkey) + // "o_orderkey", # 2 (orders<-lineitem on o_orderkey) + // "o_orderdate", # 3 + // "o_shippriority", # 4 + // "l_shipdate", # 5 + // "l_extendedprice", # 6 + // "l_discount", # 7 co_await ctx->executor()->schedule(); while (true) { auto msg = co_await ch_in->receive(); @@ -331,23 +346,23 @@ rapidsmpf::streaming::Node filter_lineitem( // o_orderkey result.push_back( std::make_unique( - table.column(1), chunk_stream, ctx->br()->device_mr() + table.column(2), chunk_stream, ctx->br()->device_mr() ) ); // o_orderdate result.push_back( std::make_unique( - table.column(2), chunk_stream, ctx->br()->device_mr() + table.column(3), chunk_stream, ctx->br()->device_mr() ) ); // o_shippriority result.push_back( std::make_unique( - table.column(3), chunk_stream, ctx->br()->device_mr() + table.column(4), chunk_stream, ctx->br()->device_mr() ) ); - auto extendedprice = table.column(4); - auto discount = table.column(5); + auto extendedprice = table.column(6); + auto discount = table.column(7); std::string udf = R"***( @@ -367,6 +382,7 @@ rapidsmpf::streaming::Node filter_lineitem( ctx->br()->device_mr() ) ); + // Out: [o_orderkey, o_orderdate, o_shippriority, revenue] co_await ch_out->send( rapidsmpf::streaming::to_message( sequence_number, @@ -378,68 +394,8 @@ rapidsmpf::streaming::Node filter_lineitem( } co_await ch_out->drain(ctx->executor()); } - -// change the order of columns from o_orderkey, o_orderdate, o_shippriority, revenue -// to o_orderkey, revenue, o_orderdate, o_shippriority - [[maybe_unused]] rapidsmpf::streaming::Node select_columns( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - - co_await ctx->executor()->schedule(); - while (true) { - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - break; - } - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - auto chunk_stream = chunk.stream(); - auto sequence_number = msg.sequence_number(); - auto table = chunk.table_view(); - std::vector> result; - result.reserve(4); - - // o_orderkey - result.push_back( - std::make_unique( - table.column(0), chunk_stream, ctx->br()->device_mr() - ) - ); - // revenue - result.push_back( - std::make_unique( - table.column(3), chunk_stream, ctx->br()->device_mr() - ) - ); - // o_orderdate - result.push_back( - std::make_unique( - table.column(1), chunk_stream, ctx->br()->device_mr() - ) - ); - // o_shippriority - result.push_back( - std::make_unique( - table.column(2), chunk_stream, ctx->br()->device_mr() - ) - ); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - sequence_number, - std::make_unique( - std::make_unique(std::move(result)), chunk_stream - ) - ) - ); - } - co_await ch_out->drain(ctx->executor()); -} - +// In: [o_orderkey, o_orderdate, o_shippriority, revenue] [[maybe_unused]] rapidsmpf::streaming::Node chunkwise_groupby_agg( [[maybe_unused]] std::shared_ptr ctx, std::shared_ptr ch_in, @@ -460,8 +416,17 @@ rapidsmpf::streaming::Node filter_lineitem( ); auto chunk_stream = chunk.stream(); auto table = chunk.table_view(); + + // std::cout << "Number of columns in groupby input: " << table.num_columns() << std::endl; + + // for (int i = 0; i < table.num_columns(); ++i) { + // cudf::data_type dtype = table.column(i).type(); + // cudf::type_id tid = dtype.id(); + // std::cout << "Column " << i << " type_id: " << static_cast(tid) << std::endl; + // } + auto grouper = cudf::groupby::groupby( - // grup by [o_orderkey, o_orderdate, o_shippriority] + // group by [o_orderkey, o_orderdate, o_shippriority] table.select({0, 1, 2}), cudf::null_policy::EXCLUDE, cudf::sorted::NO ); auto requests = std::vector(); @@ -610,7 +575,69 @@ rapidsmpf::streaming::Node filter_lineitem( } co_await ch_out->drain(ctx->executor()); } + +// change the order of columns from o_orderkey, o_orderdate, o_shippriority, revenue +// to o_orderkey, revenue, o_orderdate, o_shippriority +[[maybe_unused]] rapidsmpf::streaming::Node select_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + co_await ctx->executor()->schedule(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); + auto table = chunk.table_view(); + std::vector> result; + result.reserve(4); + + // o_orderkey + result.push_back( + std::make_unique( + table.column(0), chunk_stream, ctx->br()->device_mr() + ) + ); + // revenue + result.push_back( + std::make_unique( + table.column(3), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_orderdate + result.push_back( + std::make_unique( + table.column(1), chunk_stream, ctx->br()->device_mr() + ) + ); + // o_shippriority + result.push_back( + std::make_unique( + table.column(2), chunk_stream, ctx->br()->device_mr() + ) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence_number, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + + [[maybe_unused]] rapidsmpf::streaming::Node sort_by( [[maybe_unused]] std::shared_ptr ctx, std::shared_ptr ch_in, @@ -891,7 +918,7 @@ rapidsmpf::streaming::Node filter_lineitem( // read and filter customer // In: "c_mktsegment", "c_custkey" - // Out: "c_custkey" + // Out: "c_mktsegment", "c_custkey" nodes.push_back(read_customer( ctx, customer, @@ -915,7 +942,7 @@ rapidsmpf::streaming::Node filter_lineitem( // read and filter lineitem // In: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" - // Out: "l_orderkey", "l_extendedprice", "l_discount" + // Out: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" nodes.push_back(read_lineitem( ctx, lineitem, @@ -927,13 +954,15 @@ rapidsmpf::streaming::Node filter_lineitem( // join customers and orders nodes.push_back( - // c_custkey x o_custkey + // Keys c_custkey x o_custkey + // Left: "c_mktsegment", "c_custkey" + // Right: "o_orderkey", "o_orderdate", "o_shippriority", "o_custkey" rapidsmpf::ndsh::inner_join_broadcast( ctx, filtered_customer, filtered_orders, customer_x_orders, - {0}, + {1}, {3}, rapidsmpf::OpID{static_cast(10 * i + op_id++)}, rapidsmpf::ndsh::KeepKeys::YES @@ -941,21 +970,23 @@ rapidsmpf::streaming::Node filter_lineitem( ); // join customer_x_orders and lineitem + // Keys o_orderkey x l_orderkey + // Left: "c_mktsegment", "c_custkey", "o_orderkey", "o_orderdate", "o_shippriority", + // Right: "l_orderkey", "l_shipdate", "l_extendedprice", "l_discount" nodes.push_back( - // o_orderkey x l_orderkey rapidsmpf::ndsh::inner_join_broadcast( ctx, customer_x_orders, filtered_lineitem, customer_x_orders_x_lineitem, - {0}, + {2}, {0}, rapidsmpf::OpID{static_cast(10 * i + op_id++)}, rapidsmpf::ndsh::KeepKeys::YES ) ); - // with columns + // with columns (and eliminating columns) auto groupby_input = ctx->create_channel(); nodes.push_back(with_columns(