Changes from all commits (85 commits)
8671c48
TPCH-derived Q3
wence- Nov 14, 2025
c7736d3
Q1
wence- Nov 17, 2025
b4ef3fe
Parallel grouping
wence- Nov 17, 2025
2e1fdf2
Dup the user's communicator when creating our MPI comm wrapper
wence- Dec 4, 2025
32dcff7
Context creation and options parsing into utils
wence- Dec 4, 2025
e24cfc3
Use refactored context/argparse in q03
wence- Dec 4, 2025
50a050a
And in q1
wence- Dec 4, 2025
f55ddef
Q9
wence- Dec 4, 2025
25e60de
Docstring for main
wence- Dec 4, 2025
195e71a
Docstring
wence- Dec 4, 2025
012e9fb
Make broadcast public
wence- Dec 5, 2025
83e73b1
Whack a load of stuff in
wence- Dec 5, 2025
ed8890a
Fix some bugs
wence- Dec 5, 2025
cec0ef0
Use utils in q3 too
wence- Dec 5, 2025
a7c17da
TODO
wence- Dec 5, 2025
96a4884
WIP: bloom filter
wence- Dec 8, 2025
9558a0a
Bloom filter updates
wence- Dec 10, 2025
d9a6046
Shuffle join option and bloom filter in q3
wence- Dec 10, 2025
2fd1255
More stuff
wence- Dec 10, 2025
fafcce9
Bloom filter ranges
wence- Dec 10, 2025
1dce429
Timing info?
wence- Dec 10, 2025
04e09f5
More timings
wence- Dec 10, 2025
f8e483c
Now?
wence- Dec 10, 2025
c0cd406
Print time in logging output
wence- Dec 10, 2025
f74833c
Avoid an alloc
wence- Dec 10, 2025
c2a0bc3
More
wence- Dec 10, 2025
5d66787
Propagate exceptions from parquet chunk read failures
wence- Dec 10, 2025
b42ca4e
Try merging bloom filters on device
wence- Dec 10, 2025
cf3d990
Remove debug
wence- Dec 11, 2025
eacf67a
Done
wence- Dec 11, 2025
aa6d9cc
Thread safety in parquet write
wence- Dec 11, 2025
0e7f3fd
Fixes
wence- Dec 11, 2025
7a14854
Adapt to upstream changes
wence- Dec 11, 2025
1df954a
Fix timestamp types
wence- Dec 12, 2025
48d8fd0
cmake format
wence- Dec 12, 2025
89ef19c
Loop in cmake
wence- Dec 12, 2025
39550ab
event_loop range only in verbose mode
wence- Dec 12, 2025
78fdb14
Merge remote-tracking branch 'upstream/main' into wence/fea/q03
TomAugspurger Dec 12, 2025
6c96b17
Avoid RAPIDSMPF_FUNC_RANGE macro in .cu file
TomAugspurger Dec 12, 2025
74f36a8
Remove GNUism in RAPIDSMPF_NVTX_FUNC_RANGE
wence- Dec 15, 2025
d07605a
Merge remote-tracking branch 'upstream/main' into wence/fea/q03
wence- Dec 15, 2025
4d5a6aa
Finalize MPI with RAII
wence- Dec 15, 2025
483765b
cmake-format
wence- Dec 15, 2025
09c0cc2
Fixes
wence- Dec 15, 2025
b81e2c2
More fixes
wence- Dec 15, 2025
c415379
Add docstring in cmake
wence- Dec 15, 2025
cc7e5e0
WIP: Streaming Q4 implementation
TomAugspurger Nov 26, 2025
632b797
Merge branch 'wence/fea/q03' into tom/streaming-q4
TomAugspurger Dec 15, 2025
ff504f0
fixup! Merge branch 'wence/fea/q03' into tom/streaming-q4
TomAugspurger Dec 15, 2025
07de58c
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Dec 16, 2025
2030807
Use groupby utilities
TomAugspurger Dec 16, 2025
16be63b
fuse final groupby agg
TomAugspurger Dec 16, 2025
f51ef59
revert fuse
TomAugspurger Dec 16, 2025
a3a0413
Add binary
TomAugspurger Dec 16, 2025
b733c9f
fixup
TomAugspurger Dec 16, 2025
7a2780a
fixup
TomAugspurger Dec 16, 2025
d013062
Use a bloom filter
TomAugspurger Dec 16, 2025
8bcaed0
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Dec 17, 2025
5152b12
Note on why we shuffle
TomAugspurger Dec 17, 2025
ea64499
Streams, events, joins
TomAugspurger Dec 19, 2025
49f78f9
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Dec 22, 2025
a049351
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Jan 14, 2026
8098cd2
lint
TomAugspurger Jan 14, 2026
a75c762
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Jan 14, 2026
041c3e8
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Jan 29, 2026
6ff31f9
Compiling again
TomAugspurger Jan 29, 2026
05782b3
remove duplicate log
TomAugspurger Jan 29, 2026
4ae399c
remove unused event
TomAugspurger Jan 29, 2026
3bea0df
fix while condition
TomAugspurger Jan 29, 2026
672af5d
fixes
TomAugspurger Jan 29, 2026
9c8a21a
revert MPI change
TomAugspurger Jan 29, 2026
7c16d67
docstring fixes
TomAugspurger Jan 29, 2026
2c29dc2
Merge remote-tracking branch 'upstream/main' into tom/streaming-q4
TomAugspurger Jan 30, 2026
372851a
to_device compat
TomAugspurger Jan 30, 2026
202b7f7
clarify hash-partitioning
TomAugspurger Jan 30, 2026
206337c
add run-and-validate
TomAugspurger Jan 30, 2026
17ed006
reuse chunkwise_group_by
TomAugspurger Jan 30, 2026
a9613e2
fixed queries parsing
TomAugspurger Jan 30, 2026
7a135e2
Handle dates
TomAugspurger Jan 30, 2026
026c4ec
reuse chunkwise_sort_by
TomAugspurger Jan 30, 2026
52b3934
simplify
TomAugspurger Jan 30, 2026
c01a377
simplify
TomAugspurger Jan 30, 2026
a2d46be
static casts
TomAugspurger Jan 30, 2026
4ddc853
Use KeepKeys::NO
TomAugspurger Jan 30, 2026
ec8b45c
fix loop
TomAugspurger Jan 30, 2026
2 changes: 1 addition & 1 deletion cpp/benchmarks/streaming/ndsh/CMakeLists.txt
@@ -41,7 +41,7 @@ target_link_libraries(
$<TARGET_NAME_IF_EXISTS:conda_env> maybe_asan
)

