From 78bdf4b3faaa978a66d86874b9488189cc37de33 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 09:11:05 -0800 Subject: [PATCH 1/5] distributed parquet reading for a small number of large files --- cpp/src/streaming/cudf/parquet.cpp | 305 ++++++++++++++---- cpp/tests/streaming/test_read_parquet.cpp | 5 +- .../tests/streaming/test_read_parquet.py | 81 ++++- 3 files changed, 323 insertions(+), 68 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 8ac1eefd7..24ca1aaf9 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -56,6 +56,111 @@ struct ChunkDesc { cudf::io::source_info source; }; +/** + * @brief Estimate total rows by sampling a subset of files. + * + * @param files List of all file paths. + * @param max_samples Maximum number of files to sample. + * + * @return Estimated total rows across all files. + */ +std::int64_t estimate_total_rows( + std::vector const& files, std::size_t max_samples = 10 +) { + RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file"); + + // Sample files with a stride to spread samples evenly across the file list + std::size_t stride = std::max(std::size_t{1}, files.size() / max_samples); + std::vector sample_files; + for (std::size_t i = 0; i < files.size() && sample_files.size() < max_samples; + i += stride) + { + sample_files.push_back(files[i]); + } + + // Read metadata from sampled files to get row count + auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(sample_files)); + std::int64_t sampled_rows = metadata.num_rows(); + + // Extrapolate to estimate total rows across all files + return (sampled_rows * static_cast(files.size())) + / static_cast(sample_files.size()); +} + +/** + * @brief Structure to hold row group boundary information for a file. + */ +struct FileRowGroupInfo { + std::vector rg_offsets; ///< Cumulative row offsets for row groups. 
+ std::int64_t total_rows; ///< Total rows in the file. +}; + +/** + * @brief Read row group metadata for a file and compute cumulative offsets. + * + * @param filepath Path to the parquet file. + * + * @return FileRowGroupInfo with row group offsets. + */ +FileRowGroupInfo get_file_row_group_info(std::string const& filepath) { + auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(filepath)); + auto rg_metadata = metadata.rowgroup_metadata(); + std::vector rg_offsets; + rg_offsets.reserve(rg_metadata.size() + 1); + rg_offsets.push_back(0); + for (auto const& rg : rg_metadata) { + rg_offsets.push_back(rg_offsets.back() + rg.at("num_rows")); + } + auto total_rows = rg_offsets.back(); + return FileRowGroupInfo{ + .rg_offsets = std::move(rg_offsets), .total_rows = total_rows + }; +} + +/** + * @brief Compute row-group-aligned skip_rows and num_rows for a split of a file. + * + * Given a file's row group offsets, compute the skip_rows and num_rows for a specific + * split, aligning to row group boundaries when possible. + * + * @param rg_info Row group information for the file. + * @param split_idx Index of this split (0-based). + * @param total_splits Total number of splits for this file. + * + * @return Pair of (skip_rows, num_rows). + */ +std::pair compute_split_range( + FileRowGroupInfo const& rg_info, std::size_t split_idx, std::size_t total_splits +) { + auto const& rg_offsets = rg_info.rg_offsets; + auto num_row_groups = rg_offsets.size() - 1; + auto total_rows = rg_info.total_rows; + + std::int64_t skip_rows, num_rows; + + if (total_splits <= num_row_groups) { + // Align to row groups - distribute row groups evenly across splits + std::size_t rg_per_split = num_row_groups / total_splits; + std::size_t extra_rg = num_row_groups % total_splits; + std::size_t rg_start = split_idx * rg_per_split + std::min(split_idx, extra_rg); + std::size_t rg_end = rg_start + rg_per_split + (split_idx < extra_rg ? 
1 : 0); + skip_rows = rg_offsets[rg_start]; + num_rows = rg_offsets[rg_end] - skip_rows; + } else { + // More splits than row groups - split by rows + std::int64_t rows_per_split = + total_rows / static_cast(total_splits); + std::int64_t extra_rows = total_rows % static_cast(total_splits); + auto split_idx_signed = static_cast(split_idx); + + skip_rows = + split_idx_signed * rows_per_split + std::min(split_idx_signed, extra_rows); + num_rows = rows_per_split + (split_idx_signed < extra_rows ? 1 : 0); + } + + return {skip_rows, num_rows}; +} + /** * @brief Read chunks and send them to an output channel. * @@ -153,70 +258,156 @@ Node read_parquet( std::ranges::single_view(filter->stream) ); } - // TODO: Handle case where multiple ranks are reading from a single file. - int files_per_rank = - static_cast(files.size() / size + (rank < (files.size() % size))); - int file_offset = rank * (files.size() / size) + std::min(rank, files.size() % size); - auto local_files = std::vector( - files.begin() + file_offset, files.begin() + file_offset + files_per_rank - ); std::uint64_t sequence_number = 0; std::vector> chunks_per_producer(num_producers); - auto const num_files = local_files.size(); - // Estimate number of rows per file - std::size_t files_per_chunk = 1; - if (files.size() > 1) { - auto nrows = - cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0])) - .num_rows(); - files_per_chunk = - static_cast(std::max(num_rows_per_chunk / nrows, 1l)); - } - auto to_skip = options.get_skip_rows(); - auto to_read = options.get_num_rows().value_or(std::numeric_limits::max()); - for (std::size_t file_offset = 0; file_offset < num_files; - file_offset += files_per_chunk) - { - std::vector chunk_files; - auto const nchunk_files = std::min(num_files - file_offset, files_per_chunk); - std::ranges::copy_n( - local_files.begin() + static_cast(file_offset), - static_cast(nchunk_files), - std::back_inserter(chunk_files) + + if (files.size() >= size) { + // Standard case: at 
least one file per rank + // Distribute files evenly across ranks + int files_per_rank = + static_cast(files.size() / size + (rank < (files.size() % size))); + int file_offset = + rank * (files.size() / size) + std::min(rank, files.size() % size); + auto local_files = std::vector( + files.begin() + file_offset, files.begin() + file_offset + files_per_rank ); - auto source = cudf::io::source_info(chunk_files); - // Must read [skip_rows, skip_rows + num_rows) from full fileset - auto chunk_rows = cudf::io::read_parquet_metadata(source).num_rows() - to_skip; - auto chunk_skip_rows = to_skip; - // If the chunk is larger than the number rows we need to skip, on the next - // iteration we don't need to skip any more rows, otherwise we must skip the - // remainder. - to_skip = std::max(0l, -chunk_rows); - while (chunk_rows > 0 && to_read > 0) { - auto rows_read = - std::min({static_cast(num_rows_per_chunk), chunk_rows, to_read}); - chunks_per_producer[sequence_number % num_producers].emplace_back( - sequence_number, chunk_skip_rows, rows_read, source + auto const num_files = local_files.size(); + // Estimate number of rows per file + std::size_t files_per_chunk = 1; + if (files.size() > 1) { + auto nrows = + cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0])) + .num_rows(); + files_per_chunk = + static_cast(std::max(num_rows_per_chunk / nrows, 1l)); + } + auto to_skip = options.get_skip_rows(); + auto to_read = + options.get_num_rows().value_or(std::numeric_limits::max()); + for (std::size_t file_offset = 0; file_offset < num_files; + file_offset += files_per_chunk) + { + std::vector chunk_files; + auto const nchunk_files = std::min(num_files - file_offset, files_per_chunk); + std::ranges::copy_n( + local_files.begin() + static_cast(file_offset), + static_cast(nchunk_files), + std::back_inserter(chunk_files) ); - sequence_number++; - to_read = std::max(0l, to_read - rows_read); - chunk_skip_rows += rows_read; - chunk_rows -= rows_read; + auto source = 
cudf::io::source_info(chunk_files); + // Must read [skip_rows, skip_rows + num_rows) from full fileset + auto chunk_rows = + cudf::io::read_parquet_metadata(source).num_rows() - to_skip; + auto chunk_skip_rows = to_skip; + // If the chunk is larger than the number rows we need to skip, on the next + // iteration we don't need to skip any more rows, otherwise we must skip the + // remainder. + to_skip = std::max(0l, -chunk_rows); + while (chunk_rows > 0 && to_read > 0) { + auto rows_read = std::min( + {static_cast(num_rows_per_chunk), chunk_rows, to_read} + ); + chunks_per_producer[sequence_number % num_producers].emplace_back( + sequence_number, chunk_skip_rows, rows_read, source + ); + sequence_number++; + to_read = std::max(0l, to_read - rows_read); + chunk_skip_rows += rows_read; + chunk_rows -= rows_read; + } + } + } else { + // Multi-rank single-file case: fewer files than ranks + // Use sampling to estimate chunks and distribute work across ranks + auto const num_files = files.size(); + + // Estimate total rows by sampling + auto estimated_total_rows = estimate_total_rows(files); + + // Estimate total chunks and splits per file + auto estimated_total_chunks = std::max( + std::size_t{1}, + static_cast(estimated_total_rows / num_rows_per_chunk) + ); + auto splits_per_file = + (estimated_total_chunks + num_files - 1) / num_files; // Round up + auto total_splits = num_files * splits_per_file; + + // Distribute split indices across ranks (only use as many ranks as we have + // splits) + auto active_ranks = std::min(size, total_splits); + if (rank < active_ranks) { + // Distribute splits evenly across active ranks + auto splits_per_rank = total_splits / active_ranks; + auto extra_splits = total_splits % active_ranks; + auto split_start = rank * splits_per_rank + std::min(rank, extra_splits); + auto split_end = + split_start + splits_per_rank + (rank < extra_splits ? 
1 : 0); + + // Process each split assigned to this rank + // Track which file we're currently working on to avoid re-reading metadata + std::size_t current_file_idx = std::numeric_limits::max(); + FileRowGroupInfo current_file_info; + + for (auto split_idx = split_start; split_idx < split_end; ++split_idx) { + auto file_idx = split_idx / splits_per_file; + auto local_split_idx = split_idx % splits_per_file; + + if (file_idx >= num_files) { + // Past the end of files (can happen with rounding) + break; + } + + // Read file metadata if we haven't already for this file + if (file_idx != current_file_idx) { + current_file_idx = file_idx; + current_file_info = get_file_row_group_info(files[file_idx]); + } + + // Compute row-group-aligned range for this split + auto [skip_rows, total_rows_for_split] = compute_split_range( + current_file_info, local_split_idx, splits_per_file + ); + + if (total_rows_for_split <= 0) { + continue; + } + + // Produce chunks of num_rows_per_chunk from this split's row range + auto source = cudf::io::source_info(files[file_idx]); + auto chunk_skip = skip_rows; + auto remaining = total_rows_for_split; + + while (remaining > 0) { + auto chunk_rows = std::min( + static_cast(num_rows_per_chunk), remaining + ); + chunks_per_producer[sequence_number % num_producers].emplace_back( + ChunkDesc{ + .sequence_number = sequence_number, + .skip_rows = chunk_skip, + .num_rows = chunk_rows, + .source = source + } + ); + sequence_number++; + chunk_skip += chunk_rows; + remaining -= chunk_rows; + } + } } } if (std::ranges::all_of(chunks_per_producer, [](auto&& v) { return v.empty(); })) { - if (local_files.size() > 0) { - // If we're on the hook to read some files, but the skip_rows/num_rows setup - // meant our slice was empty, send an empty table of correct shape. - // Anyone with no files will just immediately close their output channel. 
- auto empty_opts = options; - empty_opts.set_source(cudf::io::source_info(local_files[0])); - empty_opts.set_skip_rows(0); - empty_opts.set_num_rows(0); - co_await ctx->executor()->schedule(ch_out->send(read_parquet_chunk( - ctx, ctx->br()->stream_pool().get_stream(), std::move(empty_opts), 0 - ))); - } + // If we're on the hook to read some files, but the skip_rows/num_rows setup + // meant our slice was empty, send an empty table of correct shape. + // Use the first file to get the schema for the empty table. + auto empty_opts = options; + empty_opts.set_source(cudf::io::source_info(files[0])); + empty_opts.set_skip_rows(0); + empty_opts.set_num_rows(0); + co_await ctx->executor()->schedule(ch_out->send(read_parquet_chunk( + ctx, ctx->br()->stream_pool().get_stream(), std::move(empty_opts), 0 + ))); } else { std::vector read_tasks; read_tasks.reserve(1 + num_producers); diff --git a/cpp/tests/streaming/test_read_parquet.cpp b/cpp/tests/streaming/test_read_parquet.cpp index c6d2014f7..7236dcb93 100644 --- a/cpp/tests/streaming/test_read_parquet.cpp +++ b/cpp/tests/streaming/test_read_parquet.cpp @@ -102,10 +102,7 @@ class StreamingReadParquet : public BaseStreamingFixture { [[nodiscard]] cudf::io::source_info get_source_info(bool truncate_file_list) const { if (truncate_file_list) { - std::vector files( - source_files.begin(), source_files.begin() + 2 - ); - return cudf::io::source_info(source_files); + return cudf::io::source_info(std::vector{source_files[0]}); } else { return cudf::io::source_info(source_files); } diff --git a/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py b/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py index 1d5b96e3c..93abfc19d 100644 --- a/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py +++ b/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py @@ -26,21 +26,24 @@ from rapidsmpf.streaming.core.node import CppNode -@pytest.fixture(scope="module") -def source( - tmp_path_factory: 
pytest.TempPathFactory, +def _create_parquet_files( + tmp_path_factory: pytest.TempPathFactory, num_files: int, nrows_per_file: int = 10 ) -> plc.io.SourceInfo: + """Helper to create parquet files for testing.""" path = tmp_path_factory.mktemp("read_parquet") - nrows = 10 start = 0 sources = [] - for i in range(10): + for i in range(num_files): table = plc.Table( - [plc.Column.from_array(np.arange(start, start + nrows, dtype="int32"))] + [ + plc.Column.from_array( + np.arange(start, start + nrows_per_file, dtype="int32") + ) + ] ) # gaps in the column numbering we produce - start += nrows + nrows // 2 + start += nrows_per_file + nrows_per_file // 2 filename = path / f"{i:3d}.pq" sink = plc.io.SinkInfo([filename]) options = plc.io.parquet.ParquetWriterOptions.builder(sink, table).build() @@ -49,6 +52,21 @@ def source( return plc.io.SourceInfo(sources) +@pytest.fixture(scope="module") +def source( + tmp_path_factory: pytest.TempPathFactory, +) -> plc.io.SourceInfo: + return _create_parquet_files(tmp_path_factory, num_files=10) + + +@pytest.fixture(scope="module") +def single_file_source( + tmp_path_factory: pytest.TempPathFactory, +) -> plc.io.SourceInfo: + """Single file source for testing fewer-files-than-ranks scenario.""" + return _create_parquet_files(tmp_path_factory, num_files=1, nrows_per_file=100) + + def make_filter(stream: Stream) -> plc.expressions.Expression: return plc.expressions.Operation( plc.expressions.ASTOperator.LESS, @@ -159,3 +177,52 @@ def test_read_parquet( plc.DataType(plc.TypeId.BOOL8), ) assert all_equal.to_py() + + +def test_read_parquet_single_file( + context: Context, + single_file_source: plc.io.SourceInfo, +) -> None: + """Test reading a single file, which exercises the fewer-files-than-ranks code path.""" + ch: Channel[TableChunk] = context.create_channel() + + options = plc.io.parquet.ParquetReaderOptions.builder(single_file_source).build() + + producer = make_producer(context, ch, options, use_filter=False) + + consumer, 
deferred_messages = pull_from_channel(context, ch) + + run_streaming_pipeline(nodes=[producer, consumer]) + + messages = deferred_messages.release() + assert all( + m1.sequence_number < m2.sequence_number + for m1, m2 in itertools.pairwise(messages) + ) + chunks = [TableChunk.from_message(m) for m in messages] + for chunk in chunks: + chunk.stream.synchronize() + + got = plc.concatenate.concatenate([chunk.table_view() for chunk in chunks]) + for chunk in chunks: + chunk.stream.synchronize() + + expected = get_expected( + context, single_file_source, skip_rows="none", num_rows="all", use_filter=False + ) + + assert got.num_rows() == expected.num_rows() + assert got.num_columns() == expected.num_columns() + assert got.num_columns() == 1 + + all_equal = plc.reduce.reduce( + plc.binaryop.binary_operation( + got.columns()[0], + expected.columns()[0], + plc.binaryop.BinaryOperator.EQUAL, + plc.DataType(plc.TypeId.BOOL8), + ), + plc.aggregation.all(), + plc.DataType(plc.TypeId.BOOL8), + ) + assert all_equal.to_py() From d9bb9c5bdcb34d0161d6afc95b129756183d360c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 09:58:22 -0800 Subject: [PATCH 2/5] fix empty data bug --- cpp/src/streaming/cudf/parquet.cpp | 36 +++++++++++++++++------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 24ca1aaf9..f1a830d40 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -260,17 +260,19 @@ Node read_parquet( } std::uint64_t sequence_number = 0; std::vector> chunks_per_producer(num_producers); + bool rank_has_assigned_work = false; // Track if this rank was assigned work if (files.size() >= size) { // Standard case: at least one file per rank // Distribute files evenly across ranks - int files_per_rank = - static_cast(files.size() / size + (rank < (files.size() % size))); - int file_offset = + std::size_t files_per_rank = files.size() / size + 
(rank < (files.size() % size)); + std::size_t file_offset = rank * (files.size() / size) + std::min(rank, files.size() % size); auto local_files = std::vector( - files.begin() + file_offset, files.begin() + file_offset + files_per_rank + files.begin() + static_cast(file_offset), + files.begin() + static_cast(file_offset + files_per_rank) ); + rank_has_assigned_work = !local_files.empty(); auto const num_files = local_files.size(); // Estimate number of rows per file std::size_t files_per_chunk = 1; @@ -336,7 +338,8 @@ Node read_parquet( // Distribute split indices across ranks (only use as many ranks as we have // splits) auto active_ranks = std::min(size, total_splits); - if (rank < active_ranks) { + rank_has_assigned_work = (rank < active_ranks); + if (rank_has_assigned_work) { // Distribute splits evenly across active ranks auto splits_per_rank = total_splits / active_ranks; auto extra_splits = total_splits % active_ranks; @@ -398,16 +401,19 @@ Node read_parquet( } } if (std::ranges::all_of(chunks_per_producer, [](auto&& v) { return v.empty(); })) { - // If we're on the hook to read some files, but the skip_rows/num_rows setup - // meant our slice was empty, send an empty table of correct shape. - // Use the first file to get the schema for the empty table. - auto empty_opts = options; - empty_opts.set_source(cudf::io::source_info(files[0])); - empty_opts.set_skip_rows(0); - empty_opts.set_num_rows(0); - co_await ctx->executor()->schedule(ch_out->send(read_parquet_chunk( - ctx, ctx->br()->stream_pool().get_stream(), std::move(empty_opts), 0 - ))); + if (rank_has_assigned_work) { + // If we're on the hook to read some files, but the skip_rows/num_rows setup + // meant our slice was empty, send an empty table of correct shape. + // Use the first file to get the schema for the empty table. 
+ auto empty_opts = options; + empty_opts.set_source(cudf::io::source_info(files[0])); + empty_opts.set_skip_rows(0); + empty_opts.set_num_rows(0); + co_await ctx->executor()->schedule(ch_out->send(read_parquet_chunk( + ctx, ctx->br()->stream_pool().get_stream(), std::move(empty_opts), 0 + ))); + } + // Ranks without assigned work just close their output channel without sending } else { std::vector read_tasks; read_tasks.reserve(1 + num_producers); From b5f6bf7b3eb31b2f0154bf33c218ceea31991079 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 10:18:04 -0800 Subject: [PATCH 3/5] reduce metadata reads a bit for single-file case --- cpp/src/streaming/cudf/parquet.cpp | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index f1a830d40..9520a4da5 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -323,8 +324,17 @@ Node read_parquet( // Use sampling to estimate chunks and distribute work across ranks auto const num_files = files.size(); - // Estimate total rows by sampling - auto estimated_total_rows = estimate_total_rows(files); + // For single file, read metadata once and reuse; otherwise sample + std::optional single_file_info; + std::int64_t estimated_total_rows; + if (num_files == 1) { + // Single file: read metadata once, use for both estimation and splits + single_file_info = get_file_row_group_info(files[0]); + estimated_total_rows = single_file_info->total_rows; + } else { + // Multiple files: sample to estimate + estimated_total_rows = estimate_total_rows(files); + } // Estimate total chunks and splits per file auto estimated_total_chunks = std::max( @@ -364,7 +374,12 @@ Node read_parquet( // Read file metadata if we haven't already for this file if (file_idx != current_file_idx) { current_file_idx = file_idx; - current_file_info = 
get_file_row_group_info(files[file_idx]); + // Reuse cached metadata for single-file case + if (single_file_info.has_value() && file_idx == 0) { + current_file_info = *single_file_info; + } else { + current_file_info = get_file_row_group_info(files[file_idx]); + } } // Compute row-group-aligned range for this split From bc9ce9edc7fef61e88eaf301205feced8ca88c45 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 10:24:07 -0800 Subject: [PATCH 4/5] general cleanup --- cpp/src/streaming/cudf/parquet.cpp | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 9520a4da5..5052e71ea 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -68,7 +68,7 @@ struct ChunkDesc { std::int64_t estimate_total_rows( std::vector const& files, std::size_t max_samples = 10 ) { - RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file"); + RAPIDSMPF_EXPECTS(!files.empty(), "Must have at least one file"); // Sample files with a stride to spread samples evenly across the file list std::size_t stride = std::max(std::size_t{1}, files.size() / max_samples); @@ -237,7 +237,7 @@ Node read_parquet( "Skipping rows not yet supported in multi-rank execution" ); auto files = source.filepaths(); - RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file to read"); + RAPIDSMPF_EXPECTS(!files.empty(), "Must have at least one file to read"); RAPIDSMPF_EXPECTS( files.size() < std::numeric_limits::max(), "Trying to read too many files" ); @@ -266,7 +266,8 @@ Node read_parquet( if (files.size() >= size) { // Standard case: at least one file per rank // Distribute files evenly across ranks - std::size_t files_per_rank = files.size() / size + (rank < (files.size() % size)); + std::size_t files_per_rank = + files.size() / size + ((rank < (files.size() % size)) ? 
1 : 0); std::size_t file_offset = rank * (files.size() / size) + std::min(rank, files.size() % size); auto local_files = std::vector( @@ -281,8 +282,9 @@ Node read_parquet( auto nrows = cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0])) .num_rows(); - files_per_chunk = - static_cast(std::max(num_rows_per_chunk / nrows, 1l)); + files_per_chunk = static_cast( + std::max(num_rows_per_chunk / nrows, std::int64_t{1}) + ); } auto to_skip = options.get_skip_rows(); auto to_read = @@ -305,16 +307,16 @@ Node read_parquet( // If the chunk is larger than the number rows we need to skip, on the next // iteration we don't need to skip any more rows, otherwise we must skip the // remainder. - to_skip = std::max(0l, -chunk_rows); + to_skip = std::max(std::int64_t{0}, -chunk_rows); while (chunk_rows > 0 && to_read > 0) { auto rows_read = std::min( - {static_cast(num_rows_per_chunk), chunk_rows, to_read} + {static_cast(num_rows_per_chunk), chunk_rows, to_read} ); chunks_per_producer[sequence_number % num_producers].emplace_back( sequence_number, chunk_skip_rows, rows_read, source ); sequence_number++; - to_read = std::max(0l, to_read - rows_read); + to_read = std::max(std::int64_t{0}, to_read - rows_read); chunk_skip_rows += rows_read; chunk_rows -= rows_read; } @@ -325,8 +327,8 @@ Node read_parquet( auto const num_files = files.size(); // For single file, read metadata once and reuse; otherwise sample - std::optional single_file_info; - std::int64_t estimated_total_rows; + std::optional single_file_info = std::nullopt; + std::int64_t estimated_total_rows = 0; if (num_files == 1) { // Single file: read metadata once, use for both estimation and splits single_file_info = get_file_row_group_info(files[0]); From 63aae8926f4e362d53fb876b1e9ad63ab9c65b88 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 17 Dec 2025 06:50:27 -0800 Subject: [PATCH 5/5] refactor --- cpp/src/streaming/cudf/parquet.cpp | 380 +++++++++++++++++------------ 1 file changed, 222 
insertions(+), 158 deletions(-) diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 7de237249..7be9b987d 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -163,6 +163,219 @@ std::pair compute_split_range( return {skip_rows, num_rows}; } +/** + * @brief Result of chunk assignment containing work distribution for producers. + */ +struct ChunkAssignment { + std::vector> chunks_per_producer; + bool rank_has_assigned_work; +}; + +/** + * @brief Assign chunks using standard file distribution (at least one file per rank). + * + * Distributes files evenly across ranks, then splits each file's rows into chunks. + * + * @param files List of all file paths. + * @param rank Current rank index. + * @param size Total number of ranks. + * @param num_producers Number of producer tasks. + * @param num_rows_per_chunk Target number of rows per chunk. + * @param options Parquet reader options (for skip_rows/num_rows). + * + * @return ChunkAssignment with chunks distributed across producers. + */ +ChunkAssignment assign_chunks_standard( + std::vector const& files, + std::size_t rank, + std::size_t size, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + cudf::io::parquet_reader_options const& options +) { + std::vector> chunks_per_producer(num_producers); + std::uint64_t sequence_number = 0; + + // Distribute files evenly across ranks + std::size_t files_per_rank = + files.size() / size + ((rank < (files.size() % size)) ? 
1 : 0); + std::size_t file_offset = + rank * (files.size() / size) + std::min(rank, files.size() % size); + auto local_files = std::vector( + files.begin() + static_cast(file_offset), + files.begin() + static_cast(file_offset + files_per_rank) + ); + bool rank_has_assigned_work = !local_files.empty(); + auto const num_files = local_files.size(); + + // Estimate number of rows per file + std::size_t files_per_chunk = 1; + if (files.size() > 1) { + auto nrows = + cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0])) + .num_rows(); + files_per_chunk = static_cast( + std::max(num_rows_per_chunk / nrows, std::int64_t{1}) + ); + } + auto to_skip = options.get_skip_rows(); + auto to_read = options.get_num_rows().value_or(std::numeric_limits::max()); + for (std::size_t file_offset = 0; file_offset < num_files; + file_offset += files_per_chunk) + { + std::vector chunk_files; + auto const nchunk_files = std::min(num_files - file_offset, files_per_chunk); + std::ranges::copy_n( + local_files.begin() + static_cast(file_offset), + static_cast(nchunk_files), + std::back_inserter(chunk_files) + ); + auto source = cudf::io::source_info(chunk_files); + // Must read [skip_rows, skip_rows + num_rows) from full fileset + auto chunk_rows = cudf::io::read_parquet_metadata(source).num_rows() - to_skip; + auto chunk_skip_rows = to_skip; + // If the chunk is larger than the number rows we need to skip, on the next + // iteration we don't need to skip any more rows, otherwise we must skip the + // remainder. 
+ to_skip = std::max(std::int64_t{0}, -chunk_rows); + while (chunk_rows > 0 && to_read > 0) { + auto rows_read = std::min( + {static_cast(num_rows_per_chunk), chunk_rows, to_read} + ); + chunks_per_producer[sequence_number % num_producers].emplace_back( + sequence_number, chunk_skip_rows, rows_read, source + ); + sequence_number++; + to_read = std::max(std::int64_t{0}, to_read - rows_read); + chunk_skip_rows += rows_read; + chunk_rows -= rows_read; + } + } + + return { + .chunks_per_producer = std::move(chunks_per_producer), + .rank_has_assigned_work = rank_has_assigned_work + }; +} + +/** + * @brief Assign chunks by splitting files across multiple ranks. + * + * Used when there are fewer files than ranks. Splits each file into multiple + * row-group-aligned ranges and distributes those splits across ranks. + * + * @param files List of all file paths. + * @param rank Current rank index. + * @param size Total number of ranks. + * @param num_producers Number of producer tasks. + * @param num_rows_per_chunk Target number of rows per chunk. + * + * @return ChunkAssignment with chunks distributed across producers. 
+ */ +ChunkAssignment assign_chunks_split_files( + std::vector const& files, + std::size_t rank, + std::size_t size, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk +) { + std::vector> chunks_per_producer(num_producers); + std::uint64_t sequence_number = 0; + auto const num_files = files.size(); + + // For single file, read metadata once and reuse; otherwise sample + std::optional single_file_info = std::nullopt; + std::int64_t estimated_total_rows = 0; + if (num_files == 1) { + // Single file: read metadata once, use for both estimation and splits + single_file_info = get_file_row_group_info(files[0]); + estimated_total_rows = single_file_info->total_rows; + } else { + // Multiple files: sample to estimate + estimated_total_rows = estimate_total_rows(files); + } + + // Estimate total chunks and splits per file + auto estimated_total_chunks = std::max( + std::size_t{1}, + static_cast(estimated_total_rows / num_rows_per_chunk) + ); + auto splits_per_file = + (estimated_total_chunks + num_files - 1) / num_files; // Round up + auto total_splits = num_files * splits_per_file; + + // Distribute split indices across ranks (only use as many ranks as we have splits) + auto active_ranks = std::min(size, total_splits); + bool rank_has_assigned_work = (rank < active_ranks); + if (rank_has_assigned_work) { + // Distribute splits evenly across active ranks + auto splits_per_rank = total_splits / active_ranks; + auto extra_splits = total_splits % active_ranks; + auto split_start = rank * splits_per_rank + std::min(rank, extra_splits); + auto split_end = split_start + splits_per_rank + (rank < extra_splits ? 
1 : 0); + + // Process each split assigned to this rank + // Track which file we're currently working on to avoid re-reading metadata + std::size_t current_file_idx = std::numeric_limits::max(); + FileRowGroupInfo current_file_info{.rg_offsets = {}, .total_rows = 0}; + + for (auto split_idx = split_start; split_idx < split_end; ++split_idx) { + auto file_idx = split_idx / splits_per_file; + auto local_split_idx = split_idx % splits_per_file; + + if (file_idx >= num_files) { + // Past the end of files (can happen with rounding) + break; + } + + // Read file metadata if we haven't already for this file + if (file_idx != current_file_idx) { + current_file_idx = file_idx; + // Reuse cached metadata for single-file case + if (single_file_info.has_value() && file_idx == 0) { + current_file_info = *single_file_info; + } else { + current_file_info = get_file_row_group_info(files[file_idx]); + } + } + + // Compute row-group-aligned range for this split + auto [skip_rows, total_rows_for_split] = + compute_split_range(current_file_info, local_split_idx, splits_per_file); + + if (total_rows_for_split <= 0) { + continue; + } + + // Produce chunks of num_rows_per_chunk from this split's row range + auto source = cudf::io::source_info(files[file_idx]); + auto chunk_skip = skip_rows; + auto remaining = total_rows_for_split; + + while (remaining > 0) { + auto chunk_rows = + std::min(static_cast(num_rows_per_chunk), remaining); + chunks_per_producer[sequence_number % num_producers].emplace_back( + ChunkDesc{ + .sequence_number = sequence_number, + .skip_rows = chunk_skip, + .num_rows = chunk_rows, + .source = source + } + ); + sequence_number++; + chunk_skip += chunk_rows; + remaining -= chunk_rows; + } + } + } + + return { + .chunks_per_producer = std::move(chunks_per_producer), + .rank_has_assigned_work = rank_has_assigned_work + }; +} + /** * @brief Read chunks and send them to an output channel. 
  *
