Conversation
    );
}

rapidsmpf::streaming::Node filter_part(
I think that the filter can be done in the read_parquet with something similar to https://github.com/rapidsai/rapidsmpf/pull/710/changes#diff-9743b2e766c061cb2f29e446eb28ac8761389f601474ecfd11cac9971deb81f7R262-R343. I can try implementing that if you'd like, since I just did it for query 4.
// Select specific columns from the input table
rapidsmpf::streaming::Node select_columns(
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    std::vector<cudf::size_type> indices
) {
Maybe this column selection should be part of the post-processing in the read_parquet node. Having a node do this immediately after a read_parquet seems a bit redundant.
Maybe we could apply the selection here
Then we might not need to do the column copy in L204
I was wondering about this too, but I wasn't sure whether cudf had a built-in way of saying "read these columns for the filter, but don't include them in the output". If it doesn't have a way of doing that, then I agree doing it after the read but before the return is probably worth doing.
@TomAugspurger I think the filter is for rows. It's more of a post-processing step in read_parquet node AFAIU.
Yep, it's a row filter. But in some cases (like this one) we only need a column for the filter. If cudf doesn't have a way to include a column for the purpose of filtering but exclude it from the result table, then we can update our wrapper to perform that selection.
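If we end up doing that selection in our wrapper, a minimal sketch of dropping the filter-only columns without extra device copies could look like this (variable names such as tbl and output_indices are illustrative, assuming the reader hands back an owning cudf::table):

// Release the owning table and move only the requested columns into a new one;
// no device copies are involved.
std::vector<std::unique_ptr<cudf::column>> cols = tbl->release();
std::vector<std::unique_ptr<cudf::column>> selected;
for (auto idx : output_indices) {
    selected.push_back(std::move(cols[idx]));
}
auto out = std::make_unique<cudf::table>(std::move(selected));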
You can do it in the read_parquet by using column_name_reference in the filter rather than column_reference.
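For reference, a rough sketch of what that could look like when building the reader options; the column names, predicate, and source here are placeholders rather than the actual q17 predicate, and the wrapper's option plumbing may differ:

// Read only the output columns, while filtering rows on a column referenced by
// name so it does not have to appear in the selected output.
auto source = cudf::io::source_info{"part.parquet"};
auto col    = cudf::ast::column_name_reference("p_size");
auto thresh = cudf::numeric_scalar<int32_t>(10);
auto lit    = cudf::ast::literal(thresh);
auto expr   = cudf::ast::operation(cudf::ast::ast_operator::LESS, col, lit);

auto opts = cudf::io::parquet_reader_options::builder(source)
                .columns({"p_partkey", "p_brand", "p_container"})
                .filter(expr)
                .build();
auto result = cudf::io::read_parquet(opts);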
std::vector<std::unique_ptr<cudf::groupby_aggregation>> sum_aggs;
sum_aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
std::vector<std::unique_ptr<cudf::groupby_aggregation>> count_aggs;
count_aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs))
);
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(count_aggs))
);
I think this could be simplified as follows, since both sum and count are happening on col 1.
Suggested change:

-    std::vector<std::unique_ptr<cudf::groupby_aggregation>> sum_aggs;
-    sum_aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
-    std::vector<std::unique_ptr<cudf::groupby_aggregation>> count_aggs;
-    count_aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
-    requests.push_back(
-        cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs))
-    );
-    requests.push_back(
-        cudf::groupby::aggregation_request(table.column(1), std::move(count_aggs))
-    );
+    std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
+    aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
+    aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
+    requests.push_back(
+        cudf::groupby::aggregation_request(table.column(1), std::move(aggs))
+    );
then results will have both sum and count in a single vector
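With both aggregations on one request, the outputs come back in the order they were added, so the assembly could become something like this (sketch, following the variable names used nearby):

auto result = keys->release();
result.push_back(std::move(results[0].results[0]));  // sum of col 1
result.push_back(std::move(results[0].results[1]));  // count of col 1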
std::vector<std::unique_ptr<cudf::column>> result;
result.push_back(std::move(keys->release()[0]));
result.push_back(std::move(results[0].results[0])); // sum
result.push_back(std::move(results[1].results[0])); // count
In Q09 we push back to the vector given by keys->release():

auto result = keys->release();
for (auto&& r : results) {
    std::ranges::move(r.results, std::back_inserter(result));
}

I think this is a neater way.
auto chunk_stream = chunk.stream();
auto table = chunk.table_view();

if (!table.is_empty() && table.num_columns() >= 4) {
can there be a table.num_columns() != 4 scenario?
auto sum_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(sum_scalar.get())
    ->set_value(local_sum, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *sum_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
I think we could use an rmm::device_uvector here. This will avoid an extra allocation IINM.
Suggested change:

-    auto sum_scalar = cudf::make_numeric_scalar(
-        cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, ctx->br()->device_mr()
-    );
-    static_cast<cudf::numeric_scalar<double>*>(sum_scalar.get())
-        ->set_value(local_sum, chunk_stream);
-    std::vector<std::unique_ptr<cudf::column>> result_cols;
-    result_cols.push_back(
-        cudf::make_column_from_scalar(
-            *sum_scalar, 1, chunk_stream, ctx->br()->device_mr()
-        )
-    );
+    rmm::device_uvector<double> vec(1, chunk_stream, ctx->br()->device_mr());
+    vec.set_element_async(0, local_sum, chunk_stream);
+    std::vector<std::unique_ptr<cudf::column>> result_cols {
+        std::make_unique<cudf::column>(std::move(vec), {}, 0)
+    };
auto sum_val =
    static_cast<cudf::numeric_scalar<double>*>(
        cudf::reduce(
            local_result->view().column(0),
            *cudf::make_sum_aggregation<cudf::reduce_aggregation>(),
            cudf::data_type(cudf::type_id::FLOAT64),
            chunk_stream,
            ctx->br()->device_mr()
        )
            .get()
    )
        ->value(chunk_stream);
This is the same as local_sum in the single-rank case, isn't it? Seems a bit redundant.
Also, I think we can remove the if (local_result) {...} else {...} branch and simply use the local_sum value here.
auto avg_yearly_val = sum_val / 7.0;
auto avg_yearly_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64),
    chunk_stream,
    ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(avg_yearly_scalar.get())
    ->set_value(avg_yearly_val, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
maybe we need a util to create a column from a single value of type T
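A minimal sketch of such a helper; the name column_from_value and the rmm::device_async_resource_ref parameter type are assumptions, not existing API:

// Build a single-row cudf column directly from a host value of type T,
// avoiding the intermediate scalar and make_column_from_scalar allocation.
template <typename T>
std::unique_ptr<cudf::column> column_from_value(
    T const& value, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr
) {
    rmm::device_uvector<T> vec(1, stream, mr);
    // set_element synchronizes the stream, so `value` need not outlive the call
    vec.set_element(0, value, stream);
    return std::make_unique<cudf::column>(std::move(vec), rmm::device_buffer{}, 0);
}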
auto avg_yearly_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64),
    chunk_stream,
    ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(avg_yearly_scalar.get())
    ->set_value(avg_yearly_val, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
move to a separate util
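With a helper like the column_from_value sketch above (hypothetical name), this block could collapse to something like:

result_cols.push_back(
    column_from_value(avg_yearly_val, chunk_stream, ctx->br()->device_mr())
);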
[](
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    rapidsmpf::OpID tag
) -> rapidsmpf::streaming::Node {
let's move this to a separate method
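Something along these lines, i.e. hoisting the lambda into a named free function with the same signature (the name is just a placeholder):

rapidsmpf::streaming::Node compute_avg_yearly(
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    rapidsmpf::OpID tag
) {
    // ... same body as the current lambda ...
}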
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Avoid including the filter columns in the output table.
@nirandaperera I pushed a couple changes
I noticed that we're getting a different (presumably incorrect) result now. I'm not sure how long it's been like this. Running this script (in the collapsed Details block) shows the different result.
I checked out an earlier commit (c21cb30) and confirmed that it gets the same (probably incorrect) 223947 as HEAD. So the recent changes didn't cause a regression there.
Shoot! I was testing against SF3K and thought I was getting the correct results. I'll double check.
It's also possible I've messed up the expected result. I haven't looked carefully.
@TomAugspurger and I went through a variety of scale factors (10, 100, 1K, 3K) and validated that all worked multi-GPU. However, with a single-GPU run the values were incorrect. We suspect something may be incorrect with reading/distributing the parquet files, or that the assumptions about distribution are only valid for N+1 ranks.
@TomAugspurger I think I resolved the issue in f67178f -- carved out a special case for single-rank computation.
Resolved conflict in cpp/benchmarks/streaming/ndsh/CMakeLists.txt by combining query lists from both branches (q01, q03, q09, q17).