set(RAPIDSMPFNDSH_QUERIES q01 q03 q09 q21 bench_read)
set(RAPIDSMPFNDSH_QUERIES q01 q03 q04 q09 q21 bench_read)

foreach(query IN ITEMS ${RAPIDSMPFNDSH_QUERIES})
add_executable(${query} "${query}.cpp")
196 changes: 196 additions & 0 deletions cpp/benchmarks/streaming/ndsh/join.cpp
@@ -13,6 +13,7 @@
#include <cudf/concatenate.hpp>
#include <cudf/contiguous_split.hpp>
#include <cudf/copying.hpp>
#include <cudf/join/filtered_join.hpp>
#include <cudf/join/hash_join.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
@@ -144,6 +145,75 @@ streaming::Node broadcast(
co_await ch_out->drain(ctx->executor());
}

/**
* @brief Semi-join a probe table chunk against a build table chunk, returning a message of the result.
*
* @param ctx Streaming context
* @param left_chunk Chunk to join. Used as the probe table in a filtered join.
* @param right_chunk Chunk to join. Used as the build table in a filtered join.
* @param left_carrier Columns from `left_chunk` to include in the output.
* @param left_on Key column indices in `left_chunk`.
* @param right_on Key column indices in `right_chunk`.
* @param sequence Sequence number of the output.
* @param left_event Event recording the availability of `left_chunk`.
*
* @return Message of `TableChunk` containing the result of the semi join.
*/
streaming::Message semi_join_chunk(
std::shared_ptr<streaming::Context> ctx,
streaming::TableChunk const& left_chunk,
streaming::TableChunk&& right_chunk,
Comment on lines +164 to +165

Contributor: Why do you take left_chunk by const ref, but right_chunk by rvalue reference (i.e. the caller must move it)?

Contributor Author: I'm not really sure (I'm reading up on the semantics of the two now). We use this streaming::TableChunk&& type for the other functions (inner_join_chunk), so I suspect I was trying to match that. But when doing a broadcast left semi join, the left_chunk is reused many times, once per right chunk, which I think means it can't be moved.