@@ -272,164 +485,15 @@ Node read_parquet(
             std::ranges::single_view(filter->stream)
         );
     }
-    std::uint64_t sequence_number = 0;
-    std::vector<std::vector<ChunkDesc>> chunks_per_producer(num_producers);
-    bool rank_has_assigned_work = false;  // Track if this rank was assigned work
-
-    if (files.size() >= size) {
-        // Standard case: at least one file per rank
-        // Distribute files evenly across ranks
-        std::size_t files_per_rank =
-            files.size() / size + ((rank < (files.size() % size)) ? 1 : 0);
-        std::size_t file_offset =
-            rank * (files.size() / size) + std::min(rank, files.size() % size);
-        auto local_files = std::vector<std::string>(
-            files.begin() + static_cast<std::ptrdiff_t>(file_offset),
-            files.begin() + static_cast<std::ptrdiff_t>(file_offset + files_per_rank)
-        );
-        rank_has_assigned_work = !local_files.empty();
-        auto const num_files = local_files.size();
-        // Estimate number of rows per file
-        std::size_t files_per_chunk = 1;
-        if (files.size() > 1) {
-            auto nrows =
-                cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0]))
-                    .num_rows();
-            files_per_chunk = static_cast<std::size_t>(
-                std::max(num_rows_per_chunk / nrows, std::int64_t{1})
-            );
-        }
-        auto to_skip = options.get_skip_rows();
-        auto to_read =
-            options.get_num_rows().value_or(std::numeric_limits<std::int64_t>::max());
-        for (std::size_t file_offset = 0; file_offset < num_files;
-             file_offset += files_per_chunk)
-        {
-            std::vector<std::string> chunk_files;
-            auto const nchunk_files = std::min(num_files - file_offset, files_per_chunk);
-            std::ranges::copy_n(
-                local_files.begin() + static_cast<std::ptrdiff_t>(file_offset),
-                static_cast<std::ptrdiff_t>(nchunk_files),
-                std::back_inserter(chunk_files)
-            );
-            auto source = cudf::io::source_info(chunk_files);
-            // Must read [skip_rows, skip_rows + num_rows) from full fileset
-            auto chunk_rows =
-                cudf::io::read_parquet_metadata(source).num_rows() - to_skip;
-            auto chunk_skip_rows = to_skip;
-            // If the chunk is larger than the number rows we need to skip, on the next
-            // iteration we don't need to skip any more rows, otherwise we must skip the
-            // remainder.
-            to_skip = std::max(std::int64_t{0}, -chunk_rows);
-            while (chunk_rows > 0 && to_read > 0) {
-                auto rows_read = std::min(
-                    {static_cast<std::int64_t>(num_rows_per_chunk), chunk_rows, to_read}
-                );
-                chunks_per_producer[sequence_number % num_producers].emplace_back(
-                    sequence_number, chunk_skip_rows, rows_read, source
-                );
-                sequence_number++;
-                to_read = std::max(std::int64_t{0}, to_read - rows_read);
-                chunk_skip_rows += rows_read;
-                chunk_rows -= rows_read;
-            }
-        }
-    } else {
-        // Multi-rank single-file case: fewer files than ranks
-        // Use sampling to estimate chunks and distribute work across ranks
-        auto const num_files = files.size();
-
-        // For single file, read metadata once and reuse; otherwise sample
-        std::optional<FileRowGroupInfo> single_file_info = std::nullopt;
-        std::int64_t estimated_total_rows = 0;
-        if (num_files == 1) {
-            // Single file: read metadata once, use for both estimation and splits
-            single_file_info = get_file_row_group_info(files[0]);
-            estimated_total_rows = single_file_info->total_rows;
-        } else {
-            // Multiple files: sample to estimate
-            estimated_total_rows = estimate_total_rows(files);
-        }
-
-        // Estimate total chunks and splits per file
-        auto estimated_total_chunks = std::max(
-            std::size_t{1},
-            static_cast<std::size_t>(estimated_total_rows / num_rows_per_chunk)
-        );
-        auto splits_per_file =
-            (estimated_total_chunks + num_files - 1) / num_files;  // Round up
-        auto total_splits = num_files * splits_per_file;
-
-        // Distribute split indices across ranks (only use as many ranks as we have
-        // splits)
-        auto active_ranks = std::min(size, total_splits);
-        rank_has_assigned_work = (rank < active_ranks);
-        if (rank_has_assigned_work) {
-            // Distribute splits evenly across active ranks
-            auto splits_per_rank = total_splits / active_ranks;
-            auto extra_splits = total_splits % active_ranks;
-            auto split_start = rank * splits_per_rank + std::min(rank, extra_splits);
-            auto split_end =
-                split_start + splits_per_rank + (rank < extra_splits ? 1 : 0);
-
-            // Process each split assigned to this rank
-            // Track which file we're currently working on to avoid re-reading metadata
-            std::size_t current_file_idx = std::numeric_limits<std::size_t>::max();
-            FileRowGroupInfo current_file_info;
-
-            for (auto split_idx = split_start; split_idx < split_end; ++split_idx) {
-                auto file_idx = split_idx / splits_per_file;
-                auto local_split_idx = split_idx % splits_per_file;
-
-                if (file_idx >= num_files) {
-                    // Past the end of files (can happen with rounding)
-                    break;
-                }
-
-                // Read file metadata if we haven't already for this file
-                if (file_idx != current_file_idx) {
-                    current_file_idx = file_idx;
-                    // Reuse cached metadata for single-file case
-                    if (single_file_info.has_value() && file_idx == 0) {
-                        current_file_info = *single_file_info;
-                    } else {
-                        current_file_info = get_file_row_group_info(files[file_idx]);
-                    }
-                }
-
-                // Compute row-group-aligned range for this split
-                auto [skip_rows, total_rows_for_split] = compute_split_range(
-                    current_file_info, local_split_idx, splits_per_file
-                );
-
-                if (total_rows_for_split <= 0) {
-                    continue;
-                }
-
-                // Produce chunks of num_rows_per_chunk from this split's row range
-                auto source = cudf::io::source_info(files[file_idx]);
-                auto chunk_skip = skip_rows;
-                auto remaining = total_rows_for_split;
-
-                while (remaining > 0) {
-                    auto chunk_rows = std::min(
-                        static_cast<std::int64_t>(num_rows_per_chunk), remaining
-                    );
-                    chunks_per_producer[sequence_number % num_producers].emplace_back(
-                        ChunkDesc{
-                            .sequence_number = sequence_number,
-                            .skip_rows = chunk_skip,
-                            .num_rows = chunk_rows,
-                            .source = source
-                        }
-                    );
-                    sequence_number++;
-                    chunk_skip += chunk_rows;
-                    remaining -= chunk_rows;
-                }
-            }
-        }
-    }
+    // Assign chunks to producers based on file/rank distribution
+    auto [chunks_per_producer, rank_has_assigned_work] =
+        (files.size() >= size)
+            ? assign_chunks_standard(
+                  files, rank, size, num_producers, num_rows_per_chunk, options
+              )
+            : assign_chunks_split_files(
+                  files, rank, size, num_producers, num_rows_per_chunk
+              );
 
     if (std::ranges::all_of(chunks_per_producer, [](auto&& v) { return v.empty(); })) {
         if (rank_has_assigned_work) {
             // If we're on the hook to read some files, but the skip_rows/num_rows setup