diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 26d8c1fc3..7be9b987d 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -57,6 +58,324 @@ struct ChunkDesc { cudf::io::source_info source; }; +/** + * @brief Estimate total rows by sampling a subset of files. + * + * @param files List of all file paths. + * @param max_samples Maximum number of files to sample. + * + * @return Estimated total rows across all files. + */ +std::int64_t estimate_total_rows( + std::vector const& files, std::size_t max_samples = 10 +) { + RAPIDSMPF_EXPECTS(!files.empty(), "Must have at least one file"); + + // Sample files with a stride to spread samples evenly across the file list + std::size_t stride = std::max(std::size_t{1}, files.size() / max_samples); + std::vector sample_files; + for (std::size_t i = 0; i < files.size() && sample_files.size() < max_samples; + i += stride) + { + sample_files.push_back(files[i]); + } + + // Read metadata from sampled files to get row count + auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(sample_files)); + std::int64_t sampled_rows = metadata.num_rows(); + + // Extrapolate to estimate total rows across all files + return (sampled_rows * static_cast(files.size())) + / static_cast(sample_files.size()); +} + +/** + * @brief Structure to hold row group boundary information for a file. + */ +struct FileRowGroupInfo { + std::vector rg_offsets; ///< Cumulative row offsets for row groups. + std::int64_t total_rows; ///< Total rows in the file. +}; + +/** + * @brief Read row group metadata for a file and compute cumulative offsets. + * + * @param filepath Path to the parquet file. + * + * @return FileRowGroupInfo with row group offsets. + */ +FileRowGroupInfo get_file_row_group_info(std::string const& filepath) { + auto metadata = cudf::io::read_parquet_metadata(cudf::io::source_info(filepath)); + auto rg_metadata = metadata.rowgroup_metadata(); + std::vector rg_offsets; + rg_offsets.reserve(rg_metadata.size() + 1); + rg_offsets.push_back(0); + for (auto const& rg : rg_metadata) { + rg_offsets.push_back(rg_offsets.back() + rg.at("num_rows")); + } + auto total_rows = rg_offsets.back(); + return FileRowGroupInfo{ + .rg_offsets = std::move(rg_offsets), .total_rows = total_rows + }; +} + +/** + * @brief Compute row-group-aligned skip_rows and num_rows for a split of a file. + * + * Given a file's row group offsets, compute the skip_rows and num_rows for a specific + * split, aligning to row group boundaries when possible. + * + * @param rg_info Row group information for the file. + * @param split_idx Index of this split (0-based). + * @param total_splits Total number of splits for this file. + * + * @return Pair of (skip_rows, num_rows). + */ +std::pair compute_split_range( + FileRowGroupInfo const& rg_info, std::size_t split_idx, std::size_t total_splits +) { + auto const& rg_offsets = rg_info.rg_offsets; + auto num_row_groups = rg_offsets.size() - 1; + auto total_rows = rg_info.total_rows; + + std::int64_t skip_rows, num_rows; + + if (total_splits <= num_row_groups) { + // Align to row groups - distribute row groups evenly across splits + std::size_t rg_per_split = num_row_groups / total_splits; + std::size_t extra_rg = num_row_groups % total_splits; + std::size_t rg_start = split_idx * rg_per_split + std::min(split_idx, extra_rg); + std::size_t rg_end = rg_start + rg_per_split + (split_idx < extra_rg ? 1 : 0); + skip_rows = rg_offsets[rg_start]; + num_rows = rg_offsets[rg_end] - skip_rows; + } else { + // More splits than row groups - split by rows + std::int64_t rows_per_split = + total_rows / static_cast(total_splits); + std::int64_t extra_rows = total_rows % static_cast(total_splits); + auto split_idx_signed = static_cast(split_idx); + + skip_rows = + split_idx_signed * rows_per_split + std::min(split_idx_signed, extra_rows); + num_rows = rows_per_split + (split_idx_signed < extra_rows ? 1 : 0); + } + + return {skip_rows, num_rows}; +} + +/** + * @brief Result of chunk assignment containing work distribution for producers. + */ +struct ChunkAssignment { + std::vector> chunks_per_producer; + bool rank_has_assigned_work; +}; + +/** + * @brief Assign chunks using standard file distribution (at least one file per rank). + * + * Distributes files evenly across ranks, then splits each file's rows into chunks. + * + * @param files List of all file paths. + * @param rank Current rank index. + * @param size Total number of ranks. + * @param num_producers Number of producer tasks. + * @param num_rows_per_chunk Target number of rows per chunk. + * @param options Parquet reader options (for skip_rows/num_rows). + * + * @return ChunkAssignment with chunks distributed across producers. + */ +ChunkAssignment assign_chunks_standard( + std::vector const& files, + std::size_t rank, + std::size_t size, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + cudf::io::parquet_reader_options const& options +) { + std::vector> chunks_per_producer(num_producers); + std::uint64_t sequence_number = 0; + + // Distribute files evenly across ranks + std::size_t files_per_rank = + files.size() / size + ((rank < (files.size() % size)) ? 1 : 0); + std::size_t file_offset = + rank * (files.size() / size) + std::min(rank, files.size() % size); + auto local_files = std::vector( + files.begin() + static_cast(file_offset), + files.begin() + static_cast(file_offset + files_per_rank) + ); + bool rank_has_assigned_work = !local_files.empty(); + auto const num_files = local_files.size(); + + // Estimate number of rows per file + std::size_t files_per_chunk = 1; + if (files.size() > 1) { + auto nrows = + cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0])) + .num_rows(); + files_per_chunk = static_cast( + std::max(num_rows_per_chunk / nrows, std::int64_t{1}) + ); + } + auto to_skip = options.get_skip_rows(); + auto to_read = options.get_num_rows().value_or(std::numeric_limits::max()); + for (std::size_t file_offset = 0; file_offset < num_files; + file_offset += files_per_chunk) + { + std::vector chunk_files; + auto const nchunk_files = std::min(num_files - file_offset, files_per_chunk); + std::ranges::copy_n( + local_files.begin() + static_cast(file_offset), + static_cast(nchunk_files), + std::back_inserter(chunk_files) + ); + auto source = cudf::io::source_info(chunk_files); + // Must read [skip_rows, skip_rows + num_rows) from full fileset + auto chunk_rows = cudf::io::read_parquet_metadata(source).num_rows() - to_skip; + auto chunk_skip_rows = to_skip; + // If the chunk is larger than the number rows we need to skip, on the next + // iteration we don't need to skip any more rows, otherwise we must skip the + // remainder. + to_skip = std::max(std::int64_t{0}, -chunk_rows); + while (chunk_rows > 0 && to_read > 0) { + auto rows_read = std::min( + {static_cast(num_rows_per_chunk), chunk_rows, to_read} + ); + chunks_per_producer[sequence_number % num_producers].emplace_back( + sequence_number, chunk_skip_rows, rows_read, source + ); + sequence_number++; + to_read = std::max(std::int64_t{0}, to_read - rows_read); + chunk_skip_rows += rows_read; + chunk_rows -= rows_read; + } + } + + return { + .chunks_per_producer = std::move(chunks_per_producer), + .rank_has_assigned_work = rank_has_assigned_work + }; +} + +/** + * @brief Assign chunks by splitting files across multiple ranks. + * + * Used when there are fewer files than ranks. Splits each file into multiple + * row-group-aligned ranges and distributes those splits across ranks. + * + * @param files List of all file paths. + * @param rank Current rank index. + * @param size Total number of ranks. + * @param num_producers Number of producer tasks. + * @param num_rows_per_chunk Target number of rows per chunk. + * + * @return ChunkAssignment with chunks distributed across producers. + */ +ChunkAssignment assign_chunks_split_files( + std::vector const& files, + std::size_t rank, + std::size_t size, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk +) { + std::vector> chunks_per_producer(num_producers); + std::uint64_t sequence_number = 0; + auto const num_files = files.size(); + + // For single file, read metadata once and reuse; otherwise sample + std::optional single_file_info = std::nullopt; + std::int64_t estimated_total_rows = 0; + if (num_files == 1) { + // Single file: read metadata once, use for both estimation and splits + single_file_info = get_file_row_group_info(files[0]); + estimated_total_rows = single_file_info->total_rows; + } else { + // Multiple files: sample to estimate + estimated_total_rows = estimate_total_rows(files); + } + + // Estimate total chunks and splits per file + auto estimated_total_chunks = std::max( + std::size_t{1}, + static_cast(estimated_total_rows / num_rows_per_chunk) + ); + auto splits_per_file = + (estimated_total_chunks + num_files - 1) / num_files; // Round up + auto total_splits = num_files * splits_per_file; + + // Distribute split indices across ranks (only use as many ranks as we have splits) + auto active_ranks = std::min(size, total_splits); + bool rank_has_assigned_work = (rank < active_ranks); + if (rank_has_assigned_work) { + // Distribute splits evenly across active ranks + auto splits_per_rank = total_splits / active_ranks; + auto extra_splits = total_splits % active_ranks; + auto split_start = rank * splits_per_rank + std::min(rank, extra_splits); + auto split_end = split_start + splits_per_rank + (rank < extra_splits ? 1 : 0); + + // Process each split assigned to this rank + // Track which file we're currently working on to avoid re-reading metadata + std::size_t current_file_idx = std::numeric_limits::max(); + FileRowGroupInfo current_file_info{.rg_offsets = {}, .total_rows = 0}; + + for (auto split_idx = split_start; split_idx < split_end; ++split_idx) { + auto file_idx = split_idx / splits_per_file; + auto local_split_idx = split_idx % splits_per_file; + + if (file_idx >= num_files) { + // Past the end of files (can happen with rounding) + break; + } + + // Read file metadata if we haven't already for this file + if (file_idx != current_file_idx) { + current_file_idx = file_idx; + // Reuse cached metadata for single-file case + if (single_file_info.has_value() && file_idx == 0) { + current_file_info = *single_file_info; + } else { + current_file_info = get_file_row_group_info(files[file_idx]); + } + } + + // Compute row-group-aligned range for this split + auto [skip_rows, total_rows_for_split] = + compute_split_range(current_file_info, local_split_idx, splits_per_file); + + if (total_rows_for_split <= 0) { + continue; + } + + // Produce chunks of num_rows_per_chunk from this split's row range + auto source = cudf::io::source_info(files[file_idx]); + auto chunk_skip = skip_rows; + auto remaining = total_rows_for_split; + + while (remaining > 0) { + auto chunk_rows = + std::min(static_cast(num_rows_per_chunk), remaining); + chunks_per_producer[sequence_number % num_producers].emplace_back( + ChunkDesc{ + .sequence_number = sequence_number, + .skip_rows = chunk_skip, + .num_rows = chunk_rows, + .source = source + } + ); + sequence_number++; + chunk_skip += chunk_rows; + remaining -= chunk_rows; + } + } + } + + return { + .chunks_per_producer = std::move(chunks_per_producer), + .rank_has_assigned_work = rank_has_assigned_work + }; +} + /** * @brief Read chunks and send them to an output channel. * @@ -144,7 +463,7 @@ Node read_parquet( "Skipping rows not yet supported in multi-rank execution" ); auto files = source.filepaths(); - RAPIDSMPF_EXPECTS(files.size() > 0, "Must have at least one file to read"); + RAPIDSMPF_EXPECTS(!files.empty(), "Must have at least one file to read"); RAPIDSMPF_EXPECTS( files.size() < std::numeric_limits::max(), "Trying to read too many files" ); @@ -166,70 +485,29 @@ Node read_parquet( std::ranges::single_view(filter->stream) ); } - // TODO: Handle case where multiple ranks are reading from a single file. - int files_per_rank = - static_cast(files.size() / size + (rank < (files.size() % size))); - int file_offset = rank * (files.size() / size) + std::min(rank, files.size() % size); - auto local_files = std::vector( - files.begin() + file_offset, files.begin() + file_offset + files_per_rank - ); - std::uint64_t sequence_number = 0; - std::vector> chunks_per_producer(num_producers); - auto const num_files = local_files.size(); - // Estimate number of rows per file - std::size_t files_per_chunk = 1; - if (files.size() > 1) { - auto nrows = - cudf::io::read_parquet_metadata(cudf::io::source_info(local_files[0])) - .num_rows(); - files_per_chunk = - static_cast(std::max(num_rows_per_chunk / nrows, 1l)); - } - auto to_skip = options.get_skip_rows(); - auto to_read = options.get_num_rows().value_or(std::numeric_limits::max()); - for (std::size_t file_offset = 0; file_offset < num_files; - file_offset += files_per_chunk) - { - std::vector chunk_files; - auto const nchunk_files = std::min(num_files - file_offset, files_per_chunk); - std::ranges::copy_n( - local_files.begin() + static_cast(file_offset), - static_cast(nchunk_files), - std::back_inserter(chunk_files) - ); - auto source = cudf::io::source_info(chunk_files); - // Must read [skip_rows, skip_rows + num_rows) from full fileset - auto chunk_rows = cudf::io::read_parquet_metadata(source).num_rows() - to_skip; - auto chunk_skip_rows = to_skip; - // If the chunk is larger than the number rows we need to skip, on the next - // iteration we don't need to skip any more rows, otherwise we must skip the - // remainder. - to_skip = std::max(0l, -chunk_rows); - while (chunk_rows > 0 && to_read > 0) { - auto rows_read = - std::min({static_cast(num_rows_per_chunk), chunk_rows, to_read}); - chunks_per_producer[sequence_number % num_producers].emplace_back( - sequence_number, chunk_skip_rows, rows_read, source - ); - sequence_number++; - to_read = std::max(0l, to_read - rows_read); - chunk_skip_rows += rows_read; - chunk_rows -= rows_read; - } - } + // Assign chunks to producers based on file/rank distribution + auto [chunks_per_producer, rank_has_assigned_work] = + (files.size() >= size) + ? assign_chunks_standard( + files, rank, size, num_producers, num_rows_per_chunk, options + ) + : assign_chunks_split_files( + files, rank, size, num_producers, num_rows_per_chunk + ); if (std::ranges::all_of(chunks_per_producer, [](auto&& v) { return v.empty(); })) { - if (local_files.size() > 0) { + if (rank_has_assigned_work) { // If we're on the hook to read some files, but the skip_rows/num_rows setup // meant our slice was empty, send an empty table of correct shape. - // Anyone with no files will just immediately close their output channel. + // Use the first file to get the schema for the empty table. auto empty_opts = options; - empty_opts.set_source(cudf::io::source_info(local_files[0])); + empty_opts.set_source(cudf::io::source_info(files[0])); empty_opts.set_skip_rows(0); empty_opts.set_num_rows(0); co_await ctx->executor()->schedule(ch_out->send(read_parquet_chunk( ctx, ctx->br()->stream_pool().get_stream(), std::move(empty_opts), 0 ))); } + // Ranks without assigned work just close their output channel without sending } else { std::vector read_tasks; read_tasks.reserve(1 + num_producers); diff --git a/cpp/tests/streaming/test_read_parquet.cpp b/cpp/tests/streaming/test_read_parquet.cpp index c6d2014f7..7236dcb93 100644 --- a/cpp/tests/streaming/test_read_parquet.cpp +++ b/cpp/tests/streaming/test_read_parquet.cpp @@ -102,10 +102,7 @@ class StreamingReadParquet : public BaseStreamingFixture { [[nodiscard]] cudf::io::source_info get_source_info(bool truncate_file_list) const { if (truncate_file_list) { - std::vector files( - source_files.begin(), source_files.begin() + 2 - ); - return cudf::io::source_info(source_files); + return cudf::io::source_info(std::vector{source_files[0]}); } else { return cudf::io::source_info(source_files); } diff --git a/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py b/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py index 1d5b96e3c..93abfc19d 100644 --- a/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py +++ b/python/rapidsmpf/rapidsmpf/tests/streaming/test_read_parquet.py @@ -26,21 +26,24 @@ from rapidsmpf.streaming.core.node import CppNode -@pytest.fixture(scope="module") -def source( - tmp_path_factory: pytest.TempPathFactory, +def _create_parquet_files( + tmp_path_factory: pytest.TempPathFactory, num_files: int, nrows_per_file: int = 10 ) -> plc.io.SourceInfo: + """Helper to create parquet files for testing.""" path = tmp_path_factory.mktemp("read_parquet") - nrows = 10 start = 0 sources = [] - for i in range(10): + for i in range(num_files): table = plc.Table( - [plc.Column.from_array(np.arange(start, start + nrows, dtype="int32"))] + [ + plc.Column.from_array( + np.arange(start, start + nrows_per_file, dtype="int32") + ) + ] ) # gaps in the column numbering we produce - start += nrows + nrows // 2 + start += nrows_per_file + nrows_per_file // 2 filename = path / f"{i:3d}.pq" sink = plc.io.SinkInfo([filename]) options = plc.io.parquet.ParquetWriterOptions.builder(sink, table).build() @@ -49,6 +52,21 @@ def source( return plc.io.SourceInfo(sources) +@pytest.fixture(scope="module") +def source( + tmp_path_factory: pytest.TempPathFactory, +) -> plc.io.SourceInfo: + return _create_parquet_files(tmp_path_factory, num_files=10) + + +@pytest.fixture(scope="module") +def single_file_source( + tmp_path_factory: pytest.TempPathFactory, +) -> plc.io.SourceInfo: + """Single file source for testing fewer-files-than-ranks scenario.""" + return _create_parquet_files(tmp_path_factory, num_files=1, nrows_per_file=100) + + def make_filter(stream: Stream) -> plc.expressions.Expression: return plc.expressions.Operation( plc.expressions.ASTOperator.LESS, @@ -159,3 +177,52 @@ def test_read_parquet( plc.DataType(plc.TypeId.BOOL8), ) assert all_equal.to_py() + + +def test_read_parquet_single_file( + context: Context, + single_file_source: plc.io.SourceInfo, +) -> None: + """Test reading a single file, which exercises the fewer-files-than-ranks code path.""" + ch: Channel[TableChunk] = context.create_channel() + + options = plc.io.parquet.ParquetReaderOptions.builder(single_file_source).build() + + producer = make_producer(context, ch, options, use_filter=False) + + consumer, deferred_messages = pull_from_channel(context, ch) + + run_streaming_pipeline(nodes=[producer, consumer]) + + messages = deferred_messages.release() + assert all( + m1.sequence_number < m2.sequence_number + for m1, m2 in itertools.pairwise(messages) + ) + chunks = [TableChunk.from_message(m) for m in messages] + for chunk in chunks: + chunk.stream.synchronize() + + got = plc.concatenate.concatenate([chunk.table_view() for chunk in chunks]) + for chunk in chunks: + chunk.stream.synchronize() + + expected = get_expected( + context, single_file_source, skip_rows="none", num_rows="all", use_filter=False + ) + + assert got.num_rows() == expected.num_rows() + assert got.num_columns() == expected.num_columns() + assert got.num_columns() == 1 + + all_equal = plc.reduce.reduce( + plc.binaryop.binary_operation( + got.columns()[0], + expected.columns()[0], + plc.binaryop.BinaryOperator.EQUAL, + plc.DataType(plc.TypeId.BOOL8), + ), + plc.aggregation.all(), + plc.DataType(plc.TypeId.BOOL8), + ) + assert all_equal.to_py()