From 25d5e2c16f007c370aaae6b64999f4b12f672f1c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 9 Dec 2025 15:11:28 -0800 Subject: [PATCH 1/7] add read_parquet_uniform --- .../rapidsmpf/streaming/cudf/parquet.hpp | 29 +++ cpp/src/streaming/cudf/parquet.cpp | 185 ++++++++++++++++++ .../rapidsmpf/streaming/cudf/parquet.pyi | 9 + .../rapidsmpf/streaming/cudf/parquet.pyx | 66 +++++++ 4 files changed, 289 insertions(+) diff --git a/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp b/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp index 47dd7623d..2b68ec0f1 100644 --- a/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp +++ b/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp @@ -57,5 +57,34 @@ Node read_parquet( cudf::size_type num_rows_per_chunk, std::unique_ptr filter = nullptr ); + +/** + * @brief Asynchronously read parquet files with uniform chunk distribution. + * + * Unlike read_parquet which targets a specific number of rows per chunk, this function + * targets a specific total number of chunks and distributes them uniformly across ranks. + * + * When target_num_chunks <= num_files: Files are grouped and read completely (file-aligned). + * When target_num_chunks > num_files: Files are split into slices, aligned to row groups. + * + * @note This is a collective operation, all ranks must participate with identical parameters. + * + * @param ctx The execution context to use. + * @param ch_out Channel to which `TableChunk`s are sent. + * @param num_producers Number of concurrent producer tasks. + * @param options Template reader options (same as read_parquet). + * @param target_num_chunks Target total number of chunks to create across all ranks. + * @param filter Optional filter expression to apply to the read. + * + * @return Streaming node representing the asynchronous read. + */ +Node read_parquet_uniform( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::io::parquet_reader_options options, + std::size_t target_num_chunks, + std::unique_ptr filter = nullptr +); } // namespace node } // namespace rapidsmpf::streaming diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 8ac1eefd7..43f8e1017 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -245,4 +245,189 @@ Node read_parquet( ); } } + +Node read_parquet_uniform( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::io::parquet_reader_options options, + std::size_t target_num_chunks, + std::unique_ptr filter +) { + ShutdownAtExit c{ch_out}; + co_await ctx->executor()->schedule(); + auto size = static_cast(ctx->comm()->nranks()); + auto rank = static_cast(ctx->comm()->rank()); + auto source = options.get_source(); + + RAPIDSMPF_EXPECTS( + source.type() == cudf::io::io_type::FILEPATH, "Only implemented for file sources" + ); + RAPIDSMPF_EXPECTS( + size == 1 || !options.get_num_rows().has_value(), + "Reading subset of rows not yet supported in multi-rank execution" + ); + RAPIDSMPF_EXPECTS( + size == 1 || options.get_skip_rows() == 0, + "Skipping rows not yet supported in multi-rank execution" + ); + + auto files = source.filepaths(); + RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file to read"); + RAPIDSMPF_EXPECTS( + !options.get_filter().has_value(), + "Do not set filter on options, use the filter argument" + ); + + if (filter != nullptr) { + options.set_filter(filter->filter); + cuda_stream_join( + std::ranges::transform_view( + std::ranges::iota_view( + std::size_t{0}, ctx->br()->stream_pool().get_pool_size() + ), + [&](auto i) { return ctx->br()->stream_pool().get_stream(i); } + ), + std::ranges::single_view(filter->stream) + ); + } + + std::uint64_t sequence_number = 0; + std::vector> chunks_per_producer(num_producers); + auto const num_files = files.size(); + + // Determine mode: split files or group files + bool split_mode = (target_num_chunks > num_files); + + if (split_mode) { + // SPLIT MODE: Each file divided into multiple slices + // Total chunks = splits_per_file * num_files + std::size_t splits_per_file = (target_num_chunks + num_files - 1) / num_files; + std::size_t total_chunks = splits_per_file * num_files; + + // Calculate which chunk IDs this rank owns + std::size_t chunks_per_rank = (total_chunks + size - 1) / size; + std::size_t chunk_start = rank * chunks_per_rank; + std::size_t chunk_end = std::min(chunk_start + chunks_per_rank, total_chunks); + + // Map chunk IDs to (file_idx, split_idx) + for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { + std::size_t file_idx = chunk_id / splits_per_file; + std::size_t split_idx = chunk_id % splits_per_file; + + if (file_idx >= num_files) continue; // Past the end + + const auto& filepath = files[file_idx]; + + // Read metadata to determine row groups + auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(filepath)); + auto total_rows = metadata.num_rows(); + auto num_row_groups = metadata.num_rowgroups(); + + // Determine slice boundaries + std::int64_t skip_rows, num_rows; + + if (splits_per_file <= static_cast(num_row_groups)) { + // Align to row groups - use row-based splitting for now + // TODO: Extract row group boundaries when API is available + std::int64_t rows_per_split = total_rows / static_cast(splits_per_file); + std::int64_t extra_rows = total_rows % static_cast(splits_per_file); + + skip_rows = static_cast(split_idx) * rows_per_split + + std::min(static_cast(split_idx), extra_rows); + num_rows = rows_per_split + (split_idx < static_cast(extra_rows) ? 1 : 0); + } else { + // More splits than row groups - split by rows + std::int64_t rows_per_split = total_rows / static_cast(splits_per_file); + std::int64_t extra_rows = total_rows % static_cast(splits_per_file); + + skip_rows = static_cast(split_idx) * rows_per_split + + std::min(static_cast(split_idx), extra_rows); + num_rows = rows_per_split + (split_idx < static_cast(extra_rows) ? 1 : 0); + } + + chunks_per_producer[sequence_number % num_producers].emplace_back( + ChunkDesc{sequence_number, skip_rows, num_rows, cudf::io::source_info(filepath)} + ); + sequence_number++; + } + } else { + // GROUP MODE: Multiple files per chunk (file-aligned) + std::size_t files_per_chunk = (num_files + target_num_chunks - 1) / target_num_chunks; + + // Calculate which chunk IDs this rank owns + std::size_t chunks_per_rank = (target_num_chunks + size - 1) / size; + std::size_t chunk_start = rank * chunks_per_rank; + std::size_t chunk_end = std::min(chunk_start + chunks_per_rank, target_num_chunks); + + // Map chunk IDs to file ranges + for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { + std::size_t file_start = chunk_id * files_per_chunk; + std::size_t file_end = std::min(file_start + files_per_chunk, num_files); + + if (file_start >= num_files) continue; // Past the end + + // Collect files for this chunk + std::vector chunk_files; + for (std::size_t file_idx = file_start; file_idx < file_end; ++file_idx) { + chunk_files.push_back(files[file_idx]); + } + + // Calculate total rows across all files in this chunk + // We need to read metadata to know how many rows to read + auto source = cudf::io::source_info(chunk_files); + auto metadata = cudf::io::read_parquet_metadata(source); + auto total_rows = metadata.num_rows(); + + chunks_per_producer[sequence_number % num_producers].emplace_back( + ChunkDesc{sequence_number, 0, total_rows, source} + ); + sequence_number++; + } + } + + // Handle empty case + if (std::ranges::all_of(chunks_per_producer, [](auto&& v) { return v.empty(); })) { + // No chunks to read - drain and return + co_await ch_out->drain(ctx->executor()); + if (filter != nullptr) { + cuda_stream_join( + std::ranges::single_view(filter->stream), + std::ranges::transform_view( + std::ranges::iota_view( + std::size_t{0}, ctx->br()->stream_pool().get_pool_size() + ), + [&](auto i) { return ctx->br()->stream_pool().get_stream(i); } + ) + ); + } + co_return; + } + + // Launch producers + std::vector read_tasks; + read_tasks.reserve(1 + num_producers); + auto lineariser = Lineariser(ctx, ch_out, num_producers); + auto queues = lineariser.get_queues(); + for (std::size_t i = 0; i < num_producers; i++) { + read_tasks.push_back( + produce_chunks(ctx, queues[i], chunks_per_producer[i], options) + ); + } + read_tasks.push_back(lineariser.drain()); + coro_results(co_await coro::when_all(std::move(read_tasks))); + + co_await ch_out->drain(ctx->executor()); + if (filter != nullptr) { + cuda_stream_join( + std::ranges::single_view(filter->stream), + std::ranges::transform_view( + std::ranges::iota_view( + std::size_t{0}, ctx->br()->stream_pool().get_pool_size() + ), + [&](auto i) { return ctx->br()->stream_pool().get_stream(i); } + ) + ); + } +} } // namespace rapidsmpf::streaming::node diff --git a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi index 3e0fea3f0..efab84bc0 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi +++ b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi @@ -21,3 +21,12 @@ def read_parquet( num_rows_per_chunk: int, filter: Filter | None = None, ) -> CppNode: ... + +def read_parquet_uniform( + ctx: Context, + ch_out: Channel[TableChunk], + num_producers: int, + options: ParquetReaderOptions, + target_num_chunks: int, + filter: Filter | None = None, +) -> CppNode: ... diff --git a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx index ec11b0b58..32c80fc9d 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx +++ b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx @@ -36,6 +36,16 @@ cdef extern from "" nogil: unique_ptr[cpp_Filter], ) except + + cdef cpp_Node cpp_read_parquet_uniform \ + "rapidsmpf::streaming::node::read_parquet_uniform"( + shared_ptr[cpp_Context] ctx, + shared_ptr[cpp_Channel] ch_out, + size_t num_producers, + parquet_reader_options options, + size_t target_num_chunks, + unique_ptr[cpp_Filter], + ) except + + cdef class Filter: """ @@ -136,3 +146,59 @@ def read_parquet( return CppNode.from_handle( make_unique[cpp_Node](move(_ret)), owner=None ) + + +def read_parquet_uniform( + Context ctx not None, + Channel ch_out not None, + size_t num_producers, + ParquetReaderOptions options not None, + size_t target_num_chunks, + Filter filter = None, +): + """ + Create a streaming node to read from parquet with uniform chunk distribution. + + Unlike read_parquet which targets a specific number of rows per chunk, + this function targets a total number of chunks distributed uniformly. + + When target_num_chunks <= num_files: Files are grouped and read completely. + When target_num_chunks > num_files: Files are split, aligned to row groups. + + Parameters + ---------- + ctx + Streaming execution context + ch_out + Output channel to receive the TableChunks. + num_producers + Number of concurrent producers of output chunks. + options + Reader options + target_num_chunks + Target total number of chunks to create across all ranks. + filter + Optional filter object. If provided, is consumed by this function + and not subsequently usable. + + Notes + ----- + This is a collective operation, all ranks participating via the + execution context's communicator must call it with the same options. + """ + cdef cpp_Node _ret + cdef unique_ptr[cpp_Filter] c_filter + if filter is not None: + c_filter = move(filter.release_handle()) + with nogil: + _ret = cpp_read_parquet_uniform( + ctx._handle, + ch_out._handle, + num_producers, + options.c_obj, + target_num_chunks, + move(c_filter) + ) + return CppNode.from_handle( + make_unique[cpp_Node](move(_ret)), owner=None + ) From 43b3771ca7c17c82feb9c36ec97fb12bd090a305 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 9 Dec 2025 15:12:44 -0800 Subject: [PATCH 2/7] precommit --- .../rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx index 32c80fc9d..e5597832c 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx +++ b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx @@ -159,12 +159,6 @@ def read_parquet_uniform( """ Create a streaming node to read from parquet with uniform chunk distribution. - Unlike read_parquet which targets a specific number of rows per chunk, - this function targets a total number of chunks distributed uniformly. - - When target_num_chunks <= num_files: Files are grouped and read completely. - When target_num_chunks > num_files: Files are split, aligned to row groups. - Parameters ---------- ctx @@ -183,6 +177,12 @@ def read_parquet_uniform( Notes ----- + Unlike read_parquet which targets a specific number of rows per chunk, + this function targets a total number of chunks distributed uniformly. + + When target_num_chunks <= num_files: Files are grouped and read completely. + When target_num_chunks > num_files: Files are split, aligned to row groups. + This is a collective operation, all ranks participating via the execution context's communicator must call it with the same options. """ From bb7c39f7b52f41251ebf8b759fb7fb1608dc50ed Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 9 Dec 2025 18:13:11 -0800 Subject: [PATCH 3/7] avoid reading parquet metadata a bit more --- cpp/src/streaming/cudf/parquet.cpp | 127 +++++++++++++++++------------ 1 file changed, 76 insertions(+), 51 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 43f8e1017..7e707d03e 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -78,7 +79,9 @@ Node produce_chunks( for (auto& chunk : chunks) { cudf::io::parquet_reader_options chunk_options{options}; chunk_options.set_skip_rows(chunk.skip_rows); - chunk_options.set_num_rows(chunk.num_rows); + if (chunk.num_rows >= 0) { + chunk_options.set_num_rows(chunk.num_rows); + } chunk_options.set_source(chunk.source); auto stream = ctx->br()->stream_pool().get_stream(); auto ticket = co_await ch_out->acquire(); @@ -259,7 +262,7 @@ Node read_parquet_uniform( auto size = static_cast(ctx->comm()->nranks()); auto rank = static_cast(ctx->comm()->rank()); auto source = options.get_source(); - + RAPIDSMPF_EXPECTS( source.type() == cudf::io::io_type::FILEPATH, "Only implemented for file sources" ); @@ -271,14 +274,14 @@ Node read_parquet_uniform( size == 1 || options.get_skip_rows() == 0, "Skipping rows not yet supported in multi-rank execution" ); - + auto files = source.filepaths(); RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file to read"); RAPIDSMPF_EXPECTS( !options.get_filter().has_value(), "Do not set filter on options, use the filter argument" ); - + if (filter != nullptr) { options.set_filter(filter->filter); cuda_stream_join( @@ -291,101 +294,123 @@ Node read_parquet_uniform( std::ranges::single_view(filter->stream) ); } - + std::uint64_t sequence_number = 0; std::vector> chunks_per_producer(num_producers); auto const num_files = files.size(); - + // Determine mode: split files or group files bool split_mode = (target_num_chunks > num_files); - + if (split_mode) { // SPLIT MODE: Each file divided into multiple slices // Total chunks = splits_per_file * num_files std::size_t splits_per_file = (target_num_chunks + num_files - 1) / num_files; std::size_t total_chunks = splits_per_file * num_files; - + // Calculate which chunk IDs this rank owns std::size_t chunks_per_rank = (total_chunks + size - 1) / size; std::size_t chunk_start = rank * chunks_per_rank; std::size_t chunk_end = std::min(chunk_start + chunks_per_rank, total_chunks); - + + // Read metadata once per unique file this rank needs + std::unordered_map> file_metadata_cache; + for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { + std::size_t file_idx = chunk_id / splits_per_file; + if (file_idx >= num_files) + continue; + + if (file_metadata_cache.find(file_idx) == file_metadata_cache.end()) { + auto metadata = cudf::io::read_parquet_metadata( + cudf::io::source_info(files[file_idx]) + ); + file_metadata_cache[file_idx] = { + metadata.num_rows(), metadata.num_rowgroups() + }; + } + } + // Map chunk IDs to (file_idx, split_idx) for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { std::size_t file_idx = chunk_id / splits_per_file; std::size_t split_idx = chunk_id % splits_per_file; - - if (file_idx >= num_files) continue; // Past the end - + + if (file_idx >= num_files) + continue; // Past the end + const auto& filepath = files[file_idx]; - - // Read metadata to determine row groups - auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(filepath)); - auto total_rows = metadata.num_rows(); - auto num_row_groups = metadata.num_rowgroups(); - + auto [total_rows, num_row_groups] = file_metadata_cache[file_idx]; + // Determine slice boundaries std::int64_t skip_rows, num_rows; - + if (splits_per_file <= static_cast(num_row_groups)) { // Align to row groups - use row-based splitting for now // TODO: Extract row group boundaries when API is available - std::int64_t rows_per_split = total_rows / static_cast(splits_per_file); - std::int64_t extra_rows = total_rows % static_cast(splits_per_file); - - skip_rows = static_cast(split_idx) * rows_per_split + - std::min(static_cast(split_idx), extra_rows); - num_rows = rows_per_split + (split_idx < static_cast(extra_rows) ? 1 : 0); + std::int64_t rows_per_split = + total_rows / static_cast(splits_per_file); + std::int64_t extra_rows = + total_rows % static_cast(splits_per_file); + + skip_rows = static_cast(split_idx) * rows_per_split + + std::min(static_cast(split_idx), extra_rows); + num_rows = rows_per_split + + (split_idx < static_cast(extra_rows) ? 1 : 0); } else { // More splits than row groups - split by rows - std::int64_t rows_per_split = total_rows / static_cast(splits_per_file); - std::int64_t extra_rows = total_rows % static_cast(splits_per_file); - - skip_rows = static_cast(split_idx) * rows_per_split + - std::min(static_cast(split_idx), extra_rows); - num_rows = rows_per_split + (split_idx < static_cast(extra_rows) ? 1 : 0); + std::int64_t rows_per_split = + total_rows / static_cast(splits_per_file); + std::int64_t extra_rows = + total_rows % static_cast(splits_per_file); + + skip_rows = static_cast(split_idx) * rows_per_split + + std::min(static_cast(split_idx), extra_rows); + num_rows = rows_per_split + + (split_idx < static_cast(extra_rows) ? 1 : 0); } - + chunks_per_producer[sequence_number % num_producers].emplace_back( - ChunkDesc{sequence_number, skip_rows, num_rows, cudf::io::source_info(filepath)} + ChunkDesc{ + sequence_number, skip_rows, num_rows, cudf::io::source_info(filepath) + } ); sequence_number++; } } else { // GROUP MODE: Multiple files per chunk (file-aligned) - std::size_t files_per_chunk = (num_files + target_num_chunks - 1) / target_num_chunks; - + // Read entire files without needing metadata + std::size_t files_per_chunk = + (num_files + target_num_chunks - 1) / target_num_chunks; + // Calculate which chunk IDs this rank owns std::size_t chunks_per_rank = (target_num_chunks + size - 1) / size; std::size_t chunk_start = rank * chunks_per_rank; - std::size_t chunk_end = std::min(chunk_start + chunks_per_rank, target_num_chunks); - + std::size_t chunk_end = + std::min(chunk_start + chunks_per_rank, target_num_chunks); + // Map chunk IDs to file ranges for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { std::size_t file_start = chunk_id * files_per_chunk; std::size_t file_end = std::min(file_start + files_per_chunk, num_files); - - if (file_start >= num_files) continue; // Past the end - + + if (file_start >= num_files) + continue; // Past the end + // Collect files for this chunk std::vector chunk_files; for (std::size_t file_idx = file_start; file_idx < file_end; ++file_idx) { chunk_files.push_back(files[file_idx]); } - - // Calculate total rows across all files in this chunk - // We need to read metadata to know how many rows to read - auto source = cudf::io::source_info(chunk_files); - auto metadata = cudf::io::read_parquet_metadata(source); - auto total_rows = metadata.num_rows(); - + + // Read entire files - no need for metadata + // Use -1 for num_rows to read all rows chunks_per_producer[sequence_number % num_producers].emplace_back( - ChunkDesc{sequence_number, 0, total_rows, source} + ChunkDesc{sequence_number, 0, -1, cudf::io::source_info(chunk_files)} ); sequence_number++; } } - + // Handle empty case if (std::ranges::all_of(chunks_per_producer, [](auto&& v) { return v.empty(); })) { // No chunks to read - drain and return @@ -403,7 +428,7 @@ Node read_parquet_uniform( } co_return; } - + // Launch producers std::vector read_tasks; read_tasks.reserve(1 + num_producers); @@ -416,7 +441,7 @@ Node read_parquet_uniform( } read_tasks.push_back(lineariser.drain()); coro_results(co_await coro::when_all(std::move(read_tasks))); - + co_await ch_out->drain(ctx->executor()); if (filter != nullptr) { cuda_stream_join( From 3b4a52809e00d5df1b095ebaba2a86d1c18aa866 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 9 Dec 2025 19:04:07 -0800 Subject: [PATCH 4/7] try aligning row-groups --- cpp/src/streaming/cudf/parquet.cpp | 48 +++++++++++++++++------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 7e707d03e..7e1303a46 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -313,24 +313,30 @@ Node read_parquet_uniform( std::size_t chunk_start = rank * chunks_per_rank; std::size_t chunk_end = std::min(chunk_start + chunks_per_rank, total_chunks); - // Read metadata once per unique file this rank needs - std::unordered_map> file_metadata_cache; + // Read metadata once per unique file this rank needs and compute row group + // offsets + std::unordered_map> file_rg_offsets_cache; for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { std::size_t file_idx = chunk_id / splits_per_file; if (file_idx >= num_files) continue; - if (file_metadata_cache.find(file_idx) == file_metadata_cache.end()) { + if (file_rg_offsets_cache.find(file_idx) == file_rg_offsets_cache.end()) { auto metadata = cudf::io::read_parquet_metadata( cudf::io::source_info(files[file_idx]) ); - file_metadata_cache[file_idx] = { - metadata.num_rows(), metadata.num_rowgroups() - }; + auto rg_metadata = metadata.rowgroup_metadata(); + std::vector rg_offsets; + rg_offsets.reserve(rg_metadata.size() + 1); + rg_offsets.push_back(0); + for (auto const& rg : rg_metadata) { + rg_offsets.push_back(rg_offsets.back() + rg.at("num_rows")); + } + file_rg_offsets_cache[file_idx] = std::move(rg_offsets); } } - // Map chunk IDs to (file_idx, split_idx) + // Map chunk IDs to (file_idx, split_idx) with row-group-aligned splits for (std::size_t chunk_id = chunk_start; chunk_id < chunk_end; ++chunk_id) { std::size_t file_idx = chunk_id / splits_per_file; std::size_t split_idx = chunk_id % splits_per_file; @@ -339,23 +345,23 @@ Node read_parquet_uniform( continue; // Past the end const auto& filepath = files[file_idx]; - auto [total_rows, num_row_groups] = file_metadata_cache[file_idx]; + auto const& rg_offsets = file_rg_offsets_cache[file_idx]; + auto num_row_groups = rg_offsets.size() - 1; + auto total_rows = rg_offsets.back(); - // Determine slice boundaries + // Determine slice boundaries aligned to row group boundaries std::int64_t skip_rows, num_rows; - if (splits_per_file <= static_cast(num_row_groups)) { - // Align to row groups - use row-based splitting for now - // TODO: Extract row group boundaries when API is available - std::int64_t rows_per_split = - total_rows / static_cast(splits_per_file); - std::int64_t extra_rows = - total_rows % static_cast(splits_per_file); - - skip_rows = static_cast(split_idx) * rows_per_split - + std::min(static_cast(split_idx), extra_rows); - num_rows = rows_per_split - + (split_idx < static_cast(extra_rows) ? 1 : 0); + if (splits_per_file <= num_row_groups) { + // Align to row groups + std::size_t rg_per_split = num_row_groups / splits_per_file; + std::size_t extra_rg = num_row_groups % splits_per_file; + std::size_t rg_start = + split_idx * rg_per_split + std::min(split_idx, extra_rg); + std::size_t rg_end = + rg_start + rg_per_split + (split_idx < extra_rg ? 1 : 0); + skip_rows = rg_offsets[rg_start]; + num_rows = rg_offsets[rg_end] - skip_rows; } else { // More splits than row groups - split by rows std::int64_t rows_per_split = From 73a85d70b22523b8c5b0801892a970990ab94247 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 10 Dec 2025 07:47:33 -0800 Subject: [PATCH 5/7] add estimate_target_num_chunks and test coverage --- .../rapidsmpf/streaming/cudf/parquet.hpp | 33 ++++- cpp/src/streaming/cudf/parquet.cpp | 33 +++++ cpp/tests/streaming/test_read_parquet.cpp | 121 ++++++++++++++++++ 3 files changed, 183 insertions(+), 4 deletions(-) diff --git a/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp b/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp index 2b68ec0f1..57380035c 100644 --- a/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp +++ b/cpp/include/rapidsmpf/streaming/cudf/parquet.hpp @@ -6,6 +6,8 @@ #pragma once #include #include +#include +#include #include #include @@ -63,11 +65,13 @@ Node read_parquet( * * Unlike read_parquet which targets a specific number of rows per chunk, this function * targets a specific total number of chunks and distributes them uniformly across ranks. - * - * When target_num_chunks <= num_files: Files are grouped and read completely (file-aligned). - * When target_num_chunks > num_files: Files are split into slices, aligned to row groups. * - * @note This is a collective operation, all ranks must participate with identical parameters. + * When target_num_chunks <= num_files: Files are grouped and read completely + * (file-aligned). When target_num_chunks > num_files: Files are split into slices, + * aligned to row groups. + * + * @note This is a collective operation, all ranks must participate with identical + * parameters. * * @param ctx The execution context to use. * @param ch_out Channel to which `TableChunk`s are sent. @@ -86,5 +90,26 @@ Node read_parquet_uniform( std::size_t target_num_chunks, std::unique_ptr filter = nullptr ); + +/** + * @brief Estimate target chunk count from parquet file metadata. + * + * Samples metadata from up to `max_samples` files to estimate total rows, + * then calculates how many chunks are needed to achieve the target rows per chunk. + * + * This is useful for computing the `target_num_chunks` parameter for + * `read_parquet_uniform` when you have a target `num_rows_per_chunk` instead. + * + * @param files List of parquet file paths. + * @param num_rows_per_chunk Target number of rows per output chunk. + * @param max_samples Maximum number of files to sample for row estimation. + * + * @return Estimated target number of chunks. + */ +std::size_t estimate_target_num_chunks( + std::vector const& files, + cudf::size_type num_rows_per_chunk, + std::size_t max_samples = 3 +); } // namespace node } // namespace rapidsmpf::streaming diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 7e1303a46..d6d58d697 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -461,4 +461,37 @@ Node read_parquet_uniform( ); } } + +std::size_t estimate_target_num_chunks( + std::vector const& files, + cudf::size_type num_rows_per_chunk, + std::size_t max_samples +) { + RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file"); + RAPIDSMPF_EXPECTS(num_rows_per_chunk > 0, "num_rows_per_chunk must be positive"); + + // Sample files with a stride to spread samples evenly across the file list + std::size_t stride = std::max(std::size_t{1}, files.size() / max_samples); + std::vector sample_files; + for (std::size_t i = 0; i < files.size() && sample_files.size() < max_samples; + i += stride) + { + sample_files.push_back(files[i]); + } + + // Read metadata from sampled files to get row count + auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(sample_files)); + std::int64_t sampled_rows = metadata.num_rows(); + + // Extrapolate to estimate total rows across all files + std::int64_t estimated_total_rows = + (sampled_rows * static_cast(files.size())) + / static_cast(sample_files.size()); + + // Calculate target chunks (at least 1) + return std::max( + std::size_t{1}, + static_cast(estimated_total_rows / num_rows_per_chunk) + ); +} } // namespace rapidsmpf::streaming::node diff --git a/cpp/tests/streaming/test_read_parquet.cpp b/cpp/tests/streaming/test_read_parquet.cpp index c6d2014f7..60e425bf5 100644 --- a/cpp/tests/streaming/test_read_parquet.cpp +++ b/cpp/tests/streaming/test_read_parquet.cpp @@ -279,3 +279,124 @@ TEST_P(StreamingReadParquetParams, ReadParquet) { EXPECT_EQ(result->num_columns(), 1); CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result->view(), expected->view()); } + +// Test estimate_target_num_chunks utility function +TEST_F(StreamingReadParquet, EstimateTargetNumChunks) { + // 10 files with 10 rows each = 100 total rows + // Verify: larger chunk size -> fewer chunks, always >= 1 + EXPECT_EQ(node::estimate_target_num_chunks(source_files, 1), 100); + EXPECT_EQ(node::estimate_target_num_chunks(source_files, 10), 10); + EXPECT_EQ(node::estimate_target_num_chunks(source_files, 25), 4); + EXPECT_EQ(node::estimate_target_num_chunks(source_files, 1000000), 1); +} + +// Test read_parquet_uniform using same parameterized cases as read_parquet +// (only for cases where skip_rows and num_rows are not set) +TEST_P(StreamingReadParquetParams, ReadParquetUniform) { + auto [skip_rows, num_rows, use_filter, truncate_file_list] = GetParam(); + + // read_parquet_uniform doesn't support skip_rows/num_rows + if (skip_rows.has_value() || num_rows.has_value()) { + GTEST_SKIP() << "read_parquet_uniform doesn't support skip_rows/num_rows"; + } + + auto source = get_source_info(truncate_file_list); + auto options = cudf::io::parquet_reader_options::builder(source).build(); + + auto filter_expr = [&]() -> std::unique_ptr { + if (!use_filter) { + return nullptr; + } + auto stream = ctx->br()->stream_pool().get_stream(); + auto owner = new std::vector; + owner->push_back( + std::make_shared>(15, true, stream) + ); + owner->push_back( + std::make_shared( + *std::any_cast>>( + owner->at(0) + ) + ) + ); + owner->push_back(std::make_shared(0)); + owner->push_back( + std::make_shared( + cudf::ast::ast_operator::LESS, + *std::any_cast>( + owner->at(2) + ), + *std::any_cast>(owner->at(1)) + ) + ); + return std::make_unique( + stream, + *std::any_cast>(owner->back()), + OwningWrapper(static_cast(owner), [](void* p) { + delete static_cast*>(p); + }) + ); + }(); + + auto expected = [&]() { + if (filter_expr != nullptr) { + auto expected_options = options; + expected_options.set_filter(filter_expr->filter); + filter_expr->stream.synchronize(); + auto expected = cudf::io::read_parquet(expected_options).tbl; + filter_expr->stream.synchronize(); + return expected; + } else { + return cudf::io::read_parquet(options).tbl; + } + }(); + + // Estimate target chunks and run read_parquet_uniform + auto target_chunks = node::estimate_target_num_chunks(source_files, 3); + auto ch = ctx->create_channel(); + std::vector nodes; + nodes.push_back( + node::read_parquet_uniform( + ctx, ch, 4, options, target_chunks, std::move(filter_expr) + ) + ); + + std::vector messages; + nodes.push_back(node::pull_from_channel(ctx, ch, messages)); + run_streaming_pipeline(std::move(nodes)); + + allgather::AllGather allgather( + GlobalEnvironment->comm_, + GlobalEnvironment->progress_thread_, + /* op_id = */ 0, + br.get() + ); + + for (auto& msg : messages) { + auto chunk = msg.release(); + auto seq = msg.sequence_number(); + auto [reservation, _] = + br->reserve(MemoryType::DEVICE, chunk.make_available_cost(), true); + chunk = chunk.make_available(reservation); + auto packed_columns = + cudf::pack(chunk.table_view(), chunk.stream(), br->device_mr()); + auto packed_data = PackedData{ + std::move(packed_columns.metadata), + br->move(std::move(packed_columns.gpu_data), chunk.stream()) + }; + + allgather.insert(seq, std::move(packed_data)); + } + + allgather.insert_finished(); + + auto gathered_packed_data = + allgather.wait_and_extract(allgather::AllGather::Ordered::YES); + auto result = unpack_and_concat( + std::move(gathered_packed_data), rmm::cuda_stream_default, br.get() + ); + EXPECT_EQ(result->num_rows(), expected->num_rows()); + EXPECT_EQ(result->num_columns(), expected->num_columns()); + EXPECT_EQ(result->num_columns(), 1); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result->view(), expected->view()); +} From e4d557120f54b0faf63f63cf793cc5fd4f6fb703 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 10 Dec 2025 07:59:43 -0800 Subject: [PATCH 6/7] add test coverage --- .../rapidsmpf/streaming/cudf/parquet.pyi | 6 +- .../rapidsmpf/streaming/cudf/parquet.pyx | 49 ++++++++++ .../tests/streaming/test_read_parquet.py | 96 ++++++++++++++++++- 3 files changed, 145 insertions(+), 6 deletions(-) diff --git a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi index efab84bc0..ce496fe1f 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi +++ b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyi @@ -21,7 +21,6 @@ def read_parquet( num_rows_per_chunk: int, filter: Filter | None = None, ) -> CppNode: ... - def read_parquet_uniform( ctx: Context, ch_out: Channel[TableChunk], @@ -30,3 +29,8 @@ def read_parquet_uniform( target_num_chunks: int, filter: Filter | None = None, ) -> CppNode: ... +def estimate_target_num_chunks( + files: list[str], + num_rows_per_chunk: int, + max_samples: int = 3, +) -> int: ... diff --git a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx index e5597832c..4808791b9 100644 --- a/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx +++ b/python/rapidsmpf/rapidsmpf/streaming/cudf/parquet.pyx @@ -6,7 +6,9 @@ from cpython.ref cimport Py_INCREF from cython.operator cimport dereference as deref from libc.stddef cimport size_t from libcpp.memory cimport make_unique, shared_ptr, unique_ptr +from libcpp.string cimport string from libcpp.utility cimport move +from libcpp.vector cimport vector from pylibcudf.expressions cimport Expression from pylibcudf.io.parquet cimport ParquetReaderOptions from pylibcudf.libcudf.expressions cimport expression @@ -46,6 +48,13 @@ cdef extern from "" nogil: unique_ptr[cpp_Filter], ) except + + cdef size_t cpp_estimate_target_num_chunks \ + "rapidsmpf::streaming::node::estimate_target_num_chunks"( + const vector[string]& files, + size_type num_rows_per_chunk, + size_t max_samples, + ) except + + cdef class Filter: """ @@ -202,3 +211,43 @@ def read_parquet_uniform( return CppNode.from_handle( make_unique[cpp_Node](move(_ret)), owner=None ) + + +def estimate_target_num_chunks( + files: list[str], + size_type num_rows_per_chunk, + size_t max_samples = 3, +) -> int: + """ + Estimate target chunk count from parquet file metadata. + + Parameters + ---------- + files + List of parquet file paths. + num_rows_per_chunk + Target number of rows per output chunk. + max_samples + Maximum number of files to sample for row estimation. + + Returns + ------- + Estimated target number of chunks. + + Notes + ----- + Samples metadata from up to `max_samples` files to estimate total rows, + then calculates how many chunks are needed to achieve the target rows per chunk. + + This is useful for computing the `target_num_chunks` parameter for + `read_parquet_uniform` when you have a target `num_rows_per_chunk` instead. + """ + cdef vector[string] c_files + for f in files: + c_files.push_back(f.encode()) + cdef size_t result + with nogil: + result = cpp_estimate_target_num_chunks( + c_files, num_rows_per_chunk, max_samples + ) + return result diff --git a/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py b/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py index 1d5b96e3c..3a3694992 100644 --- a/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py +++ b/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py @@ -13,7 +13,12 @@ from rapidsmpf.streaming.core.leaf_node import pull_from_channel from rapidsmpf.streaming.core.node import run_streaming_pipeline -from rapidsmpf.streaming.cudf.parquet import Filter, read_parquet +from rapidsmpf.streaming.cudf.parquet import ( + Filter, + estimate_target_num_chunks, + read_parquet, + read_parquet_uniform, +) from rapidsmpf.streaming.cudf.table_chunk import TableChunk if TYPE_CHECKING: @@ -27,9 +32,10 @@ @pytest.fixture(scope="module") -def source( +def source_files( tmp_path_factory: pytest.TempPathFactory, -) -> plc.io.SourceInfo: +) -> list[str]: + """Create parquet files and return list of file paths.""" path = tmp_path_factory.mktemp("read_parquet") nrows = 10 @@ -45,8 +51,14 @@ def source( sink = plc.io.SinkInfo([filename]) options = plc.io.parquet.ParquetWriterOptions.builder(sink, table).build() plc.io.parquet.write_parquet(options) - sources.append(filename) - return plc.io.SourceInfo(sources) + sources.append(str(filename)) + return sources + + +@pytest.fixture(scope="module") +def source(source_files: list[str]) -> plc.io.SourceInfo: + """Return SourceInfo from file paths.""" + return plc.io.SourceInfo(source_files) def make_filter(stream: Stream) -> plc.expressions.Expression: @@ -159,3 +171,77 @@ def test_read_parquet( plc.DataType(plc.TypeId.BOOL8), ) assert all_equal.to_py() + + +def test_estimate_target_num_chunks(source_files: list[str]) -> None: + """Test estimate_target_num_chunks utility function.""" + # 10 files with 10 rows each = 100 total rows + # Verify: larger chunk size -> fewer chunks, always >= 1 + assert estimate_target_num_chunks(source_files, 1) == 100 + assert estimate_target_num_chunks(source_files, 10) == 10 + assert estimate_target_num_chunks(source_files, 25) == 4 + assert estimate_target_num_chunks(source_files, 1000000) == 1 + + +@pytest.mark.parametrize("use_filter", [False, True]) +def test_read_parquet_uniform( + context: Context, + source: plc.io.SourceInfo, + source_files: list[str], + use_filter: bool, # noqa: FBT001 +) -> None: + """Test read_parquet_uniform with estimated target chunks.""" + ch: Channel[TableChunk] = context.create_channel() + + options = plc.io.parquet.ParquetReaderOptions.builder(source).build() + + # Estimate target chunks from num_rows_per_chunk + target_chunks = estimate_target_num_chunks(source_files, 3) + + if use_filter: + fstream = context.get_stream_from_pool() + producer = read_parquet_uniform( + context, + ch, + 4, + options, + target_chunks, + Filter(fstream, make_filter(fstream)), + ) + else: + producer = read_parquet_uniform(context, ch, 4, options, target_chunks) + + consumer, deferred_messages = pull_from_channel(context, ch) + + run_streaming_pipeline(nodes=[producer, consumer]) + + messages = deferred_messages.release() + assert all( + m1.sequence_number < m2.sequence_number + for m1, m2 in itertools.pairwise(messages) + ) + chunks = [TableChunk.from_message(m) for m in messages] + for chunk in chunks: + chunk.stream.synchronize() + + got = plc.concatenate.concatenate([chunk.table_view() for chunk in chunks]) + for chunk in chunks: + chunk.stream.synchronize() + + expected = get_expected(context, source, "none", "all", use_filter=use_filter) + + assert got.num_rows() == expected.num_rows() + assert got.num_columns() == expected.num_columns() + assert got.num_columns() == 1 + + all_equal = plc.reduce.reduce( + plc.binaryop.binary_operation( + got.columns()[0], + expected.columns()[0], + plc.binaryop.BinaryOperator.EQUAL, + plc.DataType(plc.TypeId.BOOL8), + ), + plc.aggregation.all(), + plc.DataType(plc.TypeId.BOOL8), + ) + assert all_equal.to_py() From 5ce111a80db51726b34c1619f7bfc9853be22d1b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 12 Dec 2025 12:58:32 -0800 Subject: [PATCH 7/7] fix build errors --- cpp/src/streaming/cudf/parquet.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index d6d58d697..ce9687c67 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -371,13 +372,16 @@ Node read_parquet_uniform( skip_rows = static_cast(split_idx) * rows_per_split + std::min(static_cast(split_idx), extra_rows); - num_rows = rows_per_split - + (split_idx < static_cast(extra_rows) ? 1 : 0); + num_rows = + rows_per_split + (std::cmp_less(split_idx, extra_rows) ? 1 : 0); } chunks_per_producer[sequence_number % num_producers].emplace_back( ChunkDesc{ - sequence_number, skip_rows, num_rows, cudf::io::source_info(filepath) + .sequence_number = sequence_number, + .skip_rows = skip_rows, + .num_rows = num_rows, + .source = cudf::io::source_info(filepath) } ); sequence_number++; @@ -411,7 +415,12 @@ Node read_parquet_uniform( // Read entire files - no need for metadata // Use -1 for num_rows to read all rows chunks_per_producer[sequence_number % num_producers].emplace_back( - ChunkDesc{sequence_number, 0, -1, cudf::io::source_info(chunk_files)} + ChunkDesc{ + .sequence_number = sequence_number, + .skip_rows = 0, + .num_rows = -1, + .source = cudf::io::source_info(chunk_files) + } ); sequence_number++; }