cudf::table_view left_carrier,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on,
std::uint64_t sequence,
CudaEvent* left_event
) {
auto chunk_stream = right_chunk.stream();

left_event->stream_wait(chunk_stream);

// At this point, both left_chunk and right_chunk are valid on
// either stream. We'll do everything from here out on the
// right_chunk.stream(), so that we don't introduce false dependencies
// between the different chunks.

auto joiner = cudf::filtered_join(
right_chunk.table_view().select(right_on),
cudf::null_equality::UNEQUAL,
cudf::set_as_build_table::RIGHT,
chunk_stream
);

auto match = joiner.semi_join(
left_chunk.table_view().select(left_on), chunk_stream, ctx->br()->device_mr()
);

ctx->comm()->logger().debug(
"semi_join_chunk: left.num_rows()=", left_chunk.table_view().num_rows()
);
ctx->comm()->logger().debug("semi_join_chunk: match.size()=", match->size());

cudf::column_view indices = cudf::device_span<cudf::size_type const>(*match);
auto result_columns = cudf::gather(
left_carrier,
indices,
cudf::out_of_bounds_policy::DONT_CHECK,
chunk_stream,
ctx->br()->device_mr()
)
->release();

auto result_table = std::make_unique<cudf::table>(std::move(result_columns));
// Deallocation of the join indices will happen on chunk_stream, so add stream dep
cuda_stream_join(left_chunk.stream(), chunk_stream);

return streaming::to_message(
sequence,
std::make_unique<streaming::TableChunk>(std::move(result_table), chunk_stream)
);
}

