Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cpp/benchmarks/streaming/ndsh/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,27 @@ install(
DESTINATION bin/benchmarks/librapidsmpf
EXCLUDE_FROM_ALL
)

add_executable(q13 "q13.cpp")
set_target_properties(
q13
PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$<BUILD_INTERFACE:${RAPIDSMPF_BINARY_DIR}/benchmarks/ndsh>"
CXX_STANDARD 20
CXX_STANDARD_REQUIRED ON
CUDA_STANDARD 20
CUDA_STANDARD_REQUIRED ON
)
target_compile_options(
q13 PRIVATE "$<$<COMPILE_LANGUAGE:CXX>:${RAPIDSMPF_CXX_FLAGS}>"
"$<$<COMPILE_LANGUAGE:CUDA>:${RAPIDSMPF_CUDA_FLAGS}>"
)
target_link_libraries(
q13 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $<TARGET_NAME_IF_EXISTS:MPI::MPI_C>
$<TARGET_NAME_IF_EXISTS:conda_env> maybe_asan
)
install(
TARGETS q13
COMPONENT benchmarking
DESTINATION bin/benchmarks/librapidsmpf
EXCLUDE_FROM_ALL
)
136 changes: 136 additions & 0 deletions cpp/benchmarks/streaming/ndsh/join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,81 @@ streaming::Message inner_join_chunk(
);
}

/**
* @brief Join a table chunk against a build hash table returning a message of the result.
*
* @param ctx Streaming context
* @param right_chunk Chunk to join
* @param sequence Sequence number of the output
* @param joiner hash_join object, representing the build table.
* @param build_carrier Columns from the build-side table to be included in the output.
* @param right_on Key column indiecs in `right_chunk`.
* @param build_stream Stream the `joiner` will be deallocated on.
* @param build_event Event recording the creation of the `joiner`.
*
* @return Message of `TableChunk` containing the result of the inner join.
*/
streaming::Message left_join_chunk(
std::shared_ptr<streaming::Context> ctx,
streaming::TableChunk&& left_chunk,
std::uint64_t sequence,
cudf::hash_join& joiner,
cudf::table_view build_carrier,
std::vector<cudf::size_type> left_on,
rmm::cuda_stream_view build_stream,
CudaEvent* build_event
) {
CudaEvent event;
left_chunk = to_device(ctx, std::move(left_chunk));
auto chunk_stream = left_chunk.stream();
build_event->stream_wait(chunk_stream);
auto probe_table = left_chunk.table_view();
auto probe_keys = probe_table.select(left_on);
auto [probe_match, build_match] =
joiner.left_join(probe_keys, std::nullopt, chunk_stream, ctx->br()->device_mr());

cudf::column_view build_indices = // right
cudf::device_span<cudf::size_type const>(*build_match);
cudf::column_view probe_indices = // left
cudf::device_span<cudf::size_type const>(*probe_match);
// build_carrier is valid on build_stream, but chunk_stream is
// waiting for build_stream work to be done, so running this on
// chunk_stream is fine.

// For LEFT join, keep all columns from the probe (left) table including keys,
// since they're always valid (unlike right-side keys which may be NULL).
auto result_columns = cudf::gather(
probe_table,
probe_indices,
cudf::out_of_bounds_policy::DONT_CHECK,
chunk_stream,
ctx->br()->device_mr()
)
->release();

// left join build indices could have sentinel values (INT_MIN), so they will be OOB.
std::ranges::move(
cudf::gather(
build_carrier,
build_indices,
cudf::out_of_bounds_policy::NULLIFY,
chunk_stream,
ctx->br()->device_mr()
)
->release(),
std::back_inserter(result_columns)
);
// Deallocation of the join indices will happen on build_stream, so add stream dep
// This also ensure deallocation of the hash_join object waits for completion.
cuda_stream_join(build_stream, chunk_stream, &event);
return streaming::to_message(
sequence,
std::make_unique<streaming::TableChunk>(
std::make_unique<cudf::table>(std::move(result_columns)), chunk_stream
)
);
}

} // namespace

streaming::Node inner_join_broadcast(
Expand Down Expand Up @@ -347,6 +422,67 @@ streaming::Node inner_join_shuffle(
co_await ch_out->drain(ctx->executor());
}

streaming::Node left_join_shuffle(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> left,
std::shared_ptr<streaming::Channel> right,
std::shared_ptr<streaming::Channel> ch_out,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on
) {
streaming::ShutdownAtExit c{left, right, ch_out};
ctx->comm()->logger().print("Left shuffle join");
co_await ctx->executor()->schedule();
CudaEvent build_event;
while (true) {
// Requirement: two shuffles kick out partitions in the same order
auto left_msg = co_await left->receive();
auto right_msg = co_await right->receive();
if (left_msg.empty()) {
RAPIDSMPF_EXPECTS(
right_msg.empty(), "Left does not have same number of partitions as right"
);
break;
}
RAPIDSMPF_EXPECTS(
left_msg.sequence_number() == right_msg.sequence_number(),
"Mismatching sequence numbers"
);

// use right as build table
auto build_chunk = to_device(ctx, right_msg.release<streaming::TableChunk>());
auto build_stream = build_chunk.stream();
auto joiner = cudf::hash_join(
build_chunk.table_view().select(right_on),
cudf::null_equality::UNEQUAL,
build_stream
);
build_event.record(build_stream);

// drop key columns from build table.
std::vector<cudf::size_type> to_keep;
std::ranges::copy_if(
std::ranges::iota_view(0, build_chunk.table_view().num_columns()),
std::back_inserter(to_keep),
[&](auto i) { return std::ranges::find(right_on, i) == right_on.end(); }
);
auto build_carrier = build_chunk.table_view().select(to_keep);

auto sequence = left_msg.sequence_number();
co_await ch_out->send(left_join_chunk(
ctx,
left_msg.release<streaming::TableChunk>(),
sequence,
joiner,
build_carrier,
left_on,
build_stream,
&build_event
));
}
co_await ch_out->drain(ctx->executor());
}

streaming::Node shuffle(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> ch_in,
Expand Down
26 changes: 26 additions & 0 deletions cpp/benchmarks/streaming/ndsh/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,32 @@ streaming::Node inner_join_shuffle(
KeepKeys keep_keys = KeepKeys::YES
);

/**
* @brief Perform a streaming inner join between two tables.
*
* @note This performs a shuffle join, the left and right channels are required to provide
* hash-partitioned data in-order.
*
* @param ctx Streaming context.
* @param left Channel of `TableChunk`s in hash-partitioned order.
* @param right Channel of `TableChunk`s in matching hash-partitioned order.
* @param ch_out Output channel of `TableChunk`s.
* @param left_on Column indices of the keys in the left table.
* @param right_on Column indices of the keys in the right table.
* @param keep_keys Does the result contain the key columns, or only "carrier" value
* columns
*
* @return Coroutine representing the completion of the join.
*/
streaming::Node left_join_shuffle(
std::shared_ptr<streaming::Context> ctx,
std::shared_ptr<streaming::Channel> left,
std::shared_ptr<streaming::Channel> right,
std::shared_ptr<streaming::Channel> ch_out,
std::vector<cudf::size_type> left_on,
std::vector<cudf::size_type> right_on
);

/**
* @brief Shuffle the input channel by hash-partitioning on given key columns.
*
Expand Down
Loading