Commit bde1608

feat: Preserve File Partitioning From File Scans (#19124)
## Which issue does this PR close?

- Closes #19090.

## Rationale for this change

DataFusion does not have the option to preserve file partitioning from file scans; it always returns `UnknownPartitioning`. Some queries and datasets would see great benefits from preserving their explicit partitioning to avoid shuffles. An example would be the following scenario.

Say we have data partitioned by `f_dkey` and ordered by `(f_dkey, timestamp)`, laid out hive-style:

```text
f_dkey=A/data.parquet
f_dkey=B/data.parquet
f_dkey=C/data.parquet
...
```

Each file is partitioned by `f_dkey` and internally sorted by `timestamp`:

| f_dkey | timestamp           | value |
|--------|---------------------|-------|
| A      | 2023-01-01T09:00:00 | 95.5  |
| A      | 2023-01-01T09:00:10 | 102.3 |
| A      | 2023-01-01T09:00:20 | 98.7  |
| A      | 2023-01-01T09:12:20 | 105.1 |
| A      | 2023-01-01T09:12:30 | 100.0 |
| A      | 2023-01-01T09:12:40 | 150.0 |
| A      | 2023-01-01T09:12:50 | 120.8 |

Running the query:

```sql
EXPLAIN SELECT f_dkey, count(*), avg(value)
FROM fact_table_ordered
GROUP BY f_dkey
ORDER BY f_dkey;
```

prior to this PR would produce the plan:

```text
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
08)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
```

This can be improved. Our data is ordered on `(f_dkey, timestamp)`; when we hash repartition by `f_dkey` we lose that sort ordering, forcing a `SortExec` to be inserted after the repartition. You could set `datafusion.optimizer.prefer_existing_sort = true;` to preserve the ordering through the repartition, but with the tradeoff of a more expensive shuffle. Since our data is already partitioned by `f_dkey` at file scan time, we can eliminate the hash repartitioning entirely, eliminating the `SortExec` in the process.
This would result in a plan that looks like:

```text
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
```

## What changes are included in this PR?

I have extended the `FileScanConfig` implementation of `DataSource` with the ability to preserve hive-style partitioning by declaring its `output_partitioning` as hash partitioned on the hive columns. When the user sets the option `preserve_file_partitions > 0` (0 by default, which is disabled), DataFusion will take advantage of partitioned files. Specifically, when `preserve_file_partitions` is enabled:

1. If `preserve_file_partitions` > the number of distinct partitions found, we fall back to partitioning by byte ranges.
2. If `preserve_file_partitions` <= the number of distinct partitions found, we keep the file partitioning.

Because we can have fewer file partition groups than `target_partitions`, forcing a partition group (with possibly large amounts of data) to be read in a single partition can increase file I/O. This configuration choice was made to control the amount of I/O overhead a user is willing to accept in order to eliminate shuffles. (This was recommended by @gabotechs and is a great approach for having more granularity over this behavior than a boolean flag, thank you.)

Reusing the declared hash partitioning has rippling effects throughout query plans, such as propagating through joins and windows as well as preserving order, which is great to see.

## Are these changes tested?

- Added unit tests for the new functionality
- Added sqllogictests to confirm end-to-end behavior
- Added a new benchmark that compares queries with: no optimization, preserve order through repartitions, and preserve partitioning from file scans (this PR)
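For illustration, a minimal sketch of how a user might enable this from Rust. The config key `datafusion.optimizer.preserve_file_partitions` comes from the diff below; `SessionConfig::set_usize`, the table name, and the `data/fact/` path are assumptions for the example, not part of this PR:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Require at least 3 distinct hive partition values before the scan
    // declares Hash partitioning (0, the default, disables the feature).
    let config = SessionConfig::new()
        .set_usize("datafusion.optimizer.preserve_file_partitions", 3);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical hive-partitioned directory: data/fact/f_dkey=A/..., f_dkey=B/..., ...
    ctx.register_parquet("fact_table_ordered", "data/fact/", ParquetReadOptions::default())
        .await?;

    // Aggregates on the partition column can now skip RepartitionExec/SortExec.
    ctx.sql("EXPLAIN SELECT f_dkey, count(*) FROM fact_table_ordered GROUP BY f_dkey")
        .await?
        .show()
        .await?;
    Ok(())
}
```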
We see nice speedups, and they scale linearly as the data grows (tables with results below; "psr" = preserve sort repartition).

### Small Dataset

- Normal config: 10 partitions × 1,000,000 rows = 10,000,000 total rows
- High-cardinality config: 25 partitions × 500,000 rows = 12,500,000 total rows

| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|---------------------------|--------|---------------|--------------|---------|--------------|---------------|--------|----------------|----------------|
| not optimized             | 17.268 | -             | -            | 41.521  | -            | -             | 28.796 | -              | -              |
| preserve sort repartition | 15.908 | -             | -            | 40.580  | -            | -             | 17.669 | -              | -              |
| optimized (this PR)       | 15.000 | 13.1% (1.15x) | 5.7% (1.06x) | 40.977  | 1.3% (1.01x) | -1.0% (0.99x) | 4.301  | 85.1% (6.70x)  | 75.7% (4.11x)  |

The optimized plan is roughly on par with the other plans for preserve_order and preserve_order_join, but it makes preserve_order_window 6.7× faster than not optimized and 4.1× faster than preserve sort repartition.

---

### Medium Dataset

- Normal config: 30 partitions × 3,000,000 rows = 90,000,000 total rows
- High-cardinality config: 75 partitions × 1,500,000 rows = 112,500,000 total rows

| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|---------------------------|---------|---------------|---------------|---------|---------------|---------------|---------|-----------------|-----------------|
| not optimized             | 752.130 | -             | -             | 451.300 | -             | -             | 193.210 | -               | -               |
| preserve sort repartition | 392.050 | -             | -             | 477.400 | -             | -             | 115.320 | -               | -               |
| optimized (this PR)       | 93.818  | 87.5% (8.02x) | 76.1% (4.18x) | 203.540 | 54.9% (2.22x) | 57.4% (2.35x) | 9.841   | 94.9% (19.63x)  | 91.5% (11.72x)  |

The optimized plan makes preserve_order about 8.0× faster than not optimized (4.2× vs psr), preserve_order_join about 2.2× faster than not optimized (2.35× vs psr), and preserve_order_window a huge 19.6× faster than not optimized (11.7× vs psr).
---

### Large Dataset

- Normal config: 50 partitions × 6,000,000 rows = 300,000,000 total rows
- High-cardinality config: 125 partitions × 3,000,000 rows = 375,000,000 total rows

| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|---------------------------|----------|---------------|---------------|----------|---------------|---------------|---------|-----------------|-----------------|
| not optimized             | 2699.700 | -             | -             | 1563.800 | -             | -             | 614.440 | -               | -               |
| preserve sort repartition | 1244.200 | -             | -             | 1594.300 | -             | -             | 371.300 | -               | -               |
| optimized (this PR)       | 290.740  | 89.2% (9.29x) | 76.6% (4.28x) | 645.180  | 58.7% (2.42x) | 59.5% (2.47x) | 11.538  | 98.1% (53.25x)  | 96.9% (32.18x)  |

The optimized plan makes preserve_order about 9.3× faster than not optimized (4.3× vs psr), preserve_order_join about 2.4× faster than not optimized (2.5× vs psr), and preserve_order_window an extreme 53× faster than not optimized (32× vs psr).

## Are there any user-facing changes?

Yes. Users can now use the `preserve_file_partitions` option to set the minimum number of distinct file partitions required before file partitioning is preserved (0 = disabled). When enabled and triggered, users will see repartitions on their file partition key eliminated where appropriate.

## Follow-Up Work

- **Superset Partitioning:** currently, `Hash(a)` doesn't satisfy `Hash(a, b)`, although it should (see the sketch after this list). `Hash(a)` guarantees that all rows with a given value of `a` are contained in a single partition; since the groups of `Hash(a, b)` subdivide the groups of `Hash(a)`, anything partitioned by `Hash(a)` is also validly partitioned by `Hash(a, b)`.
- **Reduce File I/O with Preserve File Partitioning:** in the current implementation, when a partition value has many files, all of that file I/O goes to one task. This is a tradeoff that accepts I/O overhead to eliminate shuffle and sort overhead. There could be ways to increase I/O parallelism while still maintaining partitioning.
- **Sort Satisfaction for Monotonic Functions:** if we are sorted by `timestamp` and then try to order by `date_bin('1 hour', timestamp)`, DataFusion will not recognize that this ordering is implicitly satisfied. For monotonic functions (`date_bin`, `CAST`, `FLOOR`, etc.) we should maintain ordering, eliminating unnecessary sorts.
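A minimal sketch of that superset rule, using plain string keys rather than DataFusion's actual `Partitioning` types (the helper name and representation are assumptions for illustration only):

```rust
/// Hypothetical helper: does a declared Hash(declared) partitioning satisfy a
/// required Hash(required)? If every declared key appears in the required key
/// set, then rows with equal `required` keys also have equal `declared` keys,
/// so they are guaranteed to already be co-located in one partition.
fn hash_satisfies(declared: &[&str], required: &[&str]) -> bool {
    !declared.is_empty() && declared.iter().all(|k| required.contains(k))
}

fn main() {
    assert!(hash_satisfies(&["a"], &["a", "b"])); // Hash(a) should satisfy Hash(a, b)
    assert!(!hash_satisfies(&["a", "b"], &["a"])); // the converse does not hold
}
```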

File tree

11 files changed: +1842 −28 lines

datafusion/catalog-listing/src/mod.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,4 +33,4 @@ mod table;
 
 pub use config::{ListingTableConfig, SchemaSource};
 pub use options::ListingOptions;
-pub use table::ListingTable;
+pub use table::{ListFilesResult, ListingTable};
```

datafusion/catalog-listing/src/table.rs

Lines changed: 57 additions & 5 deletions

```diff
@@ -52,6 +52,17 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+/// Result of a file listing operation from [`ListingTable::list_files_for_scan`].
+#[derive(Debug)]
+pub struct ListFilesResult {
+    /// File groups organized by the partitioning strategy.
+    pub file_groups: Vec<FileGroup>,
+    /// Aggregated statistics for all files.
+    pub statistics: Statistics,
+    /// Whether files are grouped by partition values (enables Hash partitioning).
+    pub grouped_by_partition: bool,
+}
+
 /// Built in [`TableProvider`] that reads data from one or more files as a single table.
 ///
 /// The files are read using an [`ObjectStore`] instance, for example from
@@ -446,7 +457,11 @@ impl TableProvider for ListingTable {
         // at the same time. This is because the limit should be applied after the filters are applied.
         let statistic_file_limit = if filters.is_empty() { limit } else { None };
 
-        let (mut partitioned_file_lists, statistics) = self
+        let ListFilesResult {
+            file_groups: mut partitioned_file_lists,
+            statistics,
+            grouped_by_partition: partitioned_by_file_group,
+        } = self
             .list_files_for_scan(state, &partition_filters, statistic_file_limit)
             .await?;
 
@@ -508,6 +523,7 @@ impl TableProvider for ListingTable {
                     .with_limit(limit)
                     .with_output_ordering(output_ordering)
                     .with_expr_adapter(self.expr_adapter_factory.clone())
+                    .with_partitioned_by_file_group(partitioned_by_file_group)
                     .build(),
             )
             .await?;
@@ -620,11 +636,15 @@ impl ListingTable {
         ctx: &'a dyn Session,
         filters: &'a [Expr],
         limit: Option<usize>,
-    ) -> datafusion_common::Result<(Vec<FileGroup>, Statistics)> {
+    ) -> datafusion_common::Result<ListFilesResult> {
         let store = if let Some(url) = self.table_paths.first() {
             ctx.runtime_env().object_store(url)?
         } else {
-            return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
+            return Ok(ListFilesResult {
+                file_groups: vec![],
+                statistics: Statistics::new_unknown(&self.file_schema),
+                grouped_by_partition: false,
+            });
         };
         // list files (with partitions)
         let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
@@ -658,7 +678,35 @@ impl ListingTable {
         let (file_group, inexact_stats) =
             get_files_with_limit(files, limit, self.options.collect_stat).await?;
 
-        let file_groups = file_group.split_files(self.options.target_partitions);
+        // Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
+        //
+        // When enabled, files are grouped by their Hive partition column values, allowing
+        // FileScanConfig to declare Hash partitioning. This enables the optimizer to skip
+        // hash repartitioning for aggregates and joins on partition columns.
+        let threshold = ctx.config_options().optimizer.preserve_file_partitions;
+
+        let (file_groups, grouped_by_partition) = if threshold > 0
+            && !self.options.table_partition_cols.is_empty()
+        {
+            let grouped =
+                file_group.group_by_partition_values(self.options.target_partitions);
+            if grouped.len() >= threshold {
+                (grouped, true)
+            } else {
+                let all_files: Vec<_> =
+                    grouped.into_iter().flat_map(|g| g.into_inner()).collect();
+                (
+                    FileGroup::new(all_files).split_files(self.options.target_partitions),
+                    false,
+                )
+            }
+        } else {
+            (
+                file_group.split_files(self.options.target_partitions),
+                false,
+            )
+        };
+
         let (mut file_groups, mut stats) = compute_all_files_statistics(
             file_groups,
             self.schema(),
@@ -678,7 +726,11 @@ impl ListingTable {
             }
             Ok::<_, DataFusionError>(())
         })?;
-        Ok((file_groups, stats))
+        Ok(ListFilesResult {
+            file_groups,
+            statistics: stats,
+            grouped_by_partition,
+        })
     }
 
     /// Collects statistics for a given partitioned file.
```
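To make the grouping invariant concrete, here is a minimal self-contained sketch of what `group_by_partition_values` has to guarantee; the `File` type and free function are simplified stand-ins for illustration, not DataFusion's actual `PartitionedFile` or `FileGroup` APIs:

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a partitioned file (illustration only).
#[derive(Debug, Clone)]
struct File {
    path: String,
    /// Hive partition values parsed from the path, e.g. ["A"] for f_dkey=A/.
    partition_values: Vec<String>,
}

/// Group files so that every file sharing the same hive partition values
/// lands in the same group -- the invariant that lets the scan declare
/// Hash(partition_cols) output partitioning instead of UnknownPartitioning.
fn group_by_partition_values(files: Vec<File>) -> Vec<Vec<File>> {
    let mut groups: BTreeMap<Vec<String>, Vec<File>> = BTreeMap::new();
    for f in files {
        groups.entry(f.partition_values.clone()).or_default().push(f);
    }
    groups.into_values().collect()
}

fn main() {
    let files = vec![
        File { path: "f_dkey=A/data1.parquet".into(), partition_values: vec!["A".into()] },
        File { path: "f_dkey=A/data2.parquet".into(), partition_values: vec!["A".into()] },
        File { path: "f_dkey=B/data.parquet".into(), partition_values: vec!["B".into()] },
    ];
    let groups = group_by_partition_values(files);
    assert_eq!(groups.len(), 2); // the A-group (2 files) and the B-group (1 file)
}
```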

datafusion/common/src/config.rs

Lines changed: 13 additions & 0 deletions

```diff
@@ -965,6 +965,19 @@ config_namespace! {
         /// record tables provided to the MemTable on creation.
         pub repartition_file_scans: bool, default = true
 
+        /// Minimum number of distinct partition values required to group files by their
+        /// Hive partition column values (enabling Hash partitioning declaration).
+        ///
+        /// How the option is used:
+        /// - preserve_file_partitions=0: Disable it.
+        /// - preserve_file_partitions=1: Always enable it.
+        /// - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N.
+        ///   This threshold preserves I/O parallelism when file partitioning is below it.
+        ///
+        /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
+        /// partitions is less than the target_partitions.
+        pub preserve_file_partitions: usize, default = 0
+
         /// Should DataFusion repartition data using the partitions keys to execute window
         /// functions in parallel using the provided `target_partitions` level
         pub repartition_windows: bool, default = true
```
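As a worked example of the threshold rule in the doc comment above, a small sketch; the free function is hypothetical (the real check lives inline in `ListingTable::list_files_for_scan`):

```rust
/// 0 disables the feature; otherwise require at least `threshold`
/// distinct hive partition values before declaring Hash partitioning.
fn preserve_file_partitioning(threshold: usize, distinct_partitions: usize) -> bool {
    threshold > 0 && distinct_partitions >= threshold
}

fn main() {
    assert!(!preserve_file_partitioning(0, 5)); // 0: disabled
    assert!(preserve_file_partitioning(1, 5));  // 1: always enabled
    assert!(preserve_file_partitioning(4, 5));  // N=4, M=5: M >= N, keep file partitioning
    assert!(!preserve_file_partitioning(8, 5)); // N=8, M=5: fall back to byte ranges
}
```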

datafusion/core/Cargo.toml

Lines changed: 5 additions & 0 deletions

```diff
@@ -270,3 +270,8 @@ name = "dataframe"
 [[bench]]
 harness = false
 name = "spm"
+
+[[bench]]
+harness = false
+name = "preserve_file_partitioning"
+required-features = ["parquet"]
```
