diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index 6327cd3bc..9b7be0a58 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -41,7 +41,7 @@ target_link_libraries( $ maybe_asan ) -set(RAPIDSMPFNDSH_QUERIES q01 q03 q09 q21 bench_read) +set(RAPIDSMPFNDSH_QUERIES q01 q03 q04 q09 q21 bench_read) foreach(query IN ITEMS ${RAPIDSMPFNDSH_QUERIES}) add_executable(${query} "${query}.cpp") diff --git a/cpp/benchmarks/streaming/ndsh/join.cpp b/cpp/benchmarks/streaming/ndsh/join.cpp index 4b3665e16..f647f3065 100644 --- a/cpp/benchmarks/streaming/ndsh/join.cpp +++ b/cpp/benchmarks/streaming/ndsh/join.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -144,6 +145,75 @@ streaming::Node broadcast( co_await ch_out->drain(ctx->executor()); } +/** + * @brief Join a table chunk against a build hash table returning a message of the result. + * + * @param ctx Streaming context + * @param left_chunk Chunk to join. Used as the probe table in a filtered join. + * @param right_chunk Chunk to join. Used as the build table in a filtered join. + * @param left_carrier Columns from `left_chunk` to include in the output. + * @param left_on Key column indices in `left_chunk`. + * @param right_on Key column indices in `right_chunk`. + * @param sequence Sequence number of the output + * @param left_event Event recording the availability of `left_chunk`. + * + * @return Message of `TableChunk` containing the result of the semi join. + */ +streaming::Message semi_join_chunk( + std::shared_ptr ctx, + streaming::TableChunk const& left_chunk, + streaming::TableChunk&& right_chunk, + cudf::table_view left_carrier, + std::vector left_on, + std::vector right_on, + std::uint64_t sequence, + CudaEvent* left_event +) { + auto chunk_stream = right_chunk.stream(); + + left_event->stream_wait(chunk_stream); + + // At this point, both left_chunk and right_chunk are valid on + // either stream. We'll do everything from here out on the + // right_chunk.stream(), so that we don't introduce false dependencies + // between the different chunks. + + auto joiner = cudf::filtered_join( + right_chunk.table_view().select(right_on), + cudf::null_equality::UNEQUAL, + cudf::set_as_build_table::RIGHT, + chunk_stream + ); + + auto match = joiner.semi_join( + left_chunk.table_view().select(left_on), chunk_stream, ctx->br()->device_mr() + ); + + ctx->comm()->logger().debug( + "semi_join_chunk: left.num_rows()=", left_chunk.table_view().num_rows() + ); + ctx->comm()->logger().debug("semi_join_chunk: match.size()=", match->size()); + + cudf::column_view indices = cudf::device_span(*match); + auto result_columns = cudf::gather( + left_carrier, + indices, + cudf::out_of_bounds_policy::DONT_CHECK, + chunk_stream, + ctx->br()->device_mr() + ) + ->release(); + + auto result_table = std::make_unique(std::move(result_columns)); + // Deallocation of the join indices will happen on chunk_stream, so add stream dep + cuda_stream_join(left_chunk.stream(), chunk_stream); + + return streaming::to_message( + sequence, + std::make_unique(std::move(result_table), chunk_stream) + ); +} + /** * @brief Join a table chunk against a build hash table returning a message of the result. 
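The event choreography in `semi_join_chunk` (record an event on the stream that produced the left chunk, make the right chunk's stream wait on it, then re-join the streams before the join indices are freed) is the standard CUDA pattern for ordering work across streams without a device-wide synchronization. A minimal standalone sketch of that pattern using the raw CUDA runtime API rather than the rapidsmpf `CudaEvent`/`cuda_stream_join` helpers:

```cpp
// Sketch only: orders work between two streams with an event, mirroring
// left_event->stream_wait(chunk_stream) and cuda_stream_join() above.
#include <cuda_runtime_api.h>
#include <cstdio>

int main() {
    cudaStream_t left_stream, right_stream;
    cudaStreamCreate(&left_stream);
    cudaStreamCreate(&right_stream);

    cudaEvent_t left_ready;
    cudaEventCreateWithFlags(&left_ready, cudaEventDisableTiming);

    // ... work that materializes the probe (left) table runs on left_stream ...
    cudaEventRecord(left_ready, left_stream);

    // Everything for this chunk now runs on right_stream, but only after the
    // probe table is valid there; no cudaDeviceSynchronize() is required.
    cudaStreamWaitEvent(right_stream, left_ready, 0);
    // ... build hash table, semi join, gather on right_stream ...

    cudaEventDestroy(left_ready);
    cudaStreamDestroy(left_stream);
    cudaStreamDestroy(right_stream);
    std::printf("cross-stream ordering established via event\n");
    return 0;
}
```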
* @@ -345,6 +415,131 @@ streaming::Node inner_join_shuffle( co_await ch_out->drain(ctx->executor()); } +streaming::Node left_semi_join_broadcast_left( + std::shared_ptr ctx, + std::shared_ptr left, + std::shared_ptr right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on, + OpID tag, + KeepKeys keep_keys +) { + streaming::ShutdownAtExit c{left, right, ch_out}; + co_await ctx->executor()->schedule(); + ctx->comm()->logger().print("Left semi broadcast join ", static_cast(tag)); + auto left_table = co_await (co_await broadcast(ctx, left, tag)) + .release() + .make_available(ctx); + ctx->comm()->logger().print( + "Left (probe) table has ", left_table.table_view().num_rows(), " rows" + ); + CudaEvent left_event; + left_event.record(left_table.stream()); + + cudf::table_view left_carrier; + if (keep_keys == KeepKeys::YES) { + left_carrier = left_table.table_view(); + } else { + std::vector to_keep; + std::ranges::copy_if( + std::ranges::iota_view(0, left_table.table_view().num_columns()), + std::back_inserter(to_keep), + [&](auto i) { return std::ranges::find(left_on, i) == left_on.end(); } + ); + left_carrier = left_table.table_view().select(to_keep); + } + + while (!ch_out->is_shutdown()) { + auto right_msg = co_await right->receive(); + if (right_msg.empty()) { + break; + } + // The ``right`` table has been hash-partitioned (via a shuffle) on + // the join key. Thanks to the hash-partitioning, we don't need to worry + // about deduplicating matches across partitions. Anything that matches + // in the semi-join belongs in the output. + auto right_chunk = + co_await right_msg.release().make_available(ctx); + co_await ch_out->send(semi_join_chunk( + ctx, + left_table, + std::move(right_chunk), + left_carrier, + left_on, + right_on, + right_msg.sequence_number(), + &left_event + )); + } + + co_await ch_out->drain(ctx->executor()); +} + +streaming::Node left_semi_join_shuffle( + std::shared_ptr ctx, + std::shared_ptr left, + std::shared_ptr right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on, + KeepKeys keep_keys +) { + streaming::ShutdownAtExit c{left, right, ch_out}; + ctx->comm()->logger().print("Shuffle left semi join"); + + co_await ctx->executor()->schedule(); + CudaEvent left_event; + + while (!ch_out->is_shutdown()) { + // Requirement: two shuffles kick out partitions in the same order + auto left_msg = co_await left->receive(); + auto right_msg = co_await right->receive(); + + if (left_msg.empty()) { + RAPIDSMPF_EXPECTS( + right_msg.empty(), "Left does not have same number of partitions as right" + ); + break; + } + RAPIDSMPF_EXPECTS( + left_msg.sequence_number() == right_msg.sequence_number(), + "Mismatching sequence numbers" + ); + + auto left_chunk = + co_await left_msg.release().make_available(ctx); + auto right_chunk = + co_await right_msg.release().make_available(ctx); + + left_event.record(left_chunk.stream()); + + cudf::table_view left_carrier; + if (keep_keys == KeepKeys::YES) { + left_carrier = left_chunk.table_view(); + } else { + std::vector to_keep; + std::ranges::copy_if( + std::ranges::iota_view(0, left_chunk.table_view().num_columns()), + std::back_inserter(to_keep), + [&](auto i) { return std::ranges::find(left_on, i) == left_on.end(); } + ); + left_carrier = left_chunk.table_view().select(to_keep); + } + + co_await ch_out->send(semi_join_chunk( + ctx, + left_chunk, + std::move(right_chunk), + left_carrier, + left_on, + right_on, + left_msg.sequence_number(), + &left_event + )); + } +} + streaming::Node shuffle( 
std::shared_ptr ctx, std::shared_ptr ch_in, @@ -360,6 +555,7 @@ streaming::Node shuffle( while (true) { auto msg = co_await ch_in->receive(); if (msg.empty()) { + ctx->comm()->logger().debug("Shuffle: no more input"); break; } auto chunk = co_await msg.release().make_available(ctx); diff --git a/cpp/benchmarks/streaming/ndsh/join.hpp b/cpp/benchmarks/streaming/ndsh/join.hpp index 67e701817..253ce0b3c 100644 --- a/cpp/benchmarks/streaming/ndsh/join.hpp +++ b/cpp/benchmarks/streaming/ndsh/join.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 */ @@ -120,6 +120,67 @@ enum class KeepKeys : bool { KeepKeys keep_keys = KeepKeys::YES ); +/** + * @brief Perform a streaming left semi join between two tables. + * + * @note This performs a broadcast join, broadcasting the table represented by the `left` + * channel to all ranks, and then streaming through the chunks of the `right` channel. + * The `right` channel is required to provide hash-partitioned data in-order. + * All of the chunks from the `left` channel must fit in memory at once. + * + * @param ctx Streaming context. + * @param left Channel of `TableChunk`s. + * @param right Channel of `TableChunk`s in hash-partitioned order (shuffled). + * @param ch_out Output channel of `TableChunk`s. + * @param left_on Column indices of the keys in the left table. + * @param right_on Column indices of the keys in the right table. + * @param tag Disambiguating tag for the broadcast of the left table. + * @param keep_keys Does the result contain the key columns, or only "carrier" value + * columns + * + * @return Coroutine representing the completion of the join. + */ +streaming::Node left_semi_join_broadcast_left( + std::shared_ptr ctx, + // We will always choose left as build table and do "broadcast" joins + std::shared_ptr left, + std::shared_ptr right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on, + OpID tag, + KeepKeys keep_keys +); + +/** + * @brief Perform a streaming left semi join between two tables. + * + * @note This performs a shuffle join, the left and right channels are required to provide + * hash-partitioned data in-order. + * + * @param ctx Streaming context. + * @param left Channel of `TableChunk`s in hash-partitioned order. + * @param right Channel of `TableChunk`s in matching hash-partitioned order. + * @param ch_out Output channel of `TableChunk`s. + * @param left_on Column indices of the keys in the left table. + * @param right_on Column indices of the keys in the right table. + * @param tag Disambiguating tag for the broadcast of the left table. + * @param keep_keys Does the result contain the key columns, or only "carrier" value + * columns + * + * @return Coroutine representing the completion of the join. + */ + +streaming::Node left_semi_join_shuffle( + std::shared_ptr ctx, + std::shared_ptr left, + std::shared_ptr right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on, + KeepKeys keep_keys = KeepKeys::YES +); + /** * @brief Shuffle the input channel by hash-partitioning on given key columns. 
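The `KeepKeys` parameter documented here controls whether the join keys survive into the output or only the "carrier" value columns do. The selection is just "every column index that is not a join key"; a host-only sketch of that index arithmetic (the `num_columns` and `left_on` values below are made up for illustration):

```cpp
// Sketch of the KeepKeys::NO column selection: keep each column index that is
// not listed in left_on, mirroring the iota_view/copy_if/find idiom in join.cpp.
#include <algorithm>
#include <iostream>
#include <iterator>
#include <ranges>
#include <vector>

int main() {
    int const num_columns = 5;
    std::vector<int> const left_on = {0, 2};  // join key column indices

    std::vector<int> to_keep;
    std::ranges::copy_if(
        std::views::iota(0, num_columns),
        std::back_inserter(to_keep),
        [&](int i) { return std::ranges::find(left_on, i) == left_on.end(); }
    );

    for (int i : to_keep) std::cout << i << ' ';  // prints: 1 3 4
    std::cout << '\n';
    return 0;
}
```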
* diff --git a/cpp/benchmarks/streaming/ndsh/q04.cpp b/cpp/benchmarks/streaming/ndsh/q04.cpp new file mode 100644 index 000000000..e5b6b63d1 --- /dev/null +++ b/cpp/benchmarks/streaming/ndsh/q04.cpp @@ -0,0 +1,533 @@ +/** + + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "bloom_filter.hpp" +#include "concatenate.hpp" +#include "groupby.hpp" +#include "join.hpp" +#include "parquet_writer.hpp" +#include "sort.hpp" +#include "utils.hpp" + +namespace { + +std::vector chunkwise_groupby_requests() { + auto requests = std::vector(); + std::vector()>> aggs; + // count(*) + aggs.emplace_back([]() { + return cudf::make_count_aggregation( + cudf::null_policy::INCLUDE + ); + }); + requests.emplace_back(0, std::move(aggs)); + return requests; +} + +std::vector final_groupby_requests() { + auto requests = std::vector(); + std::vector()>> aggs; + // sum of partial counts + aggs.emplace_back([]() { + return cudf::make_sum_aggregation(); + }); + requests.emplace_back(1, std::move(aggs)); // column 1 is order_count + return requests; +} + +rapidsmpf::streaming::Node read_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + rapidsmpf::ndsh::detail::get_table_path(input_directory, "lineitem") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({ + "l_commitdate", // used in filter + "l_receiptdate", // used in filter + "l_orderkey", // used in join + }) + .build(); + + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node read_orders( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory, + bool use_date32 +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + rapidsmpf::ndsh::detail::get_table_path(input_directory, "orders") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({ + "o_orderkey", // used in join + "o_orderpriority", // used in group by + }) + .build(); + + auto stream = ctx->br()->stream_pool().get_stream(); + // 1993-07-01 <= o_orderdate < 1993-10-01 + constexpr auto start_date = cuda::std::chrono::year_month_day( + cuda::std::chrono::year(1993), + cuda::std::chrono::month(7), + cuda::std::chrono::day(1) + ); + constexpr auto end_date = cuda::std::chrono::year_month_day( + cuda::std::chrono::year(1993), + cuda::std::chrono::month(10), + cuda::std::chrono::day(1) + ); + auto filter = use_date32 + ? 
rapidsmpf::ndsh::make_date_range_filter( + stream, start_date, end_date, "o_orderdate" + ) + : rapidsmpf::ndsh::make_date_range_filter( + stream, start_date, end_date, "o_orderdate" + ); + + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk, std::move(filter) + ); +} + +rapidsmpf::streaming::Node filter_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto mr = ctx->br()->device_mr(); + + while (!ch_out->is_shutdown()) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = + co_await msg.release().make_available(ctx); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + auto l_commitdate = table.column(0); + auto l_receiptdate = table.column(1); + auto mask = cudf::binary_operation( + l_commitdate, + l_receiptdate, + cudf::binary_operator::LESS, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + auto filtered_table = + cudf::apply_boolean_mask(table.select({2}), mask->view(), chunk_stream, mr); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + std::move(filtered_table), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +[[maybe_unused]] +rapidsmpf::streaming::Node fanout_bounded( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch1_out, + std::vector ch1_cols, + std::shared_ptr ch2_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch1_out, ch2_out}; + co_await ctx->executor()->schedule(); + + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + auto chunk = + co_await msg.release().make_available( + ctx + ); // Here, we know that copying ch1_cols (a single col) is better than + // copying + // ch2_cols (the whole table) + std::vector> tasks; + if (!ch1_out->is_shutdown()) { + auto msg1 = rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + std::make_unique( + chunk.table_view().select(ch1_cols), + chunk.stream(), + ctx->br()->device_mr() + ), + chunk.stream() + ) + ); + tasks.push_back(ch1_out->send(std::move(msg1))); + } + if (!ch2_out->is_shutdown()) { + // TODO: We know here that ch2 wants the whole table. + tasks.push_back(ch2_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique(std::move(chunk)) + ) + )); + } + if (!std::ranges::any_of( + rapidsmpf::streaming::coro_results( + co_await coro::when_all(std::move(tasks)) + ), + std::identity{} + )) + { + ctx->comm()->logger().print("Breaking after ", msg.sequence_number()); + break; + }; + } + + rapidsmpf::streaming::coro_results( + co_await coro::when_all( + ch1_out->drain(ctx->executor()), ch2_out->drain(ctx->executor()) + ) + ); +} + +} // namespace + +/** + * @brief Run a derived version of TPCH-query 4. + * + * The SQL form of the query is: + * @code{.sql} + * + * SELECT + * o_orderpriority, + * count(*) as order_count + * FROM + * orders + * where + * o_orderdate >= TIMESTAMP '1993-07-01' + * and o_orderdate < TIMESTAMP '1993-07-01' + INTERVAL '3' MONTH + * and EXISTS ( + * SELECT + * * + * FROM + * lineitem + * WHERE + * l_orderkey = o_orderkey + * and l_commitdate < l_receiptdate + * ) + * GROUP BY + * o_orderpriority + * ORDER BY + * o_orderpriority + * @endcode{} + * + * The "exists" clause is translated into a left-semi join in libcudf. 
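As the comment says, the correlated `EXISTS` subquery is planned as a left semi join: an orders row is kept at most once if any lineitem row shares its key, regardless of how many duplicates the lineitem side holds. A tiny host-side illustration with standard containers (hypothetical keys, no cudf involved):

```cpp
// Left semi join semantics: membership test on the right keys; output rows
// come only from the left and are never multiplied by right-side duplicates.
#include <algorithm>
#include <iostream>
#include <iterator>
#include <unordered_set>
#include <vector>

int main() {
    std::vector<int> const o_orderkey = {1, 2, 3, 4};     // left / probe
    std::vector<int> const l_orderkey = {2, 2, 2, 4, 7};  // right, with duplicates

    std::unordered_set<int> const build(l_orderkey.begin(), l_orderkey.end());

    std::vector<int> kept;
    std::copy_if(o_orderkey.begin(), o_orderkey.end(), std::back_inserter(kept),
                 [&](int k) { return build.count(k) != 0; });

    for (int k : kept) std::cout << k << ' ';  // prints: 2 4  (each at most once)
    std::cout << '\n';
    return 0;
}
```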
+ */ +int main(int argc, char** argv) { + cudaFree(nullptr); + + rapidsmpf::ndsh::FinalizeMPI finalize{}; + cudaFree(nullptr); + // work around https://github.com/rapidsai/cudf/issues/20849 + cudf::initialize(); + auto mr = rmm::mr::cuda_async_memory_resource{}; + auto stats_wrapper = rapidsmpf::RmmResourceAdaptor(&mr); + auto arguments = rapidsmpf::ndsh::parse_arguments(argc, argv); + auto ctx = rapidsmpf::ndsh::create_context(arguments, &stats_wrapper); + std::string output_path = arguments.output_file; + std::vector timings; + + // Detect date column types from parquet metadata before timed section + auto const orders_types = + rapidsmpf::ndsh::detail::get_column_types(arguments.input_directory, "orders"); + bool const orders_use_date32 = + orders_types.at("o_orderdate").id() == cudf::type_id::TIMESTAMP_DAYS; + + int l2size; + int device; + RAPIDSMPF_CUDA_TRY(cudaGetDevice(&device)); + RAPIDSMPF_CUDA_TRY(cudaDeviceGetAttribute(&l2size, cudaDevAttrL2CacheSize, device)); + auto const num_filter_blocks = rapidsmpf::ndsh::BloomFilter::fitting_num_blocks( + static_cast(l2size) + ); + + for (int i = 0; i < arguments.num_iterations; i++) { + rapidsmpf::OpID op_id{0}; + std::vector nodes; + auto start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q4 pipeline"); + // Convention for channel names: express the *output*. + /* Lineitem Table */ + // [l_commitdate, l_receiptdate, l_orderkey] + auto lineitem = ctx->create_channel(); + // [l_orderkey] + auto filtered_lineitem = ctx->create_channel(); + // [l_orderkey] + auto filtered_lineitem_shuffled = ctx->create_channel(); + + /* Orders Table */ + // [o_orderkey, o_orderpriority] + auto order = ctx->create_channel(); + + // [o_orderpriority] + auto orders_x_lineitem = ctx->create_channel(); + // [o_orderpriority, order_count] + auto grouped_chunkwise = ctx->create_channel(); + + nodes.push_back(read_lineitem( + ctx, lineitem, 4, arguments.num_rows_per_chunk, arguments.input_directory + )); + nodes.push_back( + filter_lineitem(ctx, lineitem, filtered_lineitem) + ); // l_orderkey + nodes.push_back(read_orders( + ctx, + order, + 4, + arguments.num_rows_per_chunk, + arguments.input_directory, + orders_use_date32 + )); + + // Fanout filtered orders: one for bloom filter, one for join + auto bloom_filter_input = ctx->create_channel(); + auto orders_for_join = ctx->create_channel(); + nodes.push_back( + fanout_bounded(ctx, order, bloom_filter_input, {0}, orders_for_join) + ); + + // Build bloom filter from filtered orders' o_orderkey + auto bloom_filter_output = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::build_bloom_filter( + ctx, + bloom_filter_input, + bloom_filter_output, + static_cast(10 * i + op_id++), + cudf::DEFAULT_HASH_SEED, + num_filter_blocks + ) + ); + + // Apply bloom filter to filtered lineitem before shuffling + auto bloom_filtered_lineitem = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::apply_bloom_filter( + ctx, + bloom_filter_output, + filtered_lineitem, + bloom_filtered_lineitem, + {0} + ) + ); + + // We unconditionally shuffle the filtered lineitem table. This is + // necessary to correctly handle duplicates in the left-semi join. + // Failing to shuffle (hash partition) the right table on the join + // key could allow a record to match multiple times from the + // multiple partitions of the right table. 
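The comment above carries the correctness argument for the unconditional shuffle: each right chunk is probed independently against the full probe side, so a key that straddles two chunks would emit the same output row twice, while hash partitioning on the key confines all copies of a key to a single chunk. A host-side sketch of the failure mode and the fix (keys are made up):

```cpp
// Chunkwise semi join: a left key is emitted once per right chunk it matches,
// so correctness depends on every copy of a key living in exactly one chunk.
#include <iostream>
#include <unordered_set>
#include <vector>

static std::vector<int> semi_join_one_chunk(std::vector<int> const& left,
                                            std::vector<int> const& right_chunk) {
    std::unordered_set<int> const build(right_chunk.begin(), right_chunk.end());
    std::vector<int> out;
    for (int k : left)
        if (build.count(k) != 0) out.push_back(k);
    return out;
}

int main() {
    std::vector<int> const left = {4};  // broadcast probe table

    // Arbitrary split of right keys {4, 1, 4, 9}: key 4 appears in both chunks,
    // so row 4 is emitted twice when the chunk results are concatenated.
    auto const bad = semi_join_one_chunk(left, {4, 1}).size()
                     + semi_join_one_chunk(left, {4, 9}).size();

    // Hash-partitioned split (key % 2): all copies of key 4 land in one chunk.
    auto const good = semi_join_one_chunk(left, {4, 4}).size()
                      + semi_join_one_chunk(left, {1, 9}).size();

    std::cout << "arbitrary split:  " << bad << " output rows\n";   // 2 (duplicate)
    std::cout << "hash partitioned: " << good << " output rows\n";  // 1 (correct)
    return 0;
}
```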
+ + // TODO: configurable num_partitions + std::uint32_t num_partitions = 16; + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + bloom_filtered_lineitem, + filtered_lineitem_shuffled, + {0}, + num_partitions, + static_cast(10 * i + op_id++) + ) + ); + + if (arguments.use_shuffle_join) { + auto filtered_order_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + orders_for_join, + filtered_order_shuffled, + {0}, + num_partitions, + static_cast(10 * i + op_id++) + ) + ); + + nodes.push_back( + rapidsmpf::ndsh::left_semi_join_shuffle( + ctx, + filtered_order_shuffled, + filtered_lineitem_shuffled, + orders_x_lineitem, + {0}, + {0} + ) + ); + } else { + nodes.push_back( + rapidsmpf::ndsh::left_semi_join_broadcast_left( + ctx, + orders_for_join, + filtered_lineitem_shuffled, + orders_x_lineitem, + {0}, + {0}, + static_cast(10 * i + op_id++), + rapidsmpf::ndsh::KeepKeys::NO + ) + ); + } + + nodes.push_back( + rapidsmpf::ndsh::chunkwise_group_by( + ctx, + orders_x_lineitem, + grouped_chunkwise, + {0}, + chunkwise_groupby_requests(), + cudf::null_policy::INCLUDE + ) + ); + auto final_groupby_input = ctx->create_channel(); + if (ctx->comm()->nranks() > 1) { + nodes.push_back( + rapidsmpf::ndsh::broadcast( + ctx, + grouped_chunkwise, + final_groupby_input, + static_cast(10 * i + op_id++), + rapidsmpf::streaming::AllGather::Ordered::NO + ) + ); + } else { + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, grouped_chunkwise, final_groupby_input + ) + ); + } + if (ctx->comm()->rank() == 0) { + auto final_groupby_output = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::chunkwise_group_by( + ctx, + final_groupby_input, + final_groupby_output, + {0}, + final_groupby_requests(), + cudf::null_policy::INCLUDE + ) + ); + auto sorted_output = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::chunkwise_sort_by( + ctx, + final_groupby_output, + sorted_output, + {0}, + {0, 1}, + {cudf::order::ASCENDING}, + {cudf::null_order::BEFORE} + ) + ); + nodes.push_back( + rapidsmpf::ndsh::write_parquet( + ctx, + sorted_output, + cudf::io::sink_info(output_path), + {"o_orderpriority", "order_count"} + ) + ); + } else { + nodes.push_back(rapidsmpf::ndsh::sink_channel(ctx, final_groupby_input)); + } + } + auto end = std::chrono::steady_clock::now(); + std::chrono::duration pipeline = end - start; + start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Q4 Iteration"); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + } + end = std::chrono::steady_clock::now(); + std::chrono::duration compute = end - start; + timings.push_back(pipeline.count()); + timings.push_back(compute.count()); + ctx->comm()->logger().print(ctx->statistics()->report()); + ctx->statistics()->clear(); + } + + if (ctx->comm()->rank() == 0) { + for (int i = 0; i < arguments.num_iterations; i++) { + ctx->comm()->logger().print( + "Iteration ", + i, + " pipeline construction time [s]: ", + timings[size_t(2 * i)] + ); + ctx->comm()->logger().print( + "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)] + ); + } + } + return 0; +} diff --git a/cpp/benchmarks/streaming/ndsh/utils.hpp b/cpp/benchmarks/streaming/ndsh/utils.hpp index a3d9b09d9..c125a1421 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.hpp +++ b/cpp/benchmarks/streaming/ndsh/utils.hpp @@ -128,6 +128,101 @@ std::unique_ptr make_date_filter( ); } +/** + * @brief Create a date range filter expression. 
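The aggregation in this pipeline is deliberately two-phase so it can stream: `chunkwise_group_by` reduces each chunk with `count(*)`, the partial results are then broadcast (or concatenated on a single rank), and a final group-by sums the partial counts per `o_orderpriority`, matching `chunkwise_groupby_requests()` and `final_groupby_requests()` defined earlier in q04.cpp. A host-side sketch of the same reduction with made-up priority strings:

```cpp
// Two-phase aggregation: local count(*) per chunk, then a final sum of the
// partial counts grouped by the same key.
#include <iostream>
#include <map>
#include <string>
#include <vector>

using Partial = std::map<std::string, long>;  // o_orderpriority -> count

static Partial chunkwise_count(std::vector<std::string> const& priorities) {
    Partial out;
    for (auto const& p : priorities) ++out[p];
    return out;
}

int main() {
    // Two chunks of o_orderpriority values surviving the semi join.
    Partial const part1 = chunkwise_count({"1-URGENT", "3-MEDIUM", "1-URGENT"});
    Partial const part2 = chunkwise_count({"3-MEDIUM", "5-LOW"});

    Partial total;  // final group-by: sum of the partial counts
    for (auto const* part : {&part1, &part2})
        for (auto const& [priority, count] : *part) total[priority] += count;

    for (auto const& [priority, count] : total)
        std::cout << priority << ": " << count << '\n';
    return 0;
}
```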
+ * + * Creates a filter that checks if a date column falls within a half-open range. + * The operation will be equivalent to + * " >= DATE '' AND < DATE ''". + * + * @tparam timestamp_type The timestamp type to use for the filter scalars + * (e.g., cudf::timestamp_D or cudf::timestamp_ms) + * @param stream CUDA stream to use + * @param start_date The start date (inclusive) of the range + * @param end_date The end date (exclusive) of the range + * @param column_name The name of the column to compare + * @return Filter expression with proper lifetime management + */ +template +std::unique_ptr make_date_range_filter( + rmm::cuda_stream_view stream, + cuda::std::chrono::year_month_day start_date, + cuda::std::chrono::year_month_day end_date, + std::string const& column_name +) { + auto owner = new std::vector; + + // 0: column_reference + owner->push_back(std::make_shared(column_name)); + + // 1, 2: Scalars for start and end dates + owner->push_back( + std::make_shared>( + cuda::std::chrono::sys_days(start_date).time_since_epoch(), true, stream + ) + ); + owner->push_back( + std::make_shared>( + cuda::std::chrono::sys_days(end_date).time_since_epoch(), true, stream + ) + ); + + // 3, 4: Literals for start and end dates + owner->push_back( + std::make_shared( + *std::any_cast>>( + owner->at(1) + ) + ) + ); + owner->push_back( + std::make_shared( + *std::any_cast>>( + owner->at(2) + ) + ) + ); + + // 5: (GE, column, literal) + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::GREATER_EQUAL, + *std::any_cast>( + owner->at(0) + ), + *std::any_cast>(owner->at(3)) + ) + ); + + // 6: (LT, column, literal) + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::LESS, + *std::any_cast>( + owner->at(0) + ), + *std::any_cast>(owner->at(4)) + ) + ); + + // 7: (AND, GE, LT) + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::LOGICAL_AND, + *std::any_cast>(owner->at(5)), + *std::any_cast>(owner->at(6)) + ) + ); + + return std::make_unique( + stream, + *std::any_cast>(owner->back()), + OwningWrapper(static_cast(owner), [](void* p) { + delete static_cast*>(p); + }) + ); +} + /** * @brief Sink messages into a channel and discard them. 
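This helper encodes the half-open predicate `column >= start AND column < end` as a cudf AST over two timestamp literals, which is how Q4's three-month window on `o_orderdate` avoids an inclusive upper bound. For date32 (TIMESTAMP_DAYS) data the scalar values are simply day counts since the Unix epoch; a standalone `std::chrono` sketch of the same bounds (the benchmark's `cuda::std::chrono` types mirror this API):

```cpp
// Half-open date window for Q4: 1993-07-01 <= o_orderdate < 1993-07-01 + 3 months.
#include <chrono>
#include <iostream>

int main() {
    using namespace std::chrono;

    constexpr year_month_day start{year{1993}, month{7}, day{1}};
    year_month_day const end = start + months{3};  // 1993-10-01, exclusive bound

    // For TIMESTAMP_DAYS columns the literal is the day count since 1970-01-01.
    auto const start_days = sys_days(start).time_since_epoch().count();
    auto const end_days = sys_days(end).time_since_epoch().count();

    std::cout << "keep rows where " << start_days
              << " <= o_orderdate(days) < " << end_days << '\n';
    return 0;
}
```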
* diff --git a/cpp/scripts/ndsh.py b/cpp/scripts/ndsh.py index 400d1757a..ced13cb10 100755 --- a/cpp/scripts/ndsh.py +++ b/cpp/scripts/ndsh.py @@ -595,6 +595,21 @@ def cmd_run(args: argparse.Namespace) -> int: return int(failed > 0) +def cmd_run_and_validate(args: argparse.Namespace) -> int: + """Execute the 'run-and-validate' subcommand.""" + # First run the benchmarks + run_result = cmd_run(args) + if run_result != 0: + print("\nRun phase failed, skipping validation.") + return run_result + + # Set up paths for validation based on run output + args.results_path = args.output_dir / "output" + args.expected_path = args.output_dir / "expected" + + return cmd_validate(args) + + def cmd_validate(args: argparse.Namespace) -> int: """Execute the 'validate' subcommand.""" if not args.results_path.exists(): @@ -615,6 +630,17 @@ def cmd_validate(args: argparse.Namespace) -> int: print(f"No qDD.parquet files found in results directory: {args.results_path}") return 1 + # Filter to specific queries if requested + if args.queries: + results_files = { + name: path + for name, path in results_files.items() + if int(name.lstrip("q")) in args.queries + } + if not results_files: + print(f"No matching result files found for queries: {args.queries}") + return 1 + print(f"\nValidating {len(results_files)} query(ies):") # Validate each matching pair @@ -681,13 +707,9 @@ def main(): subparsers = parser.add_subparsers(dest="command", required=True) - # 'run' subcommand - run_parser = subparsers.add_parser( - "run", - help="Run benchmarks and generate expected results", - description="Run C++ benchmark binaries and generate expected results via DuckDB.", - ) - run_parser.add_argument( + # Parent parser for run-related arguments + run_parent = argparse.ArgumentParser(add_help=False) + run_parent.add_argument( "--benchmark-dir", type=Path, help="Directory containing benchmark binaries (q04, q09, etc.)", @@ -695,7 +717,7 @@ def main(): "cpp/build/benchmarks/ndsh" ), ) - run_parser.add_argument( + run_parent.add_argument( "--sql-dir", type=Path, help="Directory containing SQL query files (q04.sql, q09.sql, etc.)", @@ -703,104 +725,133 @@ def main(): "cpp/benchmarks/streaming/ndsh/sql" ), ) - run_parser.add_argument( + run_parent.add_argument( "--input-dir", type=Path, required=True, help="Directory containing TPC-H input parquet files", ) - run_parser.add_argument( + run_parent.add_argument( "--output-dir", type=Path, required=True, help="Directory for output files", ) - parser.add_argument( + run_parent.add_argument( "-q", "--queries", help="Comma-separated list of SQL query numbers to run or the string 'all'", type=query_type, default="all", ) - run_parser.add_argument( + run_parent.add_argument( "--benchmark-args", type=str, default="", help="Additional arguments to pass to benchmark binaries (space-separated)", ) - run_parser.add_argument( + run_parent.add_argument( "--reuse-expected", action="store_true", help="Skip generating expected results if the expected file already exists", ) - run_parser.add_argument( + run_parent.add_argument( "--reuse-output", action="store_true", help="Skip running the benchmark if the output file already exists", ) - run_parser.add_argument( + run_parent.add_argument( "--generate-data", action="store_true", help="Generate data for the benchmarks", ) - # 'validate' subcommand - validate_parser = subparsers.add_parser( - "validate", - help="Compare results against expected", - description="Validate benchmark results by comparing parquet files against expected results.", - ) - 
validate_parser.add_argument( - "--results-path", - type=Path, - required=True, - help="Directory containing benchmark result parquet files (qDD.parquet)", - ) - validate_parser.add_argument( - "--expected-path", - type=Path, - required=True, - help="Directory containing expected parquet files (qDD.parquet)", - ) - validate_parser.add_argument( + # Parent parser for validation comparison options (not the paths) + validate_options_parent = argparse.ArgumentParser(add_help=False) + validate_options_parent.add_argument( "-d", "--decimal", type=int, default=2, help="Number of decimal places to compare for floating point values (default: 2)", ) - validate_parser.add_argument( + validate_options_parent.add_argument( "--ignore-timezone", action="store_true", help="Ignore differences in timezone and precision for timestamp types", ) - validate_parser.add_argument( + validate_options_parent.add_argument( "--ignore-string-type", action="store_true", help="Ignore differences between string and large_string types", ) - validate_parser.add_argument( + validate_options_parent.add_argument( "--ignore-integer-sign", action="store_true", help="Ignore differences between signed and unsigned integer types", ) - validate_parser.add_argument( + validate_options_parent.add_argument( "--ignore-integer-size", action="store_true", help="Ignore differences in integer bit width (e.g., int32 vs int64)", ) - validate_parser.add_argument( + validate_options_parent.add_argument( "--ignore-decimal-int", action="store_true", help="Ignore differences between decimal and integer types", ) + # 'run' subcommand - inherits from run_parent + subparsers.add_parser( + "run", + parents=[run_parent], + help="Run benchmarks and generate expected results", + description="Run C++ benchmark binaries and generate expected results via DuckDB.", + ) + + # 'validate' subcommand - inherits comparison options, adds its own paths + validate_parser = subparsers.add_parser( + "validate", + parents=[validate_options_parent], + help="Compare results against expected", + description="Validate benchmark results by comparing parquet files against expected results.", + ) + validate_parser.add_argument( + "--results-path", + type=Path, + required=True, + help="Directory containing benchmark result parquet files (qDD.parquet)", + ) + validate_parser.add_argument( + "--expected-path", + type=Path, + required=True, + help="Directory containing expected parquet files (qDD.parquet)", + ) + validate_parser.add_argument( + "-q", + "--queries", + help="Comma-separated list of SQL query numbers to validate or the string 'all'", + type=query_type, + default="all", + ) + + # 'run-and-validate' subcommand - inherits from BOTH parents + subparsers.add_parser( + "run-and-validate", + parents=[run_parent, validate_options_parent], + help="Run benchmarks and validate results in one step", + description="Run C++ benchmark binaries, generate expected results via DuckDB, and validate.", + ) + args = parser.parse_args() if args.command == "run": sys.exit(cmd_run(args)) elif args.command == "validate": sys.exit(cmd_validate(args)) + elif args.command == "run-and-validate": + sys.exit(cmd_run_and_validate(args)) if __name__ == "__main__":