From b971af725a3697d8cad4c26ca95bf3af54e157c2 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Tue, 4 Nov 2025 14:10:15 +0000 Subject: [PATCH 01/11] Add a python implementation of TPCH query 9 --- .../examples/streaming/ndsh/__init__.py | 4 + .../rapidsmpf/examples/streaming/ndsh/q09.py | 654 ++++++++++++++++++ 2 files changed, 658 insertions(+) create mode 100644 python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py create mode 100644 python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py diff --git a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py new file mode 100644 index 000000000..a32042717 --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +"""Streaming examples of TPC-H derived queries.""" diff --git a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py new file mode 100644 index 000000000..c5dac7538 --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py @@ -0,0 +1,654 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +# ruff: noqa: D103 + +"""Example by hand implementation of a derivation of TPC-H Q9.""" + +from __future__ import annotations + +import itertools +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import TYPE_CHECKING + +import click +import nvtx + +import pylibcudf as plc +import rmm +from pylibcudf.experimental._join_streams import join_streams + +from rapidsmpf.buffer.resource import BufferResource +from rapidsmpf.communicator.single import new_communicator as single_process_comm +from rapidsmpf.config import Options, get_environment_variables +from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor +from rapidsmpf.streaming.core.channel import Channel +from rapidsmpf.streaming.core.context import Context +from rapidsmpf.streaming.core.message import Message +from rapidsmpf.streaming.core.node import define_py_node, run_streaming_pipeline +from rapidsmpf.streaming.cudf.parquet import read_parquet +from rapidsmpf.streaming.cudf.table_chunk import TableChunk + +if TYPE_CHECKING: + from collections.abc import Sequence + + from rapidsmpf.streaming.core.node import CppNode, PyNode + + +def get_streaming_context(num_streaming_threads: int) -> Context: + env = get_environment_variables() + env["num_streaming_threads"] = str(num_streaming_threads) + options = Options(env) + comm = single_process_comm(options) + # TODO: multi-GPU, memory limiter, spilling. Need to expose TableChunk::make_available. + mr = RmmResourceAdaptor(rmm.mr.CudaAsyncMemoryResource()) + # TODO: explicit memory resources. Need to expose device_mr to python. + br = BufferResource(mr) + # Note: this must be done even if we use the br's memory resource + # everywhere so that cudf uses this MR for internal allocations. 
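+    # (pylibcudf calls that are not handed an explicit memory resource fall
+    # back to the current device resource, so temporaries would otherwise
+    # bypass the adaptor created above.)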
+ rmm.mr.set_current_device_resource(mr) + return Context(comm, br, options) + + +def reader_options( + files: Sequence[str], columns: list[str] +) -> plc.io.parquet.ParquetReaderOptions: + source = plc.io.SourceInfo(files) + options = plc.io.parquet.ParquetReaderOptions.builder(source).build() + options.set_columns(columns) + return options + + +def read_lineitem( + ctx: Context, + files: Sequence[str], + num_producers: int, + num_rows_per_chunk: int, + ch: Channel[TableChunk], +) -> CppNode | PyNode: + columns = [ + "l_discount", # 0 + "l_extendedprice", # 1 + "l_orderkey", # 2 + "l_partkey", # 3 + "l_quantity", # 4 + "l_suppkey", + ] + return read_parquet( + ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk + ) + + +def read_nation( + ctx: Context, + files: Sequence[str], + num_rows_per_chunk: int, + ch: Channel[TableChunk], +) -> CppNode | PyNode: + columns = [ + "n_name", # 0 + "n_nationkey", # 1 + ] + return read_parquet(ctx, ch, 1, reader_options(files, columns), num_rows_per_chunk) + + +def read_orders( + ctx: Context, + files: Sequence[str], + num_producers: int, + num_rows_per_chunk: int, + ch: Channel[TableChunk], +) -> CppNode | PyNode: + columns = [ + "o_orderdate", # 0 + "o_orderkey", # 1 + ] + return read_parquet( + ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk + ) + + +def read_part( + ctx: Context, + files: Sequence[str], + num_producers: int, + num_rows_per_chunk: int, + ch: Channel[TableChunk], +) -> CppNode | PyNode: + columns = [ + "p_partkey", # 0 + "p_name", # 1 + ] + return read_parquet( + ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk + ) + + +def read_partsupp( + ctx: Context, + files: Sequence[str], + num_producers: int, + num_rows_per_chunk: int, + ch: Channel[TableChunk], +) -> CppNode | PyNode: + columns = [ + "ps_partkey", # 0 + "ps_suppkey", # 1 + "ps_supplycost", # 2 + ] + return read_parquet( + ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk + ) + + +def read_supplier( + ctx: Context, + files: Sequence[str], + num_producers: int, + num_rows_per_chunk: int, + ch: Channel[TableChunk], +) -> CppNode | PyNode: + columns = [ + "s_nationkey", # 0 + "s_suppkey", # 1 + ] + return read_parquet( + ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk + ) + + +@define_py_node() +async def filter_part( + ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] +) -> None: + while (msg := await ch_in.recv(ctx)) is not None: + chunk = TableChunk.from_message(msg) + stream = chunk.stream + table = chunk.table_view() + target = plc.Scalar.from_py("green", stream=stream) + chunk = TableChunk.from_pylibcudf_table( + plc.stream_compaction.apply_boolean_mask( + plc.Table(table.columns()[:1]), + plc.strings.find.contains(table.columns()[1], target, stream), + stream, + ), + stream, + exclusive_view=True, + ) + await ch_out.send(ctx, Message(msg.sequence_number, chunk)) + await ch_out.drain(ctx) + + +@define_py_node() +async def select_columns( + ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] +) -> None: + while (msg := await ch_in.recv(ctx)) is not None: + chunk = TableChunk.from_message(msg) + stream = chunk.stream + columns = chunk.table_view().columns() + name, supplycost, discount, extendedprice, quantity, orderdate = columns + revenue_type = supplycost.type() # float64 + orderdate = plc.datetime.extract_datetime_component( + orderdate, plc.datetime.DatetimeComponent.YEAR, stream + ) + revenue = 
plc.transform.transform( + [discount, extendedprice, supplycost, quantity], + """ + static __device__ void calculate_amount( + double *amount, double discount, double extprice, double supplycost, double quantity + ) { + *amount = extprice * (1 - discount) - supplycost * quantity; + } + """, + revenue_type, + False, # noqa: FBT003 + plc.types.NullAware.NO, + stream, + ) + await ch_out.send( + ctx, + Message( + msg.sequence_number, + TableChunk.from_pylibcudf_table( + plc.Table([name, orderdate, revenue]), + stream, + exclusive_view=True, + ), + ), + ) + await ch_out.drain(ctx) + + +@define_py_node() +async def broadcast_join( + ctx: Context, + left_ch: Channel[TableChunk], + right_ch: Channel[TableChunk], + ch_out: Channel[TableChunk], + left_on: Sequence[int], + right_on: Sequence[int], + *, + keep_keys: bool, +) -> None: + left_tables: list[TableChunk] = [] + chunk_streams = set() + while (msg := await left_ch.recv(ctx)) is not None: + left_tables.append(TableChunk.from_message(msg)) + chunk_streams.add(left_tables[-1].stream) + build_stream = ctx.get_stream_from_pool() + join_streams(list(chunk_streams), build_stream) + if len(left_tables) == 1: + left = left_tables[0].table_view().columns() + else: + left = plc.concatenate.concatenate( + [t.table_view() for t in left_tables], build_stream + ).columns() + left_keys = plc.Table([left[i] for i in left_on]) + if keep_keys: + left_carrier = plc.Table(left) + else: + left_carrier = plc.Table([c for i, c in enumerate(left) if i not in left_on]) + for s in chunk_streams: + join_streams([build_stream], s) + del left_tables + sequence_number = 0 + chunk_streams.clear() + while (msg := await right_ch.recv(ctx)) is not None: + chunk = TableChunk.from_message(msg) + chunk_streams.add(chunk.stream) + join_streams([build_stream], chunk.stream) + # Safe to access left_carrier on chunk.stream + right_columns = chunk.table_view().columns() + right_keys = plc.Table([right_columns[i] for i in right_on]) + right_carrier = plc.Table( + [c for i, c in enumerate(right_columns) if i not in right_on] + ) + left, right = plc.join.inner_join( + left_keys, right_keys, plc.types.NullEquality.UNEQUAL, chunk.stream + ) + left = plc.copying.gather( + left_carrier, + left, + plc.copying.OutOfBoundsPolicy.DONT_CHECK, + chunk.stream, + ) + right = plc.copying.gather( + right_carrier, + right, + plc.copying.OutOfBoundsPolicy.DONT_CHECK, + chunk.stream, + ) + await ch_out.send( + ctx, + Message( + sequence_number, + TableChunk.from_pylibcudf_table( + plc.Table([*left.columns(), *right.columns()]), + chunk.stream, + exclusive_view=True, + ), + ), + ) + sequence_number += 1 + # Ensure left_carrier and keys are deallocated after table chunks are produced + for s in chunk_streams: + join_streams([build_stream], s) + await ch_out.drain(ctx) + + +@define_py_node() +async def chunkwise_groupby_agg( + ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] +) -> None: + sequence = 0 + while (msg := await ch_in.recv(ctx)) is not None: + chunk = TableChunk.from_message(msg) + name, date, revenue = chunk.table_view().columns() + stream = chunk.stream + grouper = plc.groupby.GroupBy( + plc.Table([name, date]), + plc.types.NullPolicy.EXCLUDE, + plc.types.Sorted.NO, + ) + reqs = [plc.groupby.GroupByRequest(revenue, [plc.aggregation.sum()])] + (keys, results) = grouper.aggregate(reqs, stream) + del chunk, name, date, revenue + await ch_out.send( + ctx, + Message( + sequence, + TableChunk.from_pylibcudf_table( + plc.Table( + [ + *keys.columns(), + 
*itertools.chain.from_iterable( + r.columns() for r in results + ), + ] + ), + stream, + exclusive_view=True, + ), + ), + ) + sequence += 1 + await ch_out.drain(ctx) + + +@define_py_node() +async def concatenate( + ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] +) -> None: + chunks = [] + build_stream = ctx.get_stream_from_pool() + chunk_streams = set() + while (msg := await ch_in.recv(ctx)) is not None: + chunk = TableChunk.from_message(msg) + chunks.append(chunk) + chunk_streams.add(chunk.stream) + join_streams(list(chunk_streams), build_stream) + table = plc.concatenate.concatenate( + [chunk.table_view() for chunk in chunks], build_stream + ) + for s in chunk_streams: + join_streams([build_stream], s) + await ch_out.send( + ctx, + Message( + 0, TableChunk.from_pylibcudf_table(table, build_stream, exclusive_view=True) + ), + ) + await ch_out.drain(ctx) + + +@define_py_node() +async def sort_by_and_round( + ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] +) -> None: + msg = await ch_in.recv(ctx) + if msg is None: + raise RuntimeError("Expecting a chunk in sort by") + if await ch_in.recv(ctx) is not None: + raise RuntimeError("Only expecting a single chunk") + chunk = TableChunk.from_message(msg) + name, date, revenue = chunk.table_view().columns() + stream = chunk.stream + revenue = plc.round.round(revenue, 2, plc.round.RoundingMethod.HALF_EVEN, stream) + await ch_out.send( + ctx, + Message( + 0, + TableChunk.from_pylibcudf_table( + plc.sorting.sort_by_key( + plc.Table([name, date, revenue]), + plc.Table([name, date]), + [plc.types.Order.ASCENDING, plc.types.Order.DESCENDING], + [plc.types.NullOrder.BEFORE, plc.types.NullOrder.BEFORE], + stream, + ), + stream, + exclusive_view=True, + ), + ), + ) + await ch_out.drain(ctx) + + +@define_py_node() +async def write_parquet( + ctx: Context, ch_in: Channel[TableChunk], filename: Path +) -> None: + msg = await ch_in.recv(ctx) + if msg is None: + raise RuntimeError("Expecting a chunk in write_parquet") + if await ch_in.recv(ctx) is not None: + raise RuntimeError("Only expecting a single chunk in write_parquet") + chunk = TableChunk.from_message(msg) + sink = plc.io.SinkInfo([filename]) + builder = plc.io.parquet.ParquetWriterOptions.builder(sink, chunk.table_view()) + metadata = plc.io.types.TableInputMetadata(chunk.table_view()) + metadata.column_metadata[0].set_name("nation") + metadata.column_metadata[1].set_name("o_year") + metadata.column_metadata[2].set_name("sum_profit") + options = builder.metadata(metadata).build() + plc.io.parquet.write_parquet(options, chunk.stream) + + +def get_files(base: str, suffix: str) -> list[str]: + path = Path(base) + if path.is_dir(): + files = sorted(path.glob(f"*.{suffix}")) + if len(files) == 0: + raise RuntimeError(f"No parquet files found in {path}") + return [str(f) for f in files] + else: + path = path.with_suffix(f".{suffix}") + if not path.exists(): + raise RuntimeError(f"File {path} does not exist") + return [str(path)] + + +def q( + ctx: Context, + num_rows_per_chunk: int, + num_producers_per_read: int, + output: str, + parquet_suffix: str, + lineitem: str, + nation: str, + orders: str, + part: str, + partsupp: str, + supplier: str, +) -> list[CppNode | PyNode]: + lineitem_files = get_files(lineitem, parquet_suffix) + part_files = get_files(part, parquet_suffix) + partsupp_files = get_files(partsupp, parquet_suffix) + supplier_files = get_files(supplier, parquet_suffix) + orders_files = get_files(orders, parquet_suffix) + nation_files = get_files(nation, 
parquet_suffix) + nodes: list[CppNode | PyNode] = [] + lineitem_ch = Channel[TableChunk]() + part_ch = Channel[TableChunk]() + filtered_part = Channel[TableChunk]() + partsupp_ch = Channel[TableChunk]() + supplier_ch = Channel[TableChunk]() + orders_ch = Channel[TableChunk]() + nation_ch = Channel[TableChunk]() + part_x_partsupp = Channel[TableChunk]() + supplier_x_part_x_partsupp = Channel[TableChunk]() + supplier_x_part_x_partsupp_x_lineitem = Channel[TableChunk]() + supplier_x_part_x_partsupp_x_lineitem_x_orders = Channel[TableChunk]() + all_joined = Channel[TableChunk]() + groupby_input = Channel[TableChunk]() + nodes.append( + read_part(ctx, part_files, num_producers_per_read, num_rows_per_chunk, part_ch) + ) + nodes.append( + read_partsupp( + ctx, partsupp_files, num_producers_per_read, num_rows_per_chunk, partsupp_ch + ) + ) + nodes.append( + read_supplier( + ctx, supplier_files, num_producers_per_read, num_rows_per_chunk, supplier_ch + ) + ) + nodes.append( + read_lineitem( + ctx, lineitem_files, num_producers_per_read, num_rows_per_chunk, lineitem_ch + ) + ) + nodes.append( + read_orders( + ctx, orders_files, num_producers_per_read, num_rows_per_chunk, orders_ch + ) + ) + # Nation is tiny so only launch a single producer + nodes.append(read_nation(ctx, nation_files, num_rows_per_chunk, nation_ch)) + nodes.append(filter_part(ctx, part_ch, filtered_part)) + nodes.append( + broadcast_join( + ctx, filtered_part, partsupp_ch, part_x_partsupp, [0], [0], keep_keys=True + ) + ) + nodes.append( + broadcast_join( + ctx, + supplier_ch, + part_x_partsupp, + supplier_x_part_x_partsupp, + [1], + [1], + keep_keys=True, + ) + ) + nodes.append( + broadcast_join( + ctx, + supplier_x_part_x_partsupp, + lineitem_ch, + supplier_x_part_x_partsupp_x_lineitem, + [2, 1], + [3, 5], + keep_keys=False, + ) + ) + nodes.append( + broadcast_join( + ctx, + supplier_x_part_x_partsupp_x_lineitem, + orders_ch, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + [4], + [1], + keep_keys=False, + ) + ) + nodes.append( + broadcast_join( + ctx, + nation_ch, + supplier_x_part_x_partsupp_x_lineitem_x_orders, + all_joined, + [1], + [0], + keep_keys=False, + ) + ) + nodes.append(select_columns(ctx, all_joined, groupby_input)) + groupby_output = Channel[TableChunk]() + nodes.append(chunkwise_groupby_agg(ctx, groupby_input, groupby_output)) + concat_output = Channel[TableChunk]() + nodes.append(concatenate(ctx, groupby_output, concat_output)) + final_grouped = Channel[TableChunk]() + nodes.append(chunkwise_groupby_agg(ctx, concat_output, final_grouped)) + sorted = Channel[TableChunk]() + nodes.append(sort_by_and_round(ctx, final_grouped, sorted)) + nodes.append(write_parquet(ctx, sorted, Path(output))) + return nodes + + +@click.command() +@click.option( + "--num-iterations", default=2, help="Number of iterations of the query to run" +) +@click.option("--output", default="result.pq", help="Output result file") +@click.option( + "--num-rows-per-chunk", + default=50_000_000, + help="Number of rows read in a single chunk from input tables", +) +@click.option( + "--num-producers-per-read", + default=4, + help="Number of producer tasks for each parquet read", +) +@click.option( + "--num-streaming-threads", + default=8, + help="Number of threads C++ executor should use", +) +@click.option( + "--num-py-streaming-threads", + default=1, + help="Number of threads Python executor should use", +) +@click.option( + "--parquet-suffix", default="parquet", help="Suffix to append to find parquet files" +) +@click.option( + "--lineitem", + 
default="lineitem", + help="Name of file (with suffix appended) or name of directory containing lineitem files", +) +@click.option( + "--nation", + default="nation", + help="Name of file (with suffix appended) or name of directory containing nation files", +) +@click.option( + "--orders", + default="orders", + help="Name of file (with suffix appended) or name of directory containing orders files", +) +@click.option( + "--part", + default="part", + help="Name of file (with suffix appended) or name of directory containing part files", +) +@click.option( + "--partsupp", + default="partsupp", + help="Name of file (with suffix appended) or name of directory containing partsupp files", +) +@click.option( + "--supplier", + default="supplier", + help="Name of file (with suffix appended) or name of directory containing supplier files", +) +def main( + num_iterations: int, + output: str, + num_rows_per_chunk: int, + num_producers_per_read: int, + num_streaming_threads: int, + num_py_streaming_threads: int, + parquet_suffix: str, + lineitem: str, + nation: str, + orders: str, + part: str, + partsupp: str, + supplier: str, +) -> None: + py_exec = ThreadPoolExecutor(max_workers=num_py_streaming_threads) + ctx = get_streaming_context(num_streaming_threads) + for i in range(num_iterations): + start = time.perf_counter() + nodes = q( + ctx, + num_rows_per_chunk, + num_producers_per_read, + output, + parquet_suffix, + lineitem, + nation, + orders, + part, + partsupp, + supplier, + ) + end = time.perf_counter() + print(f"Iteration {i}: Pipeline construction {end - start:.4g}s") + with nvtx.annotate(message="Q9 iteration", color="blue", domain="rapidsmpf"): + start = time.perf_counter() + run_streaming_pipeline(nodes=nodes, py_executor=py_exec) + end = time.perf_counter() + print(f"Iteration {i}: Pipeline execution {end - start:.4g}s") + + +if __name__ == "__main__": + main() From ba0b07001d30d8830753e949c5abc41219436196 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Thu, 4 Dec 2025 13:57:09 -0800 Subject: [PATCH 02/11] initial attempt at q17 --- cpp/benchmarks/streaming/ndsh/CMakeLists.txt | 21 +- cpp/benchmarks/streaming/ndsh/q17.cpp | 1072 ++++++++++++++++++ 2 files changed, 1092 insertions(+), 1 deletion(-) create mode 100644 cpp/benchmarks/streaming/ndsh/q17.cpp diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index 6fa4bd27b..b45f3a11a 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -53,6 +53,25 @@ target_link_libraries( q09 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ $ maybe_asan ) + +add_executable(q17 "q17.cpp") +set_target_properties( + q17 + PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CUDA_STANDARD 20 + CUDA_STANDARD_REQUIRED ON +) +target_compile_options( + q17 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" + "$<$:${RAPIDSMPF_CUDA_FLAGS}>" +) +target_link_libraries( + q17 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ + $ maybe_asan +) + install( TARGETS rapidsmpfndsh COMPONENT benchmarking @@ -60,7 +79,7 @@ install( EXCLUDE_FROM_ALL ) install( - TARGETS q09 + TARGETS q09 q17 COMPONENT benchmarking DESTINATION bin/benchmarks/librapidsmpf EXCLUDE_FROM_ALL diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp new file mode 100644 index 000000000..807169c00 --- /dev/null +++ b/cpp/benchmarks/streaming/ndsh/q17.cpp @@ -0,0 +1,1072 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & 
AFFILIATES. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "concatenate.hpp" +#include "join.hpp" +#include "utils.hpp" + +namespace { + +std::string get_table_path( + std::string const& input_directory, std::string const& table_name +) { + auto dir = input_directory.empty() ? "." : input_directory; + auto file_path = dir + "/" + table_name + ".parquet"; + if (std::filesystem::exists(file_path)) { + return file_path; + } + return dir + "/" + table_name + "/"; +} + +rapidsmpf::streaming::Node read_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "lineitem") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"l_partkey", "l_quantity", "l_extendedprice"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node read_part( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "part") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"p_partkey", "p_brand", "p_container"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node filter_part( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + auto p_brand = table.column(1); + auto p_container = table.column(2); + + auto brand_target = cudf::make_string_scalar("Brand#23", chunk_stream, mr); + auto container_target = cudf::make_string_scalar("MED BOX", chunk_stream, mr); + + auto brand_mask = cudf::binary_operation( + p_brand, + *brand_target, + cudf::binary_operator::EQUAL, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + auto container_mask = cudf::binary_operation( + p_container, + *container_target, + cudf::binary_operator::EQUAL, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + auto combined_mask = cudf::binary_operation( + brand_mask->view(), + container_mask->view(), + cudf::binary_operator::LOGICAL_AND, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + msg.sequence_number(), + std::make_unique( + cudf::apply_boolean_mask( + 
table.select({0}), combined_mask->view(), chunk_stream, mr + ), + chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +// Node to compute sum and count of quantity per partkey +rapidsmpf::streaming::Node compute_avg_quantity( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::uint64_t sequence = 0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + // table has: p_partkey, l_quantity, l_extendedprice + // Group by p_partkey and compute sum and count of l_quantity + auto grouper = cudf::groupby::groupby( + table.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> sum_aggs; + sum_aggs.push_back(cudf::make_sum_aggregation()); + std::vector> count_aggs; + count_aggs.push_back(cudf::make_count_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs)) + ); + requests.push_back( + cudf::groupby::aggregation_request(table.column(1), std::move(count_aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + + // Output: p_partkey, sum(l_quantity), count(l_quantity) + std::vector> result; + result.push_back(std::move(keys->release()[0])); + result.push_back(std::move(results[0].results[0])); // sum + result.push_back(std::move(results[1].results[0])); // count + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +// Final aggregation after the second join and filter +rapidsmpf::streaming::Node final_aggregation( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + + // Concatenated input is expected + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("Final aggregation"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); + + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + std::unique_ptr local_result{nullptr}; + if (!table.is_empty()) { + // table has: key, avg_quantity, l_quantity, l_extendedprice + // Filter: l_quantity < avg_quantity + auto l_quantity = table.column(2); + auto avg_quantity = table.column(1); + auto filter_mask = cudf::binary_operation( + l_quantity, + avg_quantity, + cudf::binary_operator::LESS, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + ctx->br()->device_mr() + ); + auto filtered = cudf::apply_boolean_mask( + table, filter_mask->view(), chunk_stream, ctx->br()->device_mr() + ); + + // Sum l_extendedprice (don't divide yet - we need to sum across ranks first!) 
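+        // Each rank keeps only its partial sum here; the partial sums are
+        // allgathered below and rank 0 combines them before applying the
+        // query's final division by 7.0 (sum(l_extendedprice) / 7.0).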
+ if (filtered->num_rows() > 0) { + auto l_extendedprice = filtered->view().column(3); + auto sum_agg = cudf::make_sum_aggregation(); + auto sum_result = cudf::reduce( + l_extendedprice, + *sum_agg, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + // Store the sum (not yet divided) as a column + auto sum_val = static_cast&>(*sum_result) + .value(chunk_stream); + auto sum_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(sum_scalar.get()) + ->set_value(sum_val, chunk_stream); + + std::vector> result_cols; + result_cols.push_back( + cudf::make_column_from_scalar( + *sum_scalar, 1, chunk_stream, ctx->br()->device_mr() + ) + ); + local_result = std::make_unique(std::move(result_cols)); + } + } + + if (ctx->comm()->nranks() > 1) { + // Gather results from all ranks + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (local_result) { + auto pack = + cudf::pack(local_result->view(), chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + if (ctx->comm()->rank() == 0) { + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + + // Sum all the partial sums, THEN divide by 7.0 + if (global_result && global_result->num_rows() > 0) { + auto sum_agg = cudf::make_sum_aggregation(); + auto sum_result = cudf::reduce( + global_result->view().column(0), + *sum_agg, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + // Now divide by 7.0 + auto total_sum = static_cast&>(*sum_result) + .value(chunk_stream); + auto avg_yearly_val = total_sum / 7.0; + auto avg_yearly_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(avg_yearly_scalar.get()) + ->set_value(avg_yearly_val, chunk_stream); + + std::vector> result_cols; + result_cols.push_back( + cudf::make_column_from_scalar( + *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr() + ) + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result_cols)), + chunk_stream + ) + ) + ); + } + } + } else { + // Single rank: divide by 7.0 here + if (local_result) { + auto sum_val = + static_cast*>( + cudf::reduce( + local_result->view().column(0), + *cudf::make_sum_aggregation(), + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ) + .get() + ) + ->value(chunk_stream); + + auto avg_yearly_val = sum_val / 7.0; + auto avg_yearly_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(avg_yearly_scalar.get()) + ->set_value(avg_yearly_val, chunk_stream); + + std::vector> result_cols; + result_cols.push_back( + cudf::make_column_from_scalar( + *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr() + ) + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result_cols)), + chunk_stream + ) + ) + ); + } + } + co_await 
ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node round_result( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + ctx->comm()->logger().print("Round result"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto table = chunk.table_view(); + auto rounded = cudf::round( + table.column(0), + 2, + cudf::rounding_method::HALF_EVEN, + chunk.stream(), + ctx->br()->device_mr() + ); + + std::vector> result_cols; + result_cols.push_back(std::move(rounded)); + + auto result = rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result_cols)), chunk.stream() + ) + ); + co_await ch_out->send(std::move(result)); + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node write_parquet( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::string output_path +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + ctx->comm()->logger().print("write parquet"); + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto sink = cudf::io::sink_info(output_path); + auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = cudf::io::table_input_metadata(chunk.table_view()); + metadata.column_metadata[0].set_name("avg_yearly"); + builder = builder.metadata(metadata); + auto options = builder.build(); + cudf::io::write_parquet(options, chunk.stream()); + ctx->comm()->logger().print( + "Wrote chunk with ", + chunk.table_view().num_rows(), + " rows and ", + chunk.table_view().num_columns(), + " columns to ", + output_path + ); +} + +} // namespace + +struct ProgramOptions { + int num_streaming_threads{1}; + cudf::size_type num_rows_per_chunk{100'000'000}; + std::optional spill_device_limit{std::nullopt}; + bool use_shuffle_join = false; + std::string output_file; + std::string input_directory; +}; + +ProgramOptions parse_options(int argc, char** argv) { + ProgramOptions options; + + auto print_usage = [&argv]() { + std::cerr + << "Usage: " << argv[0] << " [options]\n" + << "Options:\n" + << " --num-streaming-threads Number of streaming threads (default: 1)\n" + << " --num-rows-per-chunk Number of rows per chunk (default: " + "100000000)\n" + << " --spill-device-limit Fractional spill device limit (default: " + "None)\n" + << " --use-shuffle-join Use shuffle join (default: false)\n" + << " --output-file Output file path (required)\n" + << " --input-directory Input directory path (required)\n" + << " --help Show this help message\n"; + }; + + static struct option long_options[] = { + {"num-streaming-threads", required_argument, nullptr, 1}, + {"num-rows-per-chunk", required_argument, nullptr, 2}, + {"use-shuffle-join", no_argument, nullptr, 3}, + {"output-file", required_argument, nullptr, 4}, + {"input-directory", required_argument, nullptr, 5}, + {"help", no_argument, nullptr, 6}, + {"spill-device-limit", required_argument, nullptr, 7}, + {nullptr, 0, nullptr, 0} + }; + + int opt; + int option_index = 0; + + bool saw_output_file = false; + bool saw_input_directory = false; + + while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { + switch (opt) { + case 1: + options.num_streaming_threads = std::atoi(optarg); + break; + case 2: + 
options.num_rows_per_chunk = std::atoi(optarg); + break; + case 3: + options.use_shuffle_join = true; + break; + case 4: + options.output_file = optarg; + saw_output_file = true; + break; + case 5: + options.input_directory = optarg; + saw_input_directory = true; + break; + case 6: + print_usage(); + std::exit(0); + case 7: + options.spill_device_limit = std::stod(optarg); + break; + case '?': + if (optopt == 0 && optind > 1) { + std::cerr << "Error: Unknown option '" << argv[optind - 1] << "'\n\n"; + } + print_usage(); + std::exit(1); + default: + print_usage(); + std::exit(1); + } + } + + // Check if required options were provided + if (!saw_output_file || !saw_input_directory) { + if (!saw_output_file) { + std::cerr << "Error: --output-file is required\n"; + } + if (!saw_input_directory) { + std::cerr << "Error: --input-directory is required\n"; + } + std::cerr << std::endl; + print_usage(); + std::exit(1); + } + + return options; +} + +/** + * @brief Run a derived version of TPC-H query 17. + * + * The SQL form of the query is: + * @code{.sql} + * select + * round(sum(l_extendedprice) / 7.0, 2) as avg_yearly + * from + * lineitem, + * part + * where + * p_partkey = l_partkey + * and p_brand = 'Brand#23' + * and p_container = 'MED BOX' + * and l_quantity < ( + * select + * 0.2 * avg(l_quantity) + * from + * lineitem + * where + * l_partkey = p_partkey + * ) + * @endcode{} + */ +int main(int argc, char** argv) { + cudaFree(nullptr); + rapidsmpf::mpi::init(&argc, &argv); + MPI_Comm mpi_comm; + RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); + auto cmd_options = parse_options(argc, argv); + auto limit_size = rmm::percent_of_free_device_memory( + static_cast(cmd_options.spill_device_limit.value_or(1) * 100) + ); + rmm::mr::cuda_async_memory_resource mr{}; + auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); + rmm::device_async_resource_ref mr_ref(stats_mr); + rmm::mr::set_current_device_resource(&stats_mr); + rmm::mr::set_current_device_resource_ref(mr_ref); + std::unordered_map + memory_available{}; + if (cmd_options.spill_device_limit.has_value()) { + memory_available[rapidsmpf::MemoryType::DEVICE] = rapidsmpf::LimitAvailableMemory{ + &stats_mr, static_cast(limit_size) + }; + } + auto br = std::make_shared( + stats_mr, std::move(memory_available) + ); + auto envvars = rapidsmpf::config::get_environment_variables(); + envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); + auto options = rapidsmpf::config::Options(envvars); + auto stats = std::make_shared(&stats_mr); + { + auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); + auto progress = + std::make_shared(comm->logger(), stats); + auto ctx = + std::make_shared(options, comm, br, stats); + comm->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" + ); + comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + + std::string output_path = cmd_options.output_file; + std::vector timings; + for (int i = 0; i < 2; i++) { + rapidsmpf::OpID op_id{0}; + std::vector nodes; + auto start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q17 pipeline"); + + // Read part and filter + auto part = ctx->create_channel(); + auto filtered_part = ctx->create_channel(); + nodes.push_back(read_part( + ctx, + part, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // p_partkey, p_brand, p_container + nodes.push_back(filter_part(ctx, part, filtered_part)); // p_partkey + + // Read 
lineitem + auto lineitem = ctx->create_channel(); + nodes.push_back(read_lineitem( + ctx, + lineitem, + /* num_tickets */ 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); // l_partkey, l_quantity, l_extendedprice + + // Inner join: part x lineitem on p_partkey = l_partkey + auto part_x_lineitem = ctx->create_channel(); + if (cmd_options.use_shuffle_join) { + auto filtered_part_shuffled = ctx->create_channel(); + auto lineitem_shuffled = ctx->create_channel(); + std::uint32_t num_partitions = 16; + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + filtered_part, + filtered_part_shuffled, + {0}, + num_partitions, + rapidsmpf::OpID{ + static_cast(10 * i + op_id++) + } + ) + ); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + lineitem, + lineitem_shuffled, + {0}, + num_partitions, + rapidsmpf::OpID{ + static_cast(10 * i + op_id++) + } + ) + ); + nodes.push_back( + rapidsmpf::ndsh::inner_join_shuffle( + ctx, + filtered_part_shuffled, + lineitem_shuffled, + part_x_lineitem, + {0}, + {0}, + rapidsmpf::ndsh::KeepKeys::YES + ) // p_partkey, l_quantity, l_extendedprice + ); + } else { + nodes.push_back( + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + filtered_part, + lineitem, + part_x_lineitem, + {0}, + {0}, + rapidsmpf::OpID{ + static_cast(10 * i + op_id++) + }, + rapidsmpf::ndsh::KeepKeys::YES + ) // p_partkey, l_quantity, l_extendedprice + ); + } + + // Fanout the join result for two uses + auto part_x_lineitem_for_avg = ctx->create_channel(); + auto part_x_lineitem_for_join = ctx->create_channel(); + nodes.push_back( + rapidsmpf::streaming::node::fanout( + ctx, + part_x_lineitem, + {part_x_lineitem_for_avg, part_x_lineitem_for_join}, + rapidsmpf::streaming::node::FanoutPolicy::UNBOUNDED + ) + ); + + // Compute average quantity grouped by p_partkey + auto avg_quantity_chunks = ctx->create_channel(); + nodes.push_back(compute_avg_quantity( + ctx, part_x_lineitem_for_avg, avg_quantity_chunks + )); + + // Concatenate average quantity results + auto avg_quantity_concatenated = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + avg_quantity_chunks, + avg_quantity_concatenated, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + // Final groupby for avg_quantity across all chunks + auto avg_quantity_final = ctx->create_channel(); + nodes.push_back( + []( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag + ) -> rapidsmpf::streaming::Node { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + ctx->comm()->logger().print("Final avg groupby"); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input"); + + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + std::unique_ptr local_result{nullptr}; + if (!table.is_empty()) { + // table has: p_partkey, sum(l_quantity), count(l_quantity) + // Group by p_partkey and sum the sums and counts + auto grouper = cudf::groupby::groupby( + table.select({0}), + cudf::null_policy::EXCLUDE, + cudf::sorted::NO + ); + auto requests = + std::vector(); + std::vector> + sum_aggs1; + sum_aggs1.push_back( + cudf::make_sum_aggregation() + ); + std::vector> + sum_aggs2; + sum_aggs2.push_back( + cudf::make_sum_aggregation() + ); + requests.push_back( + cudf::groupby::aggregation_request( + table.column(1), std::move(sum_aggs1) + ) + ); + 
requests.push_back( + cudf::groupby::aggregation_request( + table.column(2), std::move(sum_aggs2) + ) + ); + auto [keys, results] = grouper.aggregate( + requests, chunk_stream, ctx->br()->device_mr() + ); + + // Now compute mean = sum / count, then multiply by 0.2 + auto sum_col = results[0].results[0]->view(); + auto count_col = results[1].results[0]->view(); + + // mean = sum / count + auto mean_col = cudf::binary_operation( + sum_col, + count_col, + cudf::binary_operator::DIV, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + // avg_quantity = 0.2 * mean + auto scalar_02 = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(scalar_02.get()) + ->set_value(0.2, chunk_stream); + auto avg_quantity = cudf::binary_operation( + mean_col->view(), + *scalar_02, + cudf::binary_operator::MUL, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + // Output: p_partkey (as key), avg_quantity + auto result = keys->release(); + result.push_back(std::move(avg_quantity)); + local_result = + std::make_unique(std::move(result)); + } + + if (ctx->comm()->nranks() > 1) { + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (local_result) { + auto pack = cudf::pack( + local_result->view(), + chunk_stream, + ctx->br()->device_mr() + ); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move( + std::move(pack.gpu_data), chunk_stream + ) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = co_await gatherer.extract_all( + rapidsmpf::streaming::AllGather::Ordered::NO + ); + if (ctx->comm()->rank() == 0) { + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), + ctx->br(), + true, + ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + + auto result_view = global_result->view(); + // result_view has: p_partkey, sum, count + // Group by p_partkey and sum both the sums and counts + auto grouper = cudf::groupby::groupby( + result_view.select({0}), + cudf::null_policy::EXCLUDE, + cudf::sorted::NO + ); + auto requests = + std::vector(); + std::vector> + sum_aggs1; + sum_aggs1.push_back( + cudf::make_sum_aggregation< + cudf::groupby_aggregation>() + ); + std::vector> + sum_aggs2; + sum_aggs2.push_back( + cudf::make_sum_aggregation< + cudf::groupby_aggregation>() + ); + requests.push_back( + cudf::groupby::aggregation_request( + result_view.column(1), std::move(sum_aggs1) + ) + ); + requests.push_back( + cudf::groupby::aggregation_request( + result_view.column(2), std::move(sum_aggs2) + ) + ); + auto [keys, results] = grouper.aggregate( + requests, chunk_stream, ctx->br()->device_mr() + ); + global_result.reset(); + + // Compute mean = sum / count, then multiply by 0.2 + auto sum_col = results[0].results[0]->view(); + auto count_col = results[1].results[0]->view(); + auto mean_col = cudf::binary_operation( + sum_col, + count_col, + cudf::binary_operator::DIV, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + auto scalar_02 = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>( + scalar_02.get() + ) + ->set_value(0.2, chunk_stream); + auto avg_quantity = cudf::binary_operation( + mean_col->view(), + *scalar_02, + cudf::binary_operator::MUL, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + 
ctx->br()->device_mr() + ); + + auto result = keys->release(); + result.push_back(std::move(avg_quantity)); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique< + rapidsmpf::streaming::TableChunk>( + std::make_unique( + std::move(result) + ), + chunk_stream + ) + ) + ); + } + } else { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(local_result), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); + }(ctx, + avg_quantity_concatenated, + avg_quantity_final, + rapidsmpf::OpID{static_cast(10 * i + op_id++)}) + ); + + // Join part_x_lineitem with avg_quantity on p_partkey = key + auto final_join = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + avg_quantity_final, + part_x_lineitem_for_join, + final_join, + {0}, // key from avg_quantity + {0}, // p_partkey from part_x_lineitem + rapidsmpf::OpID{static_cast(10 * i + op_id++)}, + rapidsmpf::ndsh::KeepKeys::YES + ) // key, avg_quantity, l_quantity, l_extendedprice + ); + + // Concatenate for final aggregation + auto final_join_concatenated = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + final_join, + final_join_concatenated, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + // Final aggregation (filter, sum, divide by 7) + auto aggregated = ctx->create_channel(); + nodes.push_back(final_aggregation( + ctx, + final_join_concatenated, + aggregated, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); + + // Round result + auto rounded = ctx->create_channel(); + nodes.push_back(round_result(ctx, aggregated, rounded)); + + // Write output + nodes.push_back(write_parquet(ctx, rounded, output_path)); + } + auto end = std::chrono::steady_clock::now(); + std::chrono::duration pipeline = end - start; + start = std::chrono::steady_clock::now(); + { + RAPIDSMPF_NVTX_SCOPED_RANGE("Q17 Iteration"); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + } + end = std::chrono::steady_clock::now(); + std::chrono::duration compute = end - start; + comm->logger().print( + "Iteration ", i, " pipeline construction time [s]: ", pipeline.count() + ); + comm->logger().print("Iteration ", i, " compute time [s]: ", compute.count()); + timings.push_back(pipeline.count()); + timings.push_back(compute.count()); + ctx->comm()->logger().print(stats->report()); + RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); + } + if (comm->rank() == 0) { + for (int i = 0; i < 2; i++) { + comm->logger().print( + "Iteration ", + i, + " pipeline construction time [s]: ", + timings[size_t(2 * i)] + ); + comm->logger().print( + "Iteration ", i, " compute time [s]: ", timings[size_t(2 * i + 1)] + ); + } + } + } + + RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); + RAPIDSMPF_MPI(MPI_Finalize()); + return 0; +} From 789574d1e91a88cd0e798bf33e2a714154b2f8f8 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Thu, 4 Dec 2025 18:35:34 -0800 Subject: [PATCH 03/11] a few more cleanups --- cpp/benchmarks/streaming/ndsh/q17.cpp | 252 ++++++++++++++++++-------- 1 file changed, 179 insertions(+), 73 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp index 807169c00..603d824d0 100644 --- a/cpp/benchmarks/streaming/ndsh/q17.cpp +++ b/cpp/benchmarks/streaming/ndsh/q17.cpp @@ -239,68 +239,78 @@ rapidsmpf::streaming::Node final_aggregation( rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; co_await ctx->executor()->schedule(); - // Concatenated input is expected - auto 
msg = co_await ch_in->receive(); - auto next = co_await ch_in->receive(); ctx->comm()->logger().print("Final aggregation"); - RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); - auto chunk = - rapidsmpf::ndsh::to_device(ctx, msg.release()); - auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); + // Process chunks incrementally to avoid OOM + double local_sum = 0.0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } - std::unique_ptr local_result{nullptr}; - if (!table.is_empty()) { - // table has: key, avg_quantity, l_quantity, l_extendedprice - // Filter: l_quantity < avg_quantity - auto l_quantity = table.column(2); - auto avg_quantity = table.column(1); - auto filter_mask = cudf::binary_operation( - l_quantity, - avg_quantity, - cudf::binary_operator::LESS, - cudf::data_type(cudf::type_id::BOOL8), - chunk_stream, - ctx->br()->device_mr() - ); - auto filtered = cudf::apply_boolean_mask( - table, filter_mask->view(), chunk_stream, ctx->br()->device_mr() + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); - // Sum l_extendedprice (don't divide yet - we need to sum across ranks first!) - if (filtered->num_rows() > 0) { - auto l_extendedprice = filtered->view().column(3); - auto sum_agg = cudf::make_sum_aggregation(); - auto sum_result = cudf::reduce( - l_extendedprice, - *sum_agg, - cudf::data_type(cudf::type_id::FLOAT64), + if (!table.is_empty() && table.num_columns() >= 4) { + // table has: key, avg_quantity, l_quantity, l_extendedprice + // Filter: l_quantity < avg_quantity + auto l_quantity = table.column(2); + auto avg_quantity = table.column(1); + auto filter_mask = cudf::binary_operation( + l_quantity, + avg_quantity, + cudf::binary_operator::LESS, + cudf::data_type(cudf::type_id::BOOL8), chunk_stream, ctx->br()->device_mr() ); - - // Store the sum (not yet divided) as a column - auto sum_val = static_cast&>(*sum_result) - .value(chunk_stream); - auto sum_scalar = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() + auto filtered = cudf::apply_boolean_mask( + table, filter_mask->view(), chunk_stream, ctx->br()->device_mr() ); - static_cast*>(sum_scalar.get()) - ->set_value(sum_val, chunk_stream); - std::vector> result_cols; - result_cols.push_back( - cudf::make_column_from_scalar( - *sum_scalar, 1, chunk_stream, ctx->br()->device_mr() - ) - ); - local_result = std::make_unique(std::move(result_cols)); + // Sum l_extendedprice for this chunk + if (filtered->num_rows() > 0) { + auto l_extendedprice = filtered->view().column(3); + auto sum_agg = cudf::make_sum_aggregation(); + auto sum_result = cudf::reduce( + l_extendedprice, + *sum_agg, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + // Accumulate the sum + auto chunk_sum = static_cast&>(*sum_result) + .value(chunk_stream); + local_sum += chunk_sum; + } } } + // Create result table with local sum + std::unique_ptr local_result{nullptr}; + auto chunk_stream = rmm::cuda_stream_view{}; + if (local_sum != 0.0) { + auto sum_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, ctx->br()->device_mr() + ); + static_cast*>(sum_scalar.get()) + ->set_value(local_sum, chunk_stream); + + std::vector> result_cols; + result_cols.push_back( + cudf::make_column_from_scalar( + *sum_scalar, 1, chunk_stream, ctx->br()->device_mr() + ) + ); 
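+        // The single-row column built above carries this rank's partial sum of
+        // l_extendedprice; it is what gets packed and allgathered further down
+        // so that rank 0 can combine the per-rank sums.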
+ local_result = std::make_unique(std::move(result_cols)); + } + if (ctx->comm()->nranks() > 1) { // Gather results from all ranks rapidsmpf::streaming::AllGather gatherer{ctx, tag}; @@ -358,6 +368,30 @@ rapidsmpf::streaming::Node final_aggregation( ) ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result_cols)), + chunk_stream + ) + ) + ); + } else { + // No data after filtering - send empty result with 0.0 + auto zero_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(zero_scalar.get()) + ->set_value(0.0, chunk_stream); + std::vector> result_cols; + result_cols.push_back( + cudf::make_column_from_scalar( + *zero_scalar, 1, chunk_stream, ctx->br()->device_mr() + ) + ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, @@ -369,6 +403,7 @@ rapidsmpf::streaming::Node final_aggregation( ); } } + // Non-zero ranks don't send anything (following q09 pattern) } else { // Single rank: divide by 7.0 here if (local_result) { @@ -401,6 +436,30 @@ rapidsmpf::streaming::Node final_aggregation( ) ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result_cols)), + chunk_stream + ) + ) + ); + } else { + // No data after filtering - send result with 0.0 + auto zero_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(zero_scalar.get()) + ->set_value(0.0, chunk_stream); + std::vector> result_cols; + result_cols.push_back( + cudf::make_column_from_scalar( + *zero_scalar, 1, chunk_stream, ctx->br()->device_mr() + ) + ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, @@ -646,7 +705,7 @@ int main(int argc, char** argv) { std::string output_path = cmd_options.output_file; std::vector timings; - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 1; i++) { rapidsmpf::OpID op_id{0}; std::vector nodes; auto start = std::chrono::steady_clock::now(); @@ -973,16 +1032,71 @@ int main(int argc, char** argv) { ) ) ); + } else { + // Non-zero ranks: send empty table with correct schema + // Schema: key (INT64), avg_quantity (FLOAT64) + std::vector> empty_cols; + empty_cols.push_back( + cudf::make_empty_column( + cudf::data_type(cudf::type_id::INT64) + ) + ); + empty_cols.push_back( + cudf::make_empty_column( + cudf::data_type(cudf::type_id::FLOAT64) + ) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique< + rapidsmpf::streaming::TableChunk>( + std::make_unique( + std::move(empty_cols) + ), + chunk_stream + ) + ) + ); } } else { - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::move(local_result), chunk_stream + if (local_result) { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique< + rapidsmpf::streaming::TableChunk>( + std::move(local_result), chunk_stream + ) ) - ) - ); + ); + } else { + // Single rank with no data: send empty table with correct + // schema + std::vector> empty_cols; + empty_cols.push_back( + cudf::make_empty_column( + cudf::data_type(cudf::type_id::INT64) + ) + ); + empty_cols.push_back( + cudf::make_empty_column( + cudf::data_type(cudf::type_id::FLOAT64) + ) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique< + rapidsmpf::streaming::TableChunk>( + std::make_unique( + std::move(empty_cols) + ), + chunk_stream + ) + ) + 
);
+                }
            }
            co_await ch_out->drain(ctx->executor());
        }(ctx,
@@ -992,12 +1106,14 @@ int main(int argc, char** argv) {
            );
 
            // Join part_x_lineitem with avg_quantity on p_partkey = key
+            // avg_quantity_final is small (~199K rows), so broadcast it
+            // part_x_lineitem is large, so keep it distributed
            auto final_join = ctx->create_channel();
            nodes.push_back(
                rapidsmpf::ndsh::inner_join_broadcast(
                    ctx,
-                    avg_quantity_final,
-                    part_x_lineitem_for_join,
+                    avg_quantity_final,  // Small table - broadcast this
+                    part_x_lineitem_for_join,  // Large table - probe with this
                    final_join,
                    {0},  // key from avg_quantity
                    {0},  // p_partkey from part_x_lineitem
@@ -1006,22 +1122,12 @@ int main(int argc, char** argv) {
                )  // key, avg_quantity, l_quantity, l_extendedprice
            );
-
-            // Concatenate for final aggregation
-            auto final_join_concatenated = ctx->create_channel();
-            nodes.push_back(
-                rapidsmpf::ndsh::concatenate(
-                    ctx,
-                    final_join,
-                    final_join_concatenated,
-                    rapidsmpf::ndsh::ConcatOrder::DONT_CARE
-                )
-            );
-
-            // Final aggregation (filter, sum, divide by 7)
+            // Final aggregation (filter, sum, divide by 7) - processes chunks
+            // incrementally
            auto aggregated = ctx->create_channel();
            nodes.push_back(final_aggregation(
                ctx,
-                final_join_concatenated,
+                final_join,
                aggregated,
                rapidsmpf::OpID{static_cast(10 * i + op_id++)}
            ));
@@ -1052,7 +1158,7 @@ int main(int argc, char** argv) {
        RAPIDSMPF_MPI(MPI_Barrier(mpi_comm));
    }
    if (comm->rank() == 0) {
-        for (int i = 0; i < 2; i++) {
+        for (int i = 0; i < 1; i++) {
            comm->logger().print(
                "Iteration ",
                i,

From 9188c69f229b710a07dbb8eae404f66c9c1f2162 Mon Sep 17 00:00:00 2001
From: Benjamin Zaitlen
Date: Fri, 5 Dec 2025 20:04:18 -0800
Subject: [PATCH 04/11] fix avg calc

---
 cpp/benchmarks/streaming/ndsh/q17.cpp | 48 +++++----------------------
 1 file changed, 8 insertions(+), 40 deletions(-)

diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp
index 603d824d0..60ecdaf94 100644
--- a/cpp/benchmarks/streaming/ndsh/q17.cpp
+++ b/cpp/benchmarks/streaming/ndsh/q17.cpp
@@ -239,8 +239,6 @@ rapidsmpf::streaming::Node final_aggregation(
     rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out};
     co_await ctx->executor()->schedule();
 
-    ctx->comm()->logger().print("Final aggregation");
-
     // Process chunks incrementally to avoid OOM
     double local_sum = 0.0;
     while (true) {
@@ -485,7 +483,6 @@ rapidsmpf::streaming::Node round_result(
     if (msg.empty()) {
         co_return;
     }
-    ctx->comm()->logger().print("Round result");
 
     auto chunk = rapidsmpf::ndsh::to_device(ctx, msg.release());
     auto table = chunk.table_view();
@@ -521,7 +518,6 @@ rapidsmpf::streaming::Node write_parquet(
     if (msg.empty()) {
         co_return;
     }
-    ctx->comm()->logger().print("write parquet");
 
     auto chunk = rapidsmpf::ndsh::to_device(ctx, msg.release());
     auto sink = cudf::io::sink_info(output_path);
@@ -705,7 +701,7 @@ int main(int argc, char** argv) {
     std::string output_path = cmd_options.output_file;
 
     std::vector timings;
-    for (int i = 0; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
         rapidsmpf::OpID op_id{0};
         std::vector nodes;
         auto start = std::chrono::steady_clock::now();
@@ -834,7 +830,6 @@ int main(int argc, char** argv) {
             co_await ctx->executor()->schedule();
             auto msg = co_await ch_in->receive();
             auto next = co_await ch_in->receive();
-            ctx->comm()->logger().print("Final avg groupby");
             RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input");
 
             auto chunk = rapidsmpf::ndsh::to_device(
@@ -878,40 +873,12 @@ int main(int argc, char** argv) {
                 requests, chunk_stream, ctx->br()->device_mr()
             );
 
-            // Now compute mean = sum / count, then multiply by 0.2
-            auto sum_col = results[0].results[0]->view();
-            auto count_col = results[1].results[0]->view();
-
-            // mean = sum / count
-            auto mean_col = cudf::binary_operation(
-                sum_col,
-                count_col,
-                cudf::binary_operator::DIV,
-                cudf::data_type(cudf::type_id::FLOAT64),
-                chunk_stream,
-                ctx->br()->device_mr()
-            );
-
-            // avg_quantity = 0.2 * mean
-            auto scalar_02 = cudf::make_numeric_scalar(
-                cudf::data_type(cudf::type_id::FLOAT64),
-                chunk_stream,
-                ctx->br()->device_mr()
-            );
-            static_cast*>(scalar_02.get())
-                ->set_value(0.2, chunk_stream);
-            auto avg_quantity = cudf::binary_operation(
-                mean_col->view(),
-                *scalar_02,
-                cudf::binary_operator::MUL,
-                cudf::data_type(cudf::type_id::FLOAT64),
-                chunk_stream,
-                ctx->br()->device_mr()
-            );
-
-            // Output: p_partkey (as key), avg_quantity
+            // Output: p_partkey (as key), sum(l_quantity),
+            // count(l_quantity) Don't compute avg here - do it after
+            // global aggregation
             auto result = keys->release();
-            result.push_back(std::move(avg_quantity));
+            result.push_back(std::move(results[0].results[0]));  // sum
+            result.push_back(std::move(results[1].results[0]));  // count
             local_result = std::make_unique(std::move(result));
         }
@@ -1158,7 +1125,7 @@ int main(int argc, char** argv) {
         RAPIDSMPF_MPI(MPI_Barrier(mpi_comm));
     }
     if (comm->rank() == 0) {
-        for (int i = 0; i < 1; i++) {
+        for (int i = 0; i < 2; i++) {
             comm->logger().print(
                 "Iteration ",
                 i,
@@ -1172,6 +1139,7 @@ int main(int argc, char** argv) {
         }
     }
 
+    RAPIDSMPF_MPI(MPI_Barrier(mpi_comm));
     RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm));
     RAPIDSMPF_MPI(MPI_Finalize());
     return 0;

From 0e895999906a8a86606a3d531dcc1e9cf734d3af Mon Sep 17 00:00:00 2001
From: Benjamin Zaitlen
Date: Sat, 6 Dec 2025 11:11:20 -0800
Subject: [PATCH 05/11] remove q9 python implementations

---
 .../examples/streaming/ndsh/__init__.py      |   4 -
 .../rapidsmpf/examples/streaming/ndsh/q09.py | 654 ------------------
 2 files changed, 658 deletions(-)
 delete mode 100644 python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py
 delete mode 100644 python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py

diff --git a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py
deleted file mode 100644
index a32042717..000000000
--- a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
-# SPDX-License-Identifier: Apache-2.0
-
-"""Streaming examples of TPC-H derived queries."""

diff --git a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py
deleted file mode 100644
index c5dac7538..000000000
--- a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py
+++ /dev/null
@@ -1,654 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
-# SPDX-License-Identifier: Apache-2.0 -# ruff: noqa: D103 - -"""Example by hand implementation of a derivation of TPC-H Q9.""" - -from __future__ import annotations - -import itertools -import time -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from typing import TYPE_CHECKING - -import click -import nvtx - -import pylibcudf as plc -import rmm -from pylibcudf.experimental._join_streams import join_streams - -from rapidsmpf.buffer.resource import BufferResource -from rapidsmpf.communicator.single import new_communicator as single_process_comm -from rapidsmpf.config import Options, get_environment_variables -from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor -from rapidsmpf.streaming.core.channel import Channel -from rapidsmpf.streaming.core.context import Context -from rapidsmpf.streaming.core.message import Message -from rapidsmpf.streaming.core.node import define_py_node, run_streaming_pipeline -from rapidsmpf.streaming.cudf.parquet import read_parquet -from rapidsmpf.streaming.cudf.table_chunk import TableChunk - -if TYPE_CHECKING: - from collections.abc import Sequence - - from rapidsmpf.streaming.core.node import CppNode, PyNode - - -def get_streaming_context(num_streaming_threads: int) -> Context: - env = get_environment_variables() - env["num_streaming_threads"] = str(num_streaming_threads) - options = Options(env) - comm = single_process_comm(options) - # TODO: multi-GPU, memory limiter, spilling. Need to expose TableChunk::make_available. - mr = RmmResourceAdaptor(rmm.mr.CudaAsyncMemoryResource()) - # TODO: explicit memory resources. Need to expose device_mr to python. - br = BufferResource(mr) - # Note: this must be done even if we use the br's memory resource - # everywhere so that cudf uses this MR for internal allocations. 
- rmm.mr.set_current_device_resource(mr) - return Context(comm, br, options) - - -def reader_options( - files: Sequence[str], columns: list[str] -) -> plc.io.parquet.ParquetReaderOptions: - source = plc.io.SourceInfo(files) - options = plc.io.parquet.ParquetReaderOptions.builder(source).build() - options.set_columns(columns) - return options - - -def read_lineitem( - ctx: Context, - files: Sequence[str], - num_producers: int, - num_rows_per_chunk: int, - ch: Channel[TableChunk], -) -> CppNode | PyNode: - columns = [ - "l_discount", # 0 - "l_extendedprice", # 1 - "l_orderkey", # 2 - "l_partkey", # 3 - "l_quantity", # 4 - "l_suppkey", - ] - return read_parquet( - ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk - ) - - -def read_nation( - ctx: Context, - files: Sequence[str], - num_rows_per_chunk: int, - ch: Channel[TableChunk], -) -> CppNode | PyNode: - columns = [ - "n_name", # 0 - "n_nationkey", # 1 - ] - return read_parquet(ctx, ch, 1, reader_options(files, columns), num_rows_per_chunk) - - -def read_orders( - ctx: Context, - files: Sequence[str], - num_producers: int, - num_rows_per_chunk: int, - ch: Channel[TableChunk], -) -> CppNode | PyNode: - columns = [ - "o_orderdate", # 0 - "o_orderkey", # 1 - ] - return read_parquet( - ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk - ) - - -def read_part( - ctx: Context, - files: Sequence[str], - num_producers: int, - num_rows_per_chunk: int, - ch: Channel[TableChunk], -) -> CppNode | PyNode: - columns = [ - "p_partkey", # 0 - "p_name", # 1 - ] - return read_parquet( - ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk - ) - - -def read_partsupp( - ctx: Context, - files: Sequence[str], - num_producers: int, - num_rows_per_chunk: int, - ch: Channel[TableChunk], -) -> CppNode | PyNode: - columns = [ - "ps_partkey", # 0 - "ps_suppkey", # 1 - "ps_supplycost", # 2 - ] - return read_parquet( - ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk - ) - - -def read_supplier( - ctx: Context, - files: Sequence[str], - num_producers: int, - num_rows_per_chunk: int, - ch: Channel[TableChunk], -) -> CppNode | PyNode: - columns = [ - "s_nationkey", # 0 - "s_suppkey", # 1 - ] - return read_parquet( - ctx, ch, num_producers, reader_options(files, columns), num_rows_per_chunk - ) - - -@define_py_node() -async def filter_part( - ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] -) -> None: - while (msg := await ch_in.recv(ctx)) is not None: - chunk = TableChunk.from_message(msg) - stream = chunk.stream - table = chunk.table_view() - target = plc.Scalar.from_py("green", stream=stream) - chunk = TableChunk.from_pylibcudf_table( - plc.stream_compaction.apply_boolean_mask( - plc.Table(table.columns()[:1]), - plc.strings.find.contains(table.columns()[1], target, stream), - stream, - ), - stream, - exclusive_view=True, - ) - await ch_out.send(ctx, Message(msg.sequence_number, chunk)) - await ch_out.drain(ctx) - - -@define_py_node() -async def select_columns( - ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] -) -> None: - while (msg := await ch_in.recv(ctx)) is not None: - chunk = TableChunk.from_message(msg) - stream = chunk.stream - columns = chunk.table_view().columns() - name, supplycost, discount, extendedprice, quantity, orderdate = columns - revenue_type = supplycost.type() # float64 - orderdate = plc.datetime.extract_datetime_component( - orderdate, plc.datetime.DatetimeComponent.YEAR, stream - ) - revenue = 
plc.transform.transform( - [discount, extendedprice, supplycost, quantity], - """ - static __device__ void calculate_amount( - double *amount, double discount, double extprice, double supplycost, double quantity - ) { - *amount = extprice * (1 - discount) - supplycost * quantity; - } - """, - revenue_type, - False, # noqa: FBT003 - plc.types.NullAware.NO, - stream, - ) - await ch_out.send( - ctx, - Message( - msg.sequence_number, - TableChunk.from_pylibcudf_table( - plc.Table([name, orderdate, revenue]), - stream, - exclusive_view=True, - ), - ), - ) - await ch_out.drain(ctx) - - -@define_py_node() -async def broadcast_join( - ctx: Context, - left_ch: Channel[TableChunk], - right_ch: Channel[TableChunk], - ch_out: Channel[TableChunk], - left_on: Sequence[int], - right_on: Sequence[int], - *, - keep_keys: bool, -) -> None: - left_tables: list[TableChunk] = [] - chunk_streams = set() - while (msg := await left_ch.recv(ctx)) is not None: - left_tables.append(TableChunk.from_message(msg)) - chunk_streams.add(left_tables[-1].stream) - build_stream = ctx.get_stream_from_pool() - join_streams(list(chunk_streams), build_stream) - if len(left_tables) == 1: - left = left_tables[0].table_view().columns() - else: - left = plc.concatenate.concatenate( - [t.table_view() for t in left_tables], build_stream - ).columns() - left_keys = plc.Table([left[i] for i in left_on]) - if keep_keys: - left_carrier = plc.Table(left) - else: - left_carrier = plc.Table([c for i, c in enumerate(left) if i not in left_on]) - for s in chunk_streams: - join_streams([build_stream], s) - del left_tables - sequence_number = 0 - chunk_streams.clear() - while (msg := await right_ch.recv(ctx)) is not None: - chunk = TableChunk.from_message(msg) - chunk_streams.add(chunk.stream) - join_streams([build_stream], chunk.stream) - # Safe to access left_carrier on chunk.stream - right_columns = chunk.table_view().columns() - right_keys = plc.Table([right_columns[i] for i in right_on]) - right_carrier = plc.Table( - [c for i, c in enumerate(right_columns) if i not in right_on] - ) - left, right = plc.join.inner_join( - left_keys, right_keys, plc.types.NullEquality.UNEQUAL, chunk.stream - ) - left = plc.copying.gather( - left_carrier, - left, - plc.copying.OutOfBoundsPolicy.DONT_CHECK, - chunk.stream, - ) - right = plc.copying.gather( - right_carrier, - right, - plc.copying.OutOfBoundsPolicy.DONT_CHECK, - chunk.stream, - ) - await ch_out.send( - ctx, - Message( - sequence_number, - TableChunk.from_pylibcudf_table( - plc.Table([*left.columns(), *right.columns()]), - chunk.stream, - exclusive_view=True, - ), - ), - ) - sequence_number += 1 - # Ensure left_carrier and keys are deallocated after table chunks are produced - for s in chunk_streams: - join_streams([build_stream], s) - await ch_out.drain(ctx) - - -@define_py_node() -async def chunkwise_groupby_agg( - ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] -) -> None: - sequence = 0 - while (msg := await ch_in.recv(ctx)) is not None: - chunk = TableChunk.from_message(msg) - name, date, revenue = chunk.table_view().columns() - stream = chunk.stream - grouper = plc.groupby.GroupBy( - plc.Table([name, date]), - plc.types.NullPolicy.EXCLUDE, - plc.types.Sorted.NO, - ) - reqs = [plc.groupby.GroupByRequest(revenue, [plc.aggregation.sum()])] - (keys, results) = grouper.aggregate(reqs, stream) - del chunk, name, date, revenue - await ch_out.send( - ctx, - Message( - sequence, - TableChunk.from_pylibcudf_table( - plc.Table( - [ - *keys.columns(), - 
*itertools.chain.from_iterable( - r.columns() for r in results - ), - ] - ), - stream, - exclusive_view=True, - ), - ), - ) - sequence += 1 - await ch_out.drain(ctx) - - -@define_py_node() -async def concatenate( - ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] -) -> None: - chunks = [] - build_stream = ctx.get_stream_from_pool() - chunk_streams = set() - while (msg := await ch_in.recv(ctx)) is not None: - chunk = TableChunk.from_message(msg) - chunks.append(chunk) - chunk_streams.add(chunk.stream) - join_streams(list(chunk_streams), build_stream) - table = plc.concatenate.concatenate( - [chunk.table_view() for chunk in chunks], build_stream - ) - for s in chunk_streams: - join_streams([build_stream], s) - await ch_out.send( - ctx, - Message( - 0, TableChunk.from_pylibcudf_table(table, build_stream, exclusive_view=True) - ), - ) - await ch_out.drain(ctx) - - -@define_py_node() -async def sort_by_and_round( - ctx: Context, ch_in: Channel[TableChunk], ch_out: Channel[TableChunk] -) -> None: - msg = await ch_in.recv(ctx) - if msg is None: - raise RuntimeError("Expecting a chunk in sort by") - if await ch_in.recv(ctx) is not None: - raise RuntimeError("Only expecting a single chunk") - chunk = TableChunk.from_message(msg) - name, date, revenue = chunk.table_view().columns() - stream = chunk.stream - revenue = plc.round.round(revenue, 2, plc.round.RoundingMethod.HALF_EVEN, stream) - await ch_out.send( - ctx, - Message( - 0, - TableChunk.from_pylibcudf_table( - plc.sorting.sort_by_key( - plc.Table([name, date, revenue]), - plc.Table([name, date]), - [plc.types.Order.ASCENDING, plc.types.Order.DESCENDING], - [plc.types.NullOrder.BEFORE, plc.types.NullOrder.BEFORE], - stream, - ), - stream, - exclusive_view=True, - ), - ), - ) - await ch_out.drain(ctx) - - -@define_py_node() -async def write_parquet( - ctx: Context, ch_in: Channel[TableChunk], filename: Path -) -> None: - msg = await ch_in.recv(ctx) - if msg is None: - raise RuntimeError("Expecting a chunk in write_parquet") - if await ch_in.recv(ctx) is not None: - raise RuntimeError("Only expecting a single chunk in write_parquet") - chunk = TableChunk.from_message(msg) - sink = plc.io.SinkInfo([filename]) - builder = plc.io.parquet.ParquetWriterOptions.builder(sink, chunk.table_view()) - metadata = plc.io.types.TableInputMetadata(chunk.table_view()) - metadata.column_metadata[0].set_name("nation") - metadata.column_metadata[1].set_name("o_year") - metadata.column_metadata[2].set_name("sum_profit") - options = builder.metadata(metadata).build() - plc.io.parquet.write_parquet(options, chunk.stream) - - -def get_files(base: str, suffix: str) -> list[str]: - path = Path(base) - if path.is_dir(): - files = sorted(path.glob(f"*.{suffix}")) - if len(files) == 0: - raise RuntimeError(f"No parquet files found in {path}") - return [str(f) for f in files] - else: - path = path.with_suffix(f".{suffix}") - if not path.exists(): - raise RuntimeError(f"File {path} does not exist") - return [str(path)] - - -def q( - ctx: Context, - num_rows_per_chunk: int, - num_producers_per_read: int, - output: str, - parquet_suffix: str, - lineitem: str, - nation: str, - orders: str, - part: str, - partsupp: str, - supplier: str, -) -> list[CppNode | PyNode]: - lineitem_files = get_files(lineitem, parquet_suffix) - part_files = get_files(part, parquet_suffix) - partsupp_files = get_files(partsupp, parquet_suffix) - supplier_files = get_files(supplier, parquet_suffix) - orders_files = get_files(orders, parquet_suffix) - nation_files = get_files(nation, 
parquet_suffix) - nodes: list[CppNode | PyNode] = [] - lineitem_ch = Channel[TableChunk]() - part_ch = Channel[TableChunk]() - filtered_part = Channel[TableChunk]() - partsupp_ch = Channel[TableChunk]() - supplier_ch = Channel[TableChunk]() - orders_ch = Channel[TableChunk]() - nation_ch = Channel[TableChunk]() - part_x_partsupp = Channel[TableChunk]() - supplier_x_part_x_partsupp = Channel[TableChunk]() - supplier_x_part_x_partsupp_x_lineitem = Channel[TableChunk]() - supplier_x_part_x_partsupp_x_lineitem_x_orders = Channel[TableChunk]() - all_joined = Channel[TableChunk]() - groupby_input = Channel[TableChunk]() - nodes.append( - read_part(ctx, part_files, num_producers_per_read, num_rows_per_chunk, part_ch) - ) - nodes.append( - read_partsupp( - ctx, partsupp_files, num_producers_per_read, num_rows_per_chunk, partsupp_ch - ) - ) - nodes.append( - read_supplier( - ctx, supplier_files, num_producers_per_read, num_rows_per_chunk, supplier_ch - ) - ) - nodes.append( - read_lineitem( - ctx, lineitem_files, num_producers_per_read, num_rows_per_chunk, lineitem_ch - ) - ) - nodes.append( - read_orders( - ctx, orders_files, num_producers_per_read, num_rows_per_chunk, orders_ch - ) - ) - # Nation is tiny so only launch a single producer - nodes.append(read_nation(ctx, nation_files, num_rows_per_chunk, nation_ch)) - nodes.append(filter_part(ctx, part_ch, filtered_part)) - nodes.append( - broadcast_join( - ctx, filtered_part, partsupp_ch, part_x_partsupp, [0], [0], keep_keys=True - ) - ) - nodes.append( - broadcast_join( - ctx, - supplier_ch, - part_x_partsupp, - supplier_x_part_x_partsupp, - [1], - [1], - keep_keys=True, - ) - ) - nodes.append( - broadcast_join( - ctx, - supplier_x_part_x_partsupp, - lineitem_ch, - supplier_x_part_x_partsupp_x_lineitem, - [2, 1], - [3, 5], - keep_keys=False, - ) - ) - nodes.append( - broadcast_join( - ctx, - supplier_x_part_x_partsupp_x_lineitem, - orders_ch, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - [4], - [1], - keep_keys=False, - ) - ) - nodes.append( - broadcast_join( - ctx, - nation_ch, - supplier_x_part_x_partsupp_x_lineitem_x_orders, - all_joined, - [1], - [0], - keep_keys=False, - ) - ) - nodes.append(select_columns(ctx, all_joined, groupby_input)) - groupby_output = Channel[TableChunk]() - nodes.append(chunkwise_groupby_agg(ctx, groupby_input, groupby_output)) - concat_output = Channel[TableChunk]() - nodes.append(concatenate(ctx, groupby_output, concat_output)) - final_grouped = Channel[TableChunk]() - nodes.append(chunkwise_groupby_agg(ctx, concat_output, final_grouped)) - sorted = Channel[TableChunk]() - nodes.append(sort_by_and_round(ctx, final_grouped, sorted)) - nodes.append(write_parquet(ctx, sorted, Path(output))) - return nodes - - -@click.command() -@click.option( - "--num-iterations", default=2, help="Number of iterations of the query to run" -) -@click.option("--output", default="result.pq", help="Output result file") -@click.option( - "--num-rows-per-chunk", - default=50_000_000, - help="Number of rows read in a single chunk from input tables", -) -@click.option( - "--num-producers-per-read", - default=4, - help="Number of producer tasks for each parquet read", -) -@click.option( - "--num-streaming-threads", - default=8, - help="Number of threads C++ executor should use", -) -@click.option( - "--num-py-streaming-threads", - default=1, - help="Number of threads Python executor should use", -) -@click.option( - "--parquet-suffix", default="parquet", help="Suffix to append to find parquet files" -) -@click.option( - "--lineitem", - 
default="lineitem", - help="Name of file (with suffix appended) or name of directory containing lineitem files", -) -@click.option( - "--nation", - default="nation", - help="Name of file (with suffix appended) or name of directory containing nation files", -) -@click.option( - "--orders", - default="orders", - help="Name of file (with suffix appended) or name of directory containing orders files", -) -@click.option( - "--part", - default="part", - help="Name of file (with suffix appended) or name of directory containing part files", -) -@click.option( - "--partsupp", - default="partsupp", - help="Name of file (with suffix appended) or name of directory containing partsupp files", -) -@click.option( - "--supplier", - default="supplier", - help="Name of file (with suffix appended) or name of directory containing supplier files", -) -def main( - num_iterations: int, - output: str, - num_rows_per_chunk: int, - num_producers_per_read: int, - num_streaming_threads: int, - num_py_streaming_threads: int, - parquet_suffix: str, - lineitem: str, - nation: str, - orders: str, - part: str, - partsupp: str, - supplier: str, -) -> None: - py_exec = ThreadPoolExecutor(max_workers=num_py_streaming_threads) - ctx = get_streaming_context(num_streaming_threads) - for i in range(num_iterations): - start = time.perf_counter() - nodes = q( - ctx, - num_rows_per_chunk, - num_producers_per_read, - output, - parquet_suffix, - lineitem, - nation, - orders, - part, - partsupp, - supplier, - ) - end = time.perf_counter() - print(f"Iteration {i}: Pipeline construction {end - start:.4g}s") - with nvtx.annotate(message="Q9 iteration", color="blue", domain="rapidsmpf"): - start = time.perf_counter() - run_streaming_pipeline(nodes=nodes, py_executor=py_exec) - end = time.perf_counter() - print(f"Iteration {i}: Pipeline execution {end - start:.4g}s") - - -if __name__ == "__main__": - main() From 62cff0e4f5168502d132cb818eb1e0be8bbab6b0 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 9 Dec 2025 16:19:16 -0800 Subject: [PATCH 06/11] cmake --- cpp/benchmarks/streaming/ndsh/CMakeLists.txt | 55 +++++++------------- 1 file changed, 19 insertions(+), 36 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index b45f3a11a..0ef5e80a9 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -36,42 +36,25 @@ target_link_libraries( $ maybe_asan ) -add_executable(q09 "q09.cpp") -set_target_properties( - q09 - PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" - CXX_STANDARD 20 - CXX_STANDARD_REQUIRED ON - CUDA_STANDARD 20 - CUDA_STANDARD_REQUIRED ON -) -target_compile_options( - q09 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" - "$<$:${RAPIDSMPF_CUDA_FLAGS}>" -) -target_link_libraries( - q09 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ - $ maybe_asan -) - -add_executable(q17 "q17.cpp") -set_target_properties( - q17 - PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" - CXX_STANDARD 20 - CXX_STANDARD_REQUIRED ON - CUDA_STANDARD 20 - CUDA_STANDARD_REQUIRED ON -) -target_compile_options( - q17 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" - "$<$:${RAPIDSMPF_CUDA_FLAGS}>" -) -target_link_libraries( - q17 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ - $ maybe_asan -) - +foreach(query IN ITEMS q09 q17) + add_executable(${query} "${query}.cpp") + set_target_properties( + ${query} + PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CUDA_STANDARD 20 + CUDA_STANDARD_REQUIRED ON + ) + target_compile_options( + ${query} 
PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" + "$<$:${RAPIDSMPF_CUDA_FLAGS}>" + ) + target_link_libraries( + ${query} PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ + $ maybe_asan + ) +endforeach() install( TARGETS rapidsmpfndsh COMPONENT benchmarking From ed723dced0837d024f93eb3443c373cff18f11b9 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 9 Dec 2025 17:56:33 -0800 Subject: [PATCH 07/11] Push part filter into read_parquet --- cpp/benchmarks/streaming/ndsh/q17.cpp | 145 +++++++++++++++++--------- 1 file changed, 97 insertions(+), 48 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp index 60ecdaf94..dc2deea42 100644 --- a/cpp/benchmarks/streaming/ndsh/q17.cpp +++ b/cpp/benchmarks/streaming/ndsh/q17.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -103,18 +104,85 @@ rapidsmpf::streaming::Node read_part( auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) .columns({"p_partkey", "p_brand", "p_container"}) .build(); + + // Build the filter expression: p_brand = 'Brand#23' AND p_container = 'MED BOX' + auto owner = new std::vector; + auto filter_stream = ctx->br()->stream_pool().get_stream(); + + // 0: column_reference for p_brand + owner->push_back(std::make_shared(1)); + + // 1: column_reference for p_container + owner->push_back(std::make_shared(2)); + + // 2, 3: string_scalars + owner->push_back( + std::make_shared("Brand#23", true, filter_stream) + ); + owner->push_back( + std::make_shared("MED BOX", true, filter_stream) + ); + + // 4, 5: literals + owner->push_back( + std::make_shared( + *std::any_cast>(owner->at(2)) + ) + ); + owner->push_back( + std::make_shared( + *std::any_cast>(owner->at(3)) + ) + ); + + // 6: operation (EQUAL, p_brand, "Brand#23") + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::EQUAL, + *std::any_cast>(owner->at(0)), + *std::any_cast>(owner->at(4)) + ) + ); + + // 7: operation (EQUAL, p_container, "MED BOX") + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::EQUAL, + *std::any_cast>(owner->at(1)), + *std::any_cast>(owner->at(5)) + ) + ); + + // 8: operation (LOGICAL_AND, brand_eq, container_eq) + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::LOGICAL_AND, + *std::any_cast>(owner->at(6)), + *std::any_cast>(owner->at(7)) + ) + ); + + auto filter = std::make_unique( + filter_stream, + *std::any_cast>(owner->back()), + rapidsmpf::OwningWrapper(static_cast(owner), [](void* p) { + delete static_cast*>(p); + }) + ); + return rapidsmpf::streaming::node::read_parquet( - ctx, ch_out, num_producers, options, num_rows_per_chunk + ctx, ch_out, num_producers, options, num_rows_per_chunk, std::move(filter) ); } -rapidsmpf::streaming::Node filter_part( +// Select specific columns from the input table +rapidsmpf::streaming::Node select_columns( std::shared_ptr ctx, std::shared_ptr ch_in, - std::shared_ptr ch_out + std::shared_ptr ch_out, + std::vector indices ) { rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - auto mr = ctx->br()->device_mr(); while (true) { auto msg = co_await ch_in->receive(); if (msg.empty()) { @@ -125,46 +193,25 @@ rapidsmpf::streaming::Node filter_part( ctx, msg.release() ); auto chunk_stream = chunk.stream(); + auto sequence_number = msg.sequence_number(); auto table = chunk.table_view(); - auto p_brand = table.column(1); - auto p_container = table.column(2); - - auto brand_target = cudf::make_string_scalar("Brand#23", chunk_stream, mr); - auto container_target = 
cudf::make_string_scalar("MED BOX", chunk_stream, mr); - - auto brand_mask = cudf::binary_operation( - p_brand, - *brand_target, - cudf::binary_operator::EQUAL, - cudf::data_type(cudf::type_id::BOOL8), - chunk_stream, - mr - ); - auto container_mask = cudf::binary_operation( - p_container, - *container_target, - cudf::binary_operator::EQUAL, - cudf::data_type(cudf::type_id::BOOL8), - chunk_stream, - mr - ); - auto combined_mask = cudf::binary_operation( - brand_mask->view(), - container_mask->view(), - cudf::binary_operator::LOGICAL_AND, - cudf::data_type(cudf::type_id::BOOL8), - chunk_stream, - mr - ); + std::vector> result; + result.reserve(indices.size()); + for (auto idx : indices) { + result.push_back( + std::make_unique( + table.column(idx), chunk_stream, ctx->br()->device_mr() + ) + ); + } + + auto result_table = std::make_unique(std::move(result)); co_await ch_out->send( rapidsmpf::streaming::to_message( - msg.sequence_number(), + sequence_number, std::make_unique( - cudf::apply_boolean_mask( - table.select({0}), combined_mask->view(), chunk_stream, mr - ), - chunk_stream + std::move(result_table), chunk_stream ) ) ); @@ -708,17 +755,19 @@ int main(int argc, char** argv) { { RAPIDSMPF_NVTX_SCOPED_RANGE("Constructing Q17 pipeline"); - // Read part and filter + // Read part with filter pushed down, then project to just p_partkey auto part = ctx->create_channel(); - auto filtered_part = ctx->create_channel(); + auto projected_part = ctx->create_channel(); nodes.push_back(read_part( ctx, part, /* num_tickets */ 4, cmd_options.num_rows_per_chunk, cmd_options.input_directory - )); // p_partkey, p_brand, p_container - nodes.push_back(filter_part(ctx, part, filtered_part)); // p_partkey + )); // p_partkey, p_brand, p_container (filtered) + nodes.push_back( + select_columns(ctx, part, projected_part, {0}) + ); // p_partkey // Read lineitem auto lineitem = ctx->create_channel(); @@ -733,14 +782,14 @@ int main(int argc, char** argv) { // Inner join: part x lineitem on p_partkey = l_partkey auto part_x_lineitem = ctx->create_channel(); if (cmd_options.use_shuffle_join) { - auto filtered_part_shuffled = ctx->create_channel(); + auto projected_part_shuffled = ctx->create_channel(); auto lineitem_shuffled = ctx->create_channel(); std::uint32_t num_partitions = 16; nodes.push_back( rapidsmpf::ndsh::shuffle( ctx, - filtered_part, - filtered_part_shuffled, + projected_part, + projected_part_shuffled, {0}, num_partitions, rapidsmpf::OpID{ @@ -763,7 +812,7 @@ int main(int argc, char** argv) { nodes.push_back( rapidsmpf::ndsh::inner_join_shuffle( ctx, - filtered_part_shuffled, + projected_part_shuffled, lineitem_shuffled, part_x_lineitem, {0}, @@ -775,7 +824,7 @@ int main(int argc, char** argv) { nodes.push_back( rapidsmpf::ndsh::inner_join_broadcast( ctx, - filtered_part, + projected_part, lineitem, part_x_lineitem, {0}, From 587780c14bf373f3613d2a4a41064a4935387c29 Mon Sep 17 00:00:00 2001 From: niranda perera Date: Wed, 10 Dec 2025 16:19:08 -0800 Subject: [PATCH 08/11] addressing my comments Signed-off-by: niranda perera --- cpp/benchmarks/streaming/ndsh/q17.cpp | 607 ++++++++++---------------- 1 file changed, 237 insertions(+), 370 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp index dc2deea42..60c1ea520 100644 --- a/cpp/benchmarks/streaming/ndsh/q17.cpp +++ b/cpp/benchmarks/streaming/ndsh/q17.cpp @@ -245,24 +245,20 @@ rapidsmpf::streaming::Node compute_avg_quantity( table.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO ); auto 
requests = std::vector(); - std::vector> sum_aggs; - sum_aggs.push_back(cudf::make_sum_aggregation()); - std::vector> count_aggs; - count_aggs.push_back(cudf::make_count_aggregation()); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + aggs.push_back(cudf::make_count_aggregation()); requests.push_back( - cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs)) - ); - requests.push_back( - cudf::groupby::aggregation_request(table.column(1), std::move(count_aggs)) + cudf::groupby::aggregation_request(table.column(1), std::move(aggs)) ); auto [keys, results] = grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); // Output: p_partkey, sum(l_quantity), count(l_quantity) - std::vector> result; - result.push_back(std::move(keys->release()[0])); - result.push_back(std::move(results[0].results[0])); // sum - result.push_back(std::move(results[1].results[0])); // count + auto result = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result)); + } co_await ch_out->send( rapidsmpf::streaming::to_message( @@ -276,6 +272,16 @@ rapidsmpf::streaming::Node compute_avg_quantity( co_await ch_out->drain(ctx->executor()); } +template +std::unique_ptr column_from_value( + const T& value, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr +) { + rmm::device_uvector vec(1, stream, mr); + vec.set_element_async(0, value, stream); + + return std::make_unique(std::move(vec), {}, 0); +} + // Final aggregation after the second join and filter rapidsmpf::streaming::Node final_aggregation( std::shared_ptr ctx, @@ -338,25 +344,16 @@ rapidsmpf::streaming::Node final_aggregation( } // Create result table with local sum - std::unique_ptr local_result{nullptr}; auto chunk_stream = rmm::cuda_stream_view{}; - if (local_sum != 0.0) { - auto sum_scalar = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, ctx->br()->device_mr() - ); - static_cast*>(sum_scalar.get()) - ->set_value(local_sum, chunk_stream); - - std::vector> result_cols; - result_cols.push_back( - cudf::make_column_from_scalar( - *sum_scalar, 1, chunk_stream, ctx->br()->device_mr() - ) - ); - local_result = std::make_unique(std::move(result_cols)); - } if (ctx->comm()->nranks() > 1) { + std::unique_ptr local_result{nullptr}; + if (local_sum > 0.0) { + local_result = std::make_unique( + column_from_value(local_sum, chunk_stream, ctx->br()->device_mr()) + ); + } + // Gather results from all ranks rapidsmpf::streaming::AllGather gatherer{ctx, tag}; if (local_result) { @@ -398,50 +395,27 @@ rapidsmpf::streaming::Node final_aggregation( auto total_sum = static_cast&>(*sum_result) .value(chunk_stream); auto avg_yearly_val = total_sum / 7.0; - auto avg_yearly_scalar = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ); - static_cast*>(avg_yearly_scalar.get()) - ->set_value(avg_yearly_val, chunk_stream); - - std::vector> result_cols; - result_cols.push_back( - cudf::make_column_from_scalar( - *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr() - ) - ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, std::make_unique( - std::make_unique(std::move(result_cols)), + std::make_unique(column_from_value( + avg_yearly_val, chunk_stream, ctx->br()->device_mr() + )), chunk_stream ) ) ); } else { // No data after filtering - send empty result with 0.0 - auto zero_scalar = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - 
ctx->br()->device_mr() - ); - static_cast*>(zero_scalar.get()) - ->set_value(0.0, chunk_stream); - std::vector> result_cols; - result_cols.push_back( - cudf::make_column_from_scalar( - *zero_scalar, 1, chunk_stream, ctx->br()->device_mr() - ) - ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, std::make_unique( - std::make_unique(std::move(result_cols)), + std::make_unique(column_from_value( + 0.0, chunk_stream, ctx->br()->device_mr() + )), chunk_stream ) ) @@ -451,70 +425,20 @@ rapidsmpf::streaming::Node final_aggregation( // Non-zero ranks don't send anything (following q09 pattern) } else { // Single rank: divide by 7.0 here - if (local_result) { - auto sum_val = - static_cast*>( - cudf::reduce( - local_result->view().column(0), - *cudf::make_sum_aggregation(), - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ) - .get() - ) - ->value(chunk_stream); - - auto avg_yearly_val = sum_val / 7.0; - auto avg_yearly_scalar = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ); - static_cast*>(avg_yearly_scalar.get()) - ->set_value(avg_yearly_val, chunk_stream); - std::vector> result_cols; - result_cols.push_back( - cudf::make_column_from_scalar( - *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr() - ) - ); + auto avg_yearly_val = local_sum / 7.0; - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::make_unique(std::move(result_cols)), - chunk_stream - ) - ) - ); - } else { - // No data after filtering - send result with 0.0 - auto zero_scalar = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ); - static_cast*>(zero_scalar.get()) - ->set_value(0.0, chunk_stream); - std::vector> result_cols; - result_cols.push_back( - cudf::make_column_from_scalar( - *zero_scalar, 1, chunk_stream, ctx->br()->device_mr() - ) - ); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::make_unique(std::move(result_cols)), - chunk_stream - ) + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(column_from_value( + avg_yearly_val, chunk_stream, ctx->br()->device_mr() + )), + chunk_stream ) - ); - } + ) + ); } co_await ch_out->drain(ctx->executor()); } @@ -554,6 +478,195 @@ rapidsmpf::streaming::Node round_result( co_await ch_out->drain(ctx->executor()); } +rapidsmpf::streaming::Node groupby_avg_quantity( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + co_await ctx->executor()->schedule(); + auto msg = co_await ch_in->receive(); + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input"); + + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + std::unique_ptr local_result{nullptr}; + if (!table.is_empty()) { + // table has: p_partkey, sum(l_quantity), count(l_quantity) + // Group by p_partkey and sum the sums and counts + auto grouper = cudf::groupby::groupby( + table.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> sum_aggs1; + sum_aggs1.push_back(cudf::make_sum_aggregation()); + std::vector> sum_aggs2; + sum_aggs2.push_back(cudf::make_sum_aggregation()); + requests.push_back( + 
cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs1)) + ); + requests.push_back( + cudf::groupby::aggregation_request(table.column(2), std::move(sum_aggs2)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + + // Output: p_partkey (as key), sum(l_quantity), + // count(l_quantity) Don't compute avg here - do it after + // global aggregation + auto result = keys->release(); + result.push_back(std::move(results[0].results[0])); // sum + result.push_back(std::move(results[1].results[0])); // count + local_result = std::make_unique(std::move(result)); + } + + if (ctx->comm()->nranks() > 1) { + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + if (local_result) { + auto pack = + cudf::pack(local_result->view(), chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + } + gatherer.insert_finished(); + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + if (ctx->comm()->rank() == 0) { + auto global_result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + + auto result_view = global_result->view(); + // result_view has: p_partkey, sum, count + // Group by p_partkey and sum both the sums and counts + auto grouper = cudf::groupby::groupby( + result_view.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + auto requests = std::vector(); + std::vector> sum_aggs1; + sum_aggs1.push_back(cudf::make_sum_aggregation()); + std::vector> sum_aggs2; + sum_aggs2.push_back(cudf::make_sum_aggregation()); + requests.push_back( + cudf::groupby::aggregation_request( + result_view.column(1), std::move(sum_aggs1) + ) + ); + requests.push_back( + cudf::groupby::aggregation_request( + result_view.column(2), std::move(sum_aggs2) + ) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + global_result.reset(); + + // Compute mean = sum / count, then multiply by 0.2 + auto sum_col = results[0].results[0]->view(); + auto count_col = results[1].results[0]->view(); + auto mean_col = cudf::binary_operation( + sum_col, + count_col, + cudf::binary_operator::DIV, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + auto scalar_02 = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + static_cast*>(scalar_02.get()) + ->set_value(0.2, chunk_stream); + auto avg_quantity = cudf::binary_operation( + mean_col->view(), + *scalar_02, + cudf::binary_operator::MUL, + cudf::data_type(cudf::type_id::FLOAT64), + chunk_stream, + ctx->br()->device_mr() + ); + + auto result = keys->release(); + result.push_back(std::move(avg_quantity)); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(result)), chunk_stream + ) + ) + ); + } else { + // Non-zero ranks: send empty table with correct schema + // Schema: key (INT64), avg_quantity (FLOAT64) + std::vector> empty_cols; + empty_cols.push_back( + cudf::make_empty_column(cudf::data_type(cudf::type_id::INT64)) + ); + empty_cols.push_back( + cudf::make_empty_column(cudf::data_type(cudf::type_id::FLOAT64)) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + 
std::make_unique(std::move(empty_cols)), chunk_stream + ) + ) + ); + } + } else { + if (local_result) { + // TODO: should we calculate the avg here????? + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(local_result), chunk_stream + ) + ) + ); + } else { + // Single rank with no data: send empty table with correct + // schema + std::vector> empty_cols; + empty_cols.push_back( + cudf::make_empty_column(cudf::data_type(cudf::type_id::INT64)) + ); + empty_cols.push_back( + cudf::make_empty_column(cudf::data_type(cudf::type_id::FLOAT64)) + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique(std::move(empty_cols)), chunk_stream + ) + ) + ); + } + } + co_await ch_out->drain(ctx->executor()); +} + rapidsmpf::streaming::Node write_parquet( std::shared_ptr ctx, std::shared_ptr ch_in, @@ -868,258 +981,12 @@ int main(int argc, char** argv) { // Final groupby for avg_quantity across all chunks auto avg_quantity_final = ctx->create_channel(); - nodes.push_back( - []( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out, - rapidsmpf::OpID tag - ) -> rapidsmpf::streaming::Node { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - co_await ctx->executor()->schedule(); - auto msg = co_await ch_in->receive(); - auto next = co_await ch_in->receive(); - RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input"); - - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); - - std::unique_ptr local_result{nullptr}; - if (!table.is_empty()) { - // table has: p_partkey, sum(l_quantity), count(l_quantity) - // Group by p_partkey and sum the sums and counts - auto grouper = cudf::groupby::groupby( - table.select({0}), - cudf::null_policy::EXCLUDE, - cudf::sorted::NO - ); - auto requests = - std::vector(); - std::vector> - sum_aggs1; - sum_aggs1.push_back( - cudf::make_sum_aggregation() - ); - std::vector> - sum_aggs2; - sum_aggs2.push_back( - cudf::make_sum_aggregation() - ); - requests.push_back( - cudf::groupby::aggregation_request( - table.column(1), std::move(sum_aggs1) - ) - ); - requests.push_back( - cudf::groupby::aggregation_request( - table.column(2), std::move(sum_aggs2) - ) - ); - auto [keys, results] = grouper.aggregate( - requests, chunk_stream, ctx->br()->device_mr() - ); - - // Output: p_partkey (as key), sum(l_quantity), - // count(l_quantity) Don't compute avg here - do it after - // global aggregation - auto result = keys->release(); - result.push_back(std::move(results[0].results[0])); // sum - result.push_back(std::move(results[1].results[0])); // count - local_result = - std::make_unique(std::move(result)); - } - - if (ctx->comm()->nranks() > 1) { - rapidsmpf::streaming::AllGather gatherer{ctx, tag}; - if (local_result) { - auto pack = cudf::pack( - local_result->view(), - chunk_stream, - ctx->br()->device_mr() - ); - gatherer.insert( - 0, - {rapidsmpf::PackedData( - std::move(pack.metadata), - ctx->br()->move( - std::move(pack.gpu_data), chunk_stream - ) - )} - ); - } - gatherer.insert_finished(); - auto packed_data = co_await gatherer.extract_all( - rapidsmpf::streaming::AllGather::Ordered::NO - ); - if (ctx->comm()->rank() == 0) { - auto global_result = rapidsmpf::unpack_and_concat( - rapidsmpf::unspill_partitions( - std::move(packed_data), - ctx->br(), - true, - ctx->statistics() - ), - chunk_stream, - ctx->br(), - ctx->statistics() - ); - - auto result_view = 
global_result->view(); - // result_view has: p_partkey, sum, count - // Group by p_partkey and sum both the sums and counts - auto grouper = cudf::groupby::groupby( - result_view.select({0}), - cudf::null_policy::EXCLUDE, - cudf::sorted::NO - ); - auto requests = - std::vector(); - std::vector> - sum_aggs1; - sum_aggs1.push_back( - cudf::make_sum_aggregation< - cudf::groupby_aggregation>() - ); - std::vector> - sum_aggs2; - sum_aggs2.push_back( - cudf::make_sum_aggregation< - cudf::groupby_aggregation>() - ); - requests.push_back( - cudf::groupby::aggregation_request( - result_view.column(1), std::move(sum_aggs1) - ) - ); - requests.push_back( - cudf::groupby::aggregation_request( - result_view.column(2), std::move(sum_aggs2) - ) - ); - auto [keys, results] = grouper.aggregate( - requests, chunk_stream, ctx->br()->device_mr() - ); - global_result.reset(); - - // Compute mean = sum / count, then multiply by 0.2 - auto sum_col = results[0].results[0]->view(); - auto count_col = results[1].results[0]->view(); - auto mean_col = cudf::binary_operation( - sum_col, - count_col, - cudf::binary_operator::DIV, - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ); - - auto scalar_02 = cudf::make_numeric_scalar( - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ); - static_cast*>( - scalar_02.get() - ) - ->set_value(0.2, chunk_stream); - auto avg_quantity = cudf::binary_operation( - mean_col->view(), - *scalar_02, - cudf::binary_operator::MUL, - cudf::data_type(cudf::type_id::FLOAT64), - chunk_stream, - ctx->br()->device_mr() - ); - - auto result = keys->release(); - result.push_back(std::move(avg_quantity)); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique< - rapidsmpf::streaming::TableChunk>( - std::make_unique( - std::move(result) - ), - chunk_stream - ) - ) - ); - } else { - // Non-zero ranks: send empty table with correct schema - // Schema: key (INT64), avg_quantity (FLOAT64) - std::vector> empty_cols; - empty_cols.push_back( - cudf::make_empty_column( - cudf::data_type(cudf::type_id::INT64) - ) - ); - empty_cols.push_back( - cudf::make_empty_column( - cudf::data_type(cudf::type_id::FLOAT64) - ) - ); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique< - rapidsmpf::streaming::TableChunk>( - std::make_unique( - std::move(empty_cols) - ), - chunk_stream - ) - ) - ); - } - } else { - if (local_result) { - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique< - rapidsmpf::streaming::TableChunk>( - std::move(local_result), chunk_stream - ) - ) - ); - } else { - // Single rank with no data: send empty table with correct - // schema - std::vector> empty_cols; - empty_cols.push_back( - cudf::make_empty_column( - cudf::data_type(cudf::type_id::INT64) - ) - ); - empty_cols.push_back( - cudf::make_empty_column( - cudf::data_type(cudf::type_id::FLOAT64) - ) - ); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique< - rapidsmpf::streaming::TableChunk>( - std::make_unique( - std::move(empty_cols) - ), - chunk_stream - ) - ) - ); - } - } - co_await ch_out->drain(ctx->executor()); - }(ctx, - avg_quantity_concatenated, - avg_quantity_final, - rapidsmpf::OpID{static_cast(10 * i + op_id++)}) - ); + nodes.push_back(groupby_avg_quantity( + ctx, + avg_quantity_concatenated, + avg_quantity_final, + rapidsmpf::OpID{static_cast(10 * i + op_id++)} + )); // Join part_x_lineitem with avg_quantity on p_partkey = key // 
avg_quantity_final is small (~199K rows), so broadcast it From 0c5511713180c241d61d369737bf33f0fa66f576 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 11 Dec 2025 08:12:43 -0800 Subject: [PATCH 09/11] Use column name references in AST expressions Avoid including the filter columns in the output table. --- cpp/benchmarks/streaming/ndsh/q17.cpp | 72 +++++---------------------- 1 file changed, 13 insertions(+), 59 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp index 60c1ea520..f8b0fa0fc 100644 --- a/cpp/benchmarks/streaming/ndsh/q17.cpp +++ b/cpp/benchmarks/streaming/ndsh/q17.cpp @@ -102,18 +102,16 @@ rapidsmpf::streaming::Node read_part( get_table_path(input_directory, "part") ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) - .columns({"p_partkey", "p_brand", "p_container"}) + .columns({"p_partkey"}) .build(); // Build the filter expression: p_brand = 'Brand#23' AND p_container = 'MED BOX' auto owner = new std::vector; auto filter_stream = ctx->br()->stream_pool().get_stream(); - // 0: column_reference for p_brand - owner->push_back(std::make_shared(1)); - - // 1: column_reference for p_container - owner->push_back(std::make_shared(2)); + // 0, 1: column references + owner->push_back(std::make_shared("p_brand")); + owner->push_back(std::make_shared("p_container")); // 2, 3: string_scalars owner->push_back( @@ -139,7 +137,9 @@ rapidsmpf::streaming::Node read_part( owner->push_back( std::make_shared( cudf::ast::ast_operator::EQUAL, - *std::any_cast>(owner->at(0)), + *std::any_cast>( + owner->at(0) + ), *std::any_cast>(owner->at(4)) ) ); @@ -148,7 +148,9 @@ rapidsmpf::streaming::Node read_part( owner->push_back( std::make_shared( cudf::ast::ast_operator::EQUAL, - *std::any_cast>(owner->at(1)), + *std::any_cast>( + owner->at(1) + ), *std::any_cast>(owner->at(5)) ) ); @@ -175,50 +177,6 @@ rapidsmpf::streaming::Node read_part( ); } -// Select specific columns from the input table -rapidsmpf::streaming::Node select_columns( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out, - std::vector indices -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - while (true) { - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - break; - } - co_await ctx->executor()->schedule(); - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - auto chunk_stream = chunk.stream(); - auto sequence_number = msg.sequence_number(); - auto table = chunk.table_view(); - - std::vector> result; - result.reserve(indices.size()); - for (auto idx : indices) { - result.push_back( - std::make_unique( - table.column(idx), chunk_stream, ctx->br()->device_mr() - ) - ); - } - - auto result_table = std::make_unique(std::move(result)); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - sequence_number, - std::make_unique( - std::move(result_table), chunk_stream - ) - ) - ); - } - co_await ch_out->drain(ctx->executor()); -} - // Node to compute sum and count of quantity per partkey rapidsmpf::streaming::Node compute_avg_quantity( std::shared_ptr ctx, @@ -870,17 +828,13 @@ int main(int argc, char** argv) { // Read part with filter pushed down, then project to just p_partkey auto part = ctx->create_channel(); - auto projected_part = ctx->create_channel(); nodes.push_back(read_part( ctx, part, /* num_tickets */ 4, cmd_options.num_rows_per_chunk, cmd_options.input_directory - )); // p_partkey, p_brand, p_container (filtered) - nodes.push_back( - select_columns(ctx, part, 
projected_part, {0}) - ); // p_partkey + )); // p_partkey (filtered) // Read lineitem auto lineitem = ctx->create_channel(); @@ -901,7 +855,7 @@ int main(int argc, char** argv) { nodes.push_back( rapidsmpf::ndsh::shuffle( ctx, - projected_part, + part, projected_part_shuffled, {0}, num_partitions, @@ -937,7 +891,7 @@ int main(int argc, char** argv) { nodes.push_back( rapidsmpf::ndsh::inner_join_broadcast( ctx, - projected_part, + part, lineitem, part_x_lineitem, {0}, From f513b47024c942ca17ca2937b2ae8f5deae64c50 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 11 Dec 2025 08:20:38 -0800 Subject: [PATCH 10/11] compile fixes --- cpp/benchmarks/streaming/ndsh/q17.cpp | 35 ++++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp index f8b0fa0fc..ade21c2f8 100644 --- a/cpp/benchmarks/streaming/ndsh/q17.cpp +++ b/cpp/benchmarks/streaming/ndsh/q17.cpp @@ -237,7 +237,9 @@ std::unique_ptr column_from_value( rmm::device_uvector vec(1, stream, mr); vec.set_element_async(0, value, stream); - return std::make_unique(std::move(vec), {}, 0); + return std::make_unique( + cudf::data_type{cudf::type_to_id()}, 1, vec.release(), rmm::device_buffer{}, 0 + ); } // Final aggregation after the second join and filter @@ -307,9 +309,11 @@ rapidsmpf::streaming::Node final_aggregation( if (ctx->comm()->nranks() > 1) { std::unique_ptr local_result{nullptr}; if (local_sum > 0.0) { - local_result = std::make_unique( + std::vector> columns; + columns.push_back( column_from_value(local_sum, chunk_stream, ctx->br()->device_mr()) ); + local_result = std::make_unique(std::move(columns)); } // Gather results from all ranks @@ -354,27 +358,29 @@ rapidsmpf::streaming::Node final_aggregation( .value(chunk_stream); auto avg_yearly_val = total_sum / 7.0; + std::vector> cols1; + cols1.push_back(column_from_value( + avg_yearly_val, chunk_stream, ctx->br()->device_mr() + )); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, std::make_unique( - std::make_unique(column_from_value( - avg_yearly_val, chunk_stream, ctx->br()->device_mr() - )), - chunk_stream + std::make_unique(std::move(cols1)), chunk_stream ) ) ); } else { // No data after filtering - send empty result with 0.0 + std::vector> cols2; + cols2.push_back( + column_from_value(0.0, chunk_stream, ctx->br()->device_mr()) + ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, std::make_unique( - std::make_unique(column_from_value( - 0.0, chunk_stream, ctx->br()->device_mr() - )), - chunk_stream + std::make_unique(std::move(cols2)), chunk_stream ) ) ); @@ -386,14 +392,15 @@ rapidsmpf::streaming::Node final_aggregation( auto avg_yearly_val = local_sum / 7.0; + std::vector> cols3; + cols3.push_back( + column_from_value(avg_yearly_val, chunk_stream, ctx->br()->device_mr()) + ); co_await ch_out->send( rapidsmpf::streaming::to_message( 0, std::make_unique( - std::make_unique(column_from_value( - avg_yearly_val, chunk_stream, ctx->br()->device_mr() - )), - chunk_stream + std::make_unique(std::move(cols3)), chunk_stream ) ) ); From f67178fb0ded1393a928eb0777927d32dcf83f7f Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Fri, 12 Dec 2025 06:15:52 -0800 Subject: [PATCH 11/11] fix calculation for single rank execution --- cpp/benchmarks/streaming/ndsh/q17.cpp | 47 +++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q17.cpp b/cpp/benchmarks/streaming/ndsh/q17.cpp index 
ade21c2f8..e504063ca 100644
--- a/cpp/benchmarks/streaming/ndsh/q17.cpp
+++ b/cpp/benchmarks/streaming/ndsh/q17.cpp
@@ -600,18 +600,59 @@ rapidsmpf::streaming::Node groupby_avg_quantity(
         }
     } else {
         if (local_result) {
-            // TODO: should we calculate the avg here?????
+            // Single-rank: need to compute avg = 0.2 * (sum / count) just like multi-rank
+            // local_result has: p_partkey, sum(l_quantity), count(l_quantity)
+            auto result_view = local_result->view();
+            auto sum_col = result_view.column(1);
+            auto count_col = result_view.column(2);
+
+            // Compute mean = sum / count
+            auto mean_col = cudf::binary_operation(
+                sum_col,
+                count_col,
+                cudf::binary_operator::DIV,
+                cudf::data_type(cudf::type_id::FLOAT64),
+                chunk_stream,
+                ctx->br()->device_mr()
+            );
+
+            // Multiply by 0.2
+            auto scalar_02 = cudf::make_numeric_scalar(
+                cudf::data_type(cudf::type_id::FLOAT64),
+                chunk_stream,
+                ctx->br()->device_mr()
+            );
+            static_cast*>(scalar_02.get())
+                ->set_value(0.2, chunk_stream);
+            auto avg_quantity = cudf::binary_operation(
+                mean_col->view(),
+                *scalar_02,
+                cudf::binary_operator::MUL,
+                cudf::data_type(cudf::type_id::FLOAT64),
+                chunk_stream,
+                ctx->br()->device_mr()
+            );
+
+            // Output: p_partkey (as key), avg_quantity
+            std::vector> result;
+            result.push_back(
+                std::make_unique(
+                    result_view.column(0), chunk_stream, ctx->br()->device_mr()
+                )
+            );
+            result.push_back(std::move(avg_quantity));
+
             co_await ch_out->send(
                 rapidsmpf::streaming::to_message(
                     0,
                     std::make_unique(
-                        std::move(local_result), chunk_stream
+                        std::make_unique(std::move(result)), chunk_stream
                     )
                 )
             );
         } else {
             // Single rank with no data: send empty table with correct
-            // schema
+            // schema (key, avg_quantity)
             std::vector> empty_cols;
             empty_cols.push_back(
                 cudf::make_empty_column(cudf::data_type(cudf::type_id::INT64))