/**
* @brief Join a table chunk against a build hash table returning a message of the result.
*
@@ -345,6 +415,131 @@ streaming::Node inner_join_shuffle(
co_await ch_out->drain(ctx->executor());
}

streaming::Node left_semi_join_broadcast_left(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> left,
std::shared_ptr<streaming::Channel> right,
std::shared_ptr<streaming::Channel> ch_out,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on,
OpID tag,
KeepKeys keep_keys
) {
streaming::ShutdownAtExit c{left, right, ch_out};
co_await ctx->executor()->schedule();
ctx->comm()->logger().print("Left semi broadcast join ", static_cast<int>(tag));
auto left_table = co_await (co_await broadcast(ctx, left, tag))
.release<streaming::TableChunk>()
.make_available(ctx);
ctx->comm()->logger().print(
"Left (probe) table has ", left_table.table_view().num_rows(), " rows"
);
CudaEvent left_event;
left_event.record(left_table.stream());

cudf::table_view left_carrier;
if (keep_keys == KeepKeys::YES) {
left_carrier = left_table.table_view();
} else {
std::vector<cudf::size_type> to_keep;
std::ranges::copy_if(
std::ranges::iota_view(0, left_table.table_view().num_columns()),
std::back_inserter(to_keep),
[&](auto i) { return std::ranges::find(left_on, i) == left_on.end(); }
);
left_carrier = left_table.table_view().select(to_keep);
}

while (!ch_out->is_shutdown()) {
auto right_msg = co_await right->receive();
if (right_msg.empty()) {
break;
}
// The ``right`` table has been hash-partitioned (via a shuffle) on
// the join key. Thanks to the hash-partitioning, we don't need to worry
// about deduplicating matches across partitions. Anything that matches
// in the semi-join belongs in the output.
auto right_chunk =
co_await right_msg.release<streaming::TableChunk>().make_available(ctx);
co_await ch_out->send(semi_join_chunk(
ctx,
left_table,
std::move(right_chunk),
left_carrier,
left_on,
right_on,
right_msg.sequence_number(),
&left_event
));
}

co_await ch_out->drain(ctx->executor());
}

streaming::Node left_semi_join_shuffle(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> left,
std::shared_ptr<streaming::Channel> right,
std::shared_ptr<streaming::Channel> ch_out,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on,
KeepKeys keep_keys
) {
streaming::ShutdownAtExit c{left, right, ch_out};
ctx->comm()->logger().print("Shuffle left semi join");

co_await ctx->executor()->schedule();
CudaEvent left_event;

while (!ch_out->is_shutdown()) {
// Requirement: two shuffles kick out partitions in the same order
auto left_msg = co_await left->receive();
auto right_msg = co_await right->receive();

if (left_msg.empty()) {
RAPIDSMPF_EXPECTS(
right_msg.empty(), "Left does not have same number of partitions as right"
);
break;
}
RAPIDSMPF_EXPECTS(
left_msg.sequence_number() == right_msg.sequence_number(),
"Mismatching sequence numbers"
);

auto left_chunk =
co_await left_msg.release<streaming::TableChunk>().make_available(ctx);
auto right_chunk =
co_await right_msg.release<streaming::TableChunk>().make_available(ctx);

left_event.record(left_chunk.stream());

cudf::table_view left_carrier;
if (keep_keys == KeepKeys::YES) {
left_carrier = left_chunk.table_view();
} else {
std::vector<cudf::size_type> to_keep;
std::ranges::copy_if(
std::ranges::iota_view(0, left_chunk.table_view().num_columns()),
std::back_inserter(to_keep),
[&](auto i) { return std::ranges::find(left_on, i) == left_on.end(); }
);
left_carrier = left_chunk.table_view().select(to_keep);
}

co_await ch_out->send(semi_join_chunk(
ctx,
left_chunk,
std::move(right_chunk),
left_carrier,
left_on,
right_on,
left_msg.sequence_number(),
&left_event
));
}

co_await ch_out->drain(ctx->executor());
}

streaming::Node shuffle(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> ch_in,
@@ -360,6 +555,7 @@
while (true) {
auto msg = co_await ch_in->receive();
if (msg.empty()) {
ctx->comm()->logger().debug("Shuffle: no more input");
break;
}
auto chunk = co_await msg.release<streaming::TableChunk>().make_available(ctx);
63 changes: 62 additions & 1 deletion cpp/benchmarks/streaming/ndsh/join.hpp
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

@@ -120,6 +120,67 @@ enum class KeepKeys : bool {
KeepKeys keep_keys = KeepKeys::YES
);

/**
* @brief Perform a streaming left semi join between two tables.
*
* @note This performs a broadcast join, broadcasting the table represented by the `left`
* channel to all ranks, and then streaming through the chunks of the `right` channel.
* The `right` channel is required to provide hash-partitioned data in-order.
* All of the chunks from the `left` channel must fit in memory at once.
*
* @param ctx Streaming context.
* @param left Channel of `TableChunk`s.
* @param right Channel of `TableChunk`s in hash-partitioned order (shuffled).
* @param ch_out Output channel of `TableChunk`s.
* @param left_on Column indices of the keys in the left table.
* @param right_on Column indices of the keys in the right table.
* @param tag Disambiguating tag for the broadcast of the left table.
* @param keep_keys Whether the result contains the key columns, or only the
* "carrier" value columns.
*
* @return Coroutine representing the completion of the join.
*/
streaming::Node left_semi_join_broadcast_left(
std::shared_ptr<streaming::Context> ctx,
// We will always choose left as build table and do "broadcast" joins
std::shared_ptr<streaming::Channel> left,
std::shared_ptr<streaming::Channel> right,
std::shared_ptr<streaming::Channel> ch_out,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on,
OpID tag,
KeepKeys keep_keys
);

/**
* @brief Perform a streaming left semi join between two tables.
*
* @note This performs a shuffle join, the left and right channels are required to provide
* hash-partitioned data in-order.
*
* @param ctx Streaming context.
* @param left Channel of `TableChunk`s in hash-partitioned order.
* @param right Channel of `TableChunk`s in matching hash-partitioned order.
* @param ch_out Output channel of `TableChunk`s.
* @param left_on Column indices of the keys in the left table.
* @param right_on Column indices of the keys in the right table.
* @param keep_keys Whether the result contains the key columns, or only the
* "carrier" value columns.
*
* @return Coroutine representing the completion of the join.
*/
streaming::Node left_semi_join_shuffle(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> left,
std::shared_ptr<streaming::Channel> right,
std::shared_ptr<streaming::Channel> ch_out,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on,
KeepKeys keep_keys = KeepKeys::YES
);

/**
* @brief Shuffle the input channel by hash-partitioning on given key columns.
*