diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 8caeda901b519..85e11853956af 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -30,6 +30,7 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; +use arrow_schema::SchemaRef; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{provider_as_source, TableProvider}, @@ -109,6 +110,26 @@ struct ContextWithParquet { ctx: SessionContext, } +struct PruningMetric { + total_pruned: usize, + total_matched: usize, + total_fully_matched: usize, +} + +impl PruningMetric { + pub fn total_pruned(&self) -> usize { + self.total_pruned + } + + pub fn total_matched(&self) -> usize { + self.total_matched + } + + pub fn total_fully_matched(&self) -> usize { + self.total_fully_matched + } +} + /// The output of running one of the test cases struct TestOutput { /// The input query SQL @@ -126,8 +147,8 @@ struct TestOutput { impl TestOutput { /// retrieve the value of the named metric, if any fn metric_value(&self, metric_name: &str) -> Option { - if let Some((pruned, _matched)) = self.pruning_metric(metric_name) { - return Some(pruned); + if let Some(pm) = self.pruning_metric(metric_name) { + return Some(pm.total_pruned()); } self.parquet_metrics @@ -140,9 +161,10 @@ impl TestOutput { }) } - fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> { + fn pruning_metric(&self, metric_name: &str) -> Option { let mut total_pruned = 0; let mut total_matched = 0; + let mut total_fully_matched = 0; let mut found = false; for metric in self.parquet_metrics.iter() { @@ -154,13 +176,19 @@ impl TestOutput { { total_pruned += pruning_metrics.pruned(); total_matched += pruning_metrics.matched(); + total_fully_matched += pruning_metrics.fully_matched(); + found = true; } } } if found { - Some((total_pruned, total_matched)) + Some(PruningMetric { + total_pruned, + total_matched, + total_fully_matched, + }) } else { None } @@ -172,39 +200,33 @@ impl TestOutput { } /// The number of row_groups pruned / matched by bloom filter - fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> { + fn row_groups_bloom_filter(&self) -> Option { self.pruning_metric("row_groups_pruned_bloom_filter") } /// The number of row_groups matched by statistics fn row_groups_matched_statistics(&self) -> Option { self.pruning_metric("row_groups_pruned_statistics") - .map(|(_pruned, matched)| matched) + .map(|pm| pm.total_matched()) } - /* /// The number of row_groups fully matched by statistics fn row_groups_fully_matched_statistics(&self) -> Option { - self.metric_value("row_groups_fully_matched_statistics") - } - - /// The number of row groups pruned by limit pruning - fn limit_pruned_row_groups(&self) -> Option { - self.metric_value("limit_pruned_row_groups") + self.pruning_metric("row_groups_pruned_statistics") + .map(|pm| pm.total_fully_matched()) } - */ /// The number of row_groups pruned by statistics fn row_groups_pruned_statistics(&self) -> Option { self.pruning_metric("row_groups_pruned_statistics") - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) } /// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count, /// for testing purpose, here it only aggregate the `pruned` count. 
fn files_ranges_pruned_statistics(&self) -> Option { self.pruning_metric("files_ranges_pruned_statistics") - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) } /// The number of row_groups matched by bloom filter or statistics @@ -213,14 +235,13 @@ impl TestOutput { /// filter: 7 total -> 3 matched, this function returns 3 for the final matched /// count. fn row_groups_matched(&self) -> Option { - self.row_groups_bloom_filter() - .map(|(_pruned, matched)| matched) + self.row_groups_bloom_filter().map(|pm| pm.total_matched()) } /// The number of row_groups pruned fn row_groups_pruned(&self) -> Option { self.row_groups_bloom_filter() - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) .zip(self.row_groups_pruned_statistics()) .map(|(a, b)| a + b) } @@ -228,7 +249,13 @@ impl TestOutput { /// The number of row pages pruned fn row_pages_pruned(&self) -> Option { self.pruning_metric("page_index_rows_pruned") - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) + } + + /// The number of row groups pruned by limit pruning + fn limit_pruned_row_groups(&self) -> Option { + self.pruning_metric("limit_pruned_row_groups") + .map(|pm| pm.total_pruned()) } fn description(&self) -> String { @@ -247,6 +274,23 @@ impl ContextWithParquet { Self::with_config(scenario, unit, SessionConfig::new(), None, None).await } + /// Set custom schema and batches for the test + pub async fn with_custom_data( + scenario: Scenario, + unit: Unit, + schema: Arc, + batches: Vec, + ) -> Self { + Self::with_config( + scenario, + unit, + SessionConfig::new(), + Some(schema), + Some(batches), + ) + .await + } + // Set custom schema and batches for the test /* pub async fn with_custom_data( @@ -270,7 +314,7 @@ impl ContextWithParquet { scenario: Scenario, unit: Unit, mut config: SessionConfig, - custom_schema: Option>, + custom_schema: Option, custom_batches: Option>, ) -> Self { // Use a single partition for deterministic results no matter how many CPUs the host has @@ -1109,7 +1153,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { async fn make_test_file_rg( scenario: Scenario, row_per_group: usize, - custom_schema: Option>, + custom_schema: Option, custom_batches: Option>, ) -> NamedTempFile { let mut output_file = tempfile::Builder::new() diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index c5e2a4c917b0d..360d212f69e65 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -18,8 +18,12 @@ //! This file contains an end to end test of parquet pruning. It writes //! data into a parquet file and then verifies row groups are pruned as //! expected. 
+use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::SessionConfig; -use datafusion_common::ScalarValue; +use datafusion_common::{DataFusionError, ScalarValue}; use itertools::Itertools; use crate::parquet::Unit::RowGroup; @@ -30,12 +34,12 @@ struct RowGroupPruningTest { query: String, expected_errors: Option, expected_row_group_matched_by_statistics: Option, - // expected_row_group_fully_matched_by_statistics: Option, + expected_row_group_fully_matched_by_statistics: Option, expected_row_group_pruned_by_statistics: Option, expected_files_pruned_by_statistics: Option, expected_row_group_matched_by_bloom_filter: Option, expected_row_group_pruned_by_bloom_filter: Option, - // expected_limit_pruned_row_groups: Option, + expected_limit_pruned_row_groups: Option, expected_rows: usize, } impl RowGroupPruningTest { @@ -47,11 +51,11 @@ impl RowGroupPruningTest { expected_errors: None, expected_row_group_matched_by_statistics: None, expected_row_group_pruned_by_statistics: None, - // expected_row_group_fully_matched_by_statistics: None, + expected_row_group_fully_matched_by_statistics: None, expected_files_pruned_by_statistics: None, expected_row_group_matched_by_bloom_filter: None, expected_row_group_pruned_by_bloom_filter: None, - // expected_limit_pruned_row_groups: None, + expected_limit_pruned_row_groups: None, expected_rows: 0, } } @@ -81,7 +85,6 @@ impl RowGroupPruningTest { } // Set the expected fully matched row groups by statistics - /* fn with_fully_matched_by_stats( mut self, fully_matched_by_stats: Option, @@ -90,12 +93,6 @@ impl RowGroupPruningTest { self } - fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option) -> Self { - self.expected_limit_pruned_row_groups = pruned_by_limit; - self - } - */ - // Set the expected pruned row groups by statistics fn with_pruned_by_stats(mut self, pruned_by_stats: Option) -> Self { self.expected_row_group_pruned_by_statistics = pruned_by_stats; @@ -119,6 +116,11 @@ impl RowGroupPruningTest { self } + fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option) -> Self { + self.expected_limit_pruned_row_groups = pruned_by_limit; + self + } + /// Set the number of expected rows from the output of this test fn with_expected_rows(mut self, rows: usize) -> Self { self.expected_rows = rows; @@ -155,12 +157,12 @@ impl RowGroupPruningTest { ); let bloom_filter_metrics = output.row_groups_bloom_filter(); assert_eq!( - bloom_filter_metrics.map(|(_pruned, matched)| matched), + bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()), self.expected_row_group_matched_by_bloom_filter, "mismatched row_groups_matched_bloom_filter", ); assert_eq!( - bloom_filter_metrics.map(|(pruned, _matched)| pruned), + bloom_filter_metrics.map(|pm| pm.total_pruned()), self.expected_row_group_pruned_by_bloom_filter, "mismatched row_groups_pruned_bloom_filter", ); @@ -175,6 +177,64 @@ impl RowGroupPruningTest { ); } + // Execute the test with the current configuration + async fn test_row_group_prune_with_custom_data( + self, + schema: Arc, + batches: Vec, + max_row_per_group: usize, + ) { + let output = ContextWithParquet::with_custom_data( + self.scenario, + RowGroup(max_row_per_group), + schema, + batches, + ) + .await + .query(&self.query) + .await; + + println!("{}", output.description()); + assert_eq!( + output.predicate_evaluation_errors(), + self.expected_errors, + "mismatched predicate_evaluation error" + ); + assert_eq!( + 
output.row_groups_matched_statistics(), + self.expected_row_group_matched_by_statistics, + "mismatched row_groups_matched_statistics", + ); + assert_eq!( + output.row_groups_fully_matched_statistics(), + self.expected_row_group_fully_matched_by_statistics, + "mismatched row_groups_fully_matched_statistics", + ); + assert_eq!( + output.row_groups_pruned_statistics(), + self.expected_row_group_pruned_by_statistics, + "mismatched row_groups_pruned_statistics", + ); + assert_eq!( + output.files_ranges_pruned_statistics(), + self.expected_files_pruned_by_statistics, + "mismatched files_ranges_pruned_statistics", + ); + assert_eq!( + output.limit_pruned_row_groups(), + self.expected_limit_pruned_row_groups, + "mismatched limit_pruned_row_groups", + ); + assert_eq!( + output.result_rows, + self.expected_rows, + "Expected {} rows, got {}: {}", + output.result_rows, + self.expected_rows, + output.description(), + ); + } + // Execute the test with the current configuration /* async fn test_row_group_prune_with_custom_data( @@ -1723,7 +1783,6 @@ async fn test_bloom_filter_decimal_dict() { .await; } -/* // Helper function to create a batch with a single Int32 column. fn make_i32_batch( name: &str, @@ -1950,7 +2009,7 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error: .with_scenario(Scenario::Int) .with_query(query) .with_expected_errors(Some(0)) - .with_expected_rows(10) // Total: 1 + 3 + 4 + 1 = 9 (less than limit) + .with_expected_rows(10) // Total: 1 + 4 + 4 + 1 = 10 .with_pruned_files(Some(0)) .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched .with_fully_matched_by_stats(Some(2)) @@ -1958,7 +2017,5 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error: .with_limit_pruned_row_groups(Some(0)) // No limit pruning since we need all RGs .test_row_group_prune_with_custom_data(schema, batches, 4) .await; - Ok(()) } -*/ diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index c45d234f3b512..db0673651f6e3 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -44,18 +44,14 @@ pub struct ParquetFileMetrics { pub files_ranges_pruned_statistics: PruningMetrics, /// Number of times the predicate could not be evaluated pub predicate_evaluation_errors: Count, - /// Number of row groups whose bloom filters were checked, tracked with matched/pruned counts + /// Number of row groups pruned by bloom filters pub row_groups_pruned_bloom_filter: PruningMetrics, - /// Number of row groups whose statistics were checked, tracked with matched/pruned counts + /// Number of row groups pruned due to limit pruning. + pub limit_pruned_row_groups: PruningMetrics, + /// Number of row groups pruned by statistics pub row_groups_pruned_statistics: PruningMetrics, /// Number of row groups whose bloom filters were checked and matched (not pruned) pub row_groups_matched_bloom_filter: Count, - /// Number of row groups pruned due to limit pruning. 
- pub limit_pruned_row_groups: Count, - /// Number of row groups whose statistics were checked and fully matched - pub row_groups_fully_matched_statistics: Count, - /// Number of row groups whose statistics were checked and matched (not pruned) - pub row_groups_matched_statistics: Count, /// Total number of bytes scanned pub bytes_scanned: Count, /// Total rows filtered out by predicates pushed into parquet scan @@ -104,15 +100,8 @@ impl ParquetFileMetrics { let limit_pruned_row_groups = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .counter("limit_pruned_row_groups", partition); - - let row_groups_fully_matched_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .counter("row_groups_fully_matched_statistics", partition); - - let row_groups_matched_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .counter("row_groups_matched_statistics", partition); + .with_type(MetricType::SUMMARY) + .pruning_metrics("limit_pruned_row_groups", partition); let row_groups_pruned_statistics = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) @@ -179,8 +168,6 @@ impl ParquetFileMetrics { predicate_evaluation_errors, row_groups_matched_bloom_filter, row_groups_pruned_bloom_filter, - row_groups_fully_matched_statistics, - row_groups_matched_statistics, row_groups_pruned_statistics, limit_pruned_row_groups, bytes_scanned, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5701b22ccbcc5..a3cb112058939 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -61,13 +61,15 @@ use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { /// Execution partition index - pub partition_index: usize, + pub(crate) partition_index: usize, /// Column indexes in `table_schema` needed by the query pub projection: Arc<[usize]>, /// Target number of rows in each output RecordBatch pub batch_size: usize, /// Optional limit on the number of rows to read - pub limit: Option, + pub(crate) limit: Option, + /// Whether the output rows should be kept in order + pub preserve_order: bool, /// Optional predicate to apply during the scan pub predicate: Option>, /// Schema of the output table without partition columns. 
@@ -99,8 +101,6 @@ pub(super) struct ParquetOpener { pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, - /// Should limit pruning be applied - pub enable_limit_pruning: bool, /// Optional parquet FileDecryptionProperties #[cfg(feature = "parquet_encryption")] pub file_decryption_properties: Option>, @@ -153,7 +153,6 @@ impl FileOpener for ParquetOpener { let enable_bloom_filter = self.enable_bloom_filter; let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; let limit = self.limit; - let enable_limit_pruning = self.enable_limit_pruning; let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); @@ -165,6 +164,7 @@ impl FileOpener for ParquetOpener { #[cfg(feature = "parquet_encryption")] let encryption_context = self.get_encryption_context(); let max_predicate_cache_size = self.max_predicate_cache_size; + let preserve_order = self.preserve_order; Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] @@ -410,14 +410,13 @@ impl FileOpener for ParquetOpener { .add_matched(n_remaining_row_groups); } - // Prune by limit - if enable_limit_pruning { - if let Some(limit) = limit { - row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); - } + // Prune by limit if limit is set and limit order is not sensitive + if let (Some(limit), false) = (limit, preserve_order) { + row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); } let mut access_plan = row_groups.build(); + // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns // with that range can be skipped as well @@ -886,6 +885,7 @@ mod test { projection: Arc::new([0, 1]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: schema.clone(), metadata_size_hint: None, @@ -898,7 +898,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -956,6 +955,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -972,7 +972,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1046,6 +1045,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -1062,7 +1062,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1139,6 +1138,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -1155,7 +1155,6 @@ mod test { reorder_filters: true, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), 
enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, @@ -1232,6 +1231,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -1248,7 +1248,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1387,6 +1386,7 @@ mod test { projection: Arc::new([0, 1]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: Arc::clone(&table_schema), metadata_size_hint: None, @@ -1399,7 +1399,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory), enable_row_group_stats_pruning: false, coerce_int96: None, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index f0d483ba35b10..6004da8ae65da 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -48,7 +48,8 @@ use parquet::{ pub struct RowGroupAccessPlanFilter { /// which row groups should be accessed access_plan: ParquetAccessPlan, - /// which row groups are fully contained within the pruning predicate + /// Row groups where ALL rows are known to match the pruning predicate + /// (the predicate does not filter any rows) is_fully_matched: Vec, } @@ -84,6 +85,93 @@ impl RowGroupAccessPlanFilter { } /// Prunes the access plan based on the limit and fully contained row groups. + /// + /// The pruning works by leveraging the concept of fully matched row groups. Consider a query like: + /// `WHERE species LIKE 'Alpine%' AND s >= 50 LIMIT N` + /// + /// After initial filtering, row groups can be classified into three states: + /// + /// 1. Not Matching / Pruned + /// 2. Partially Matching (Row Group/Page contains some matches) + /// 3. Fully Matching (Entire range is within predicate) + /// + /// +-----------------------------------------------------------------------+ + /// | NOT MATCHING | + /// | Row group 1 | + /// | +-----------------------------------+-----------------------------+ | + /// | | SPECIES | S | | + /// | +-----------------------------------+-----------------------------+ | + /// | | Snow Vole | 7 | | + /// | | Brown Bear | 133 ✅ | | + /// | | Gray Wolf | 82 ✅ | | + /// | +-----------------------------------+-----------------------------+ | + /// +-----------------------------------------------------------------------+ + /// + /// +---------------------------------------------------------------------------+ + /// | PARTIALLY MATCHING | + /// | | + /// | Row group 2 Row group 4 | + /// | +------------------+--------------+ +------------------+----------+ | + /// | | SPECIES | S | | SPECIES | S | | + /// | +------------------+--------------+ +------------------+----------+ | + /// | | Lynx | 71 ✅ | | Europ. 
Mole | 4 | | + /// | | Red Fox | 40 | | Polecat | 16 | | + /// | | Alpine Bat ✅ | 6 | | Alpine Ibex ✅ | 97 ✅ | | + /// | +------------------+--------------+ +------------------+----------+ | + /// +---------------------------------------------------------------------------+ + /// + /// +-----------------------------------------------------------------------+ + /// | FULLY MATCHING | + /// | Row group 3 | + /// | +-----------------------------------+-----------------------------+ | + /// | | SPECIES | S | | + /// | +-----------------------------------+-----------------------------+ | + /// | | Alpine Ibex ✅ | 101 ✅ | | + /// | | Alpine Goat ✅ | 76 ✅ | | + /// | | Alpine Sheep ✅ | 83 ✅ | | + /// | +-----------------------------------+-----------------------------+ | + /// +-----------------------------------------------------------------------+ + /// + /// ### Identification of Fully Matching Row Groups + /// + /// DataFusion identifies row groups where ALL rows satisfy the filter by inverting the + /// predicate and checking if statistics prove the inverted version is false for the group. + /// + /// For example, prefix matches like `species LIKE 'Alpine%'` are pruned using ranges: + /// 1. Candidate Range: `species >= 'Alpine' AND species < 'Alpinf'` + /// 2. Inverted Condition (to prove full match): `species < 'Alpine' OR species >= 'Alpinf'` + /// 3. Statistical Evaluation (check if any row *could* satisfy the inverted condition): + /// `min < 'Alpine' OR max >= 'Alpinf'` + /// + /// If this evaluation is **false**, it proves no row can fail the original filter, + /// so the row group is **FULLY MATCHING**. + /// + /// ### Impact of Statistics Truncation + /// + /// The precision of pruning depends on the metadata quality. Truncated statistics + /// may prevent the system from proving a full match. + /// + /// **Example**: `WHERE species LIKE 'Alpine%'` (Target range: `['Alpine', 'Alpinf')`) + /// + /// | Truncation Length | min / max | Inverted Evaluation | Status | + /// |-------------------|---------------------|---------------------------------------------------------------------|------------------------| + /// | **Length 6** | `Alpine` / `Alpine` | `"Alpine" < "Alpine" (F) OR "Alpine" >= "Alpinf" (F)` -> **false** | **FULLY MATCHING** | + /// | **Length 3** | `Alp` / `Alq` | `"Alp" < "Alpine" (T) OR "Alq" >= "Alpinf" (T)` -> **true** | **PARTIALLY MATCHING** | + /// + /// Even though Row Group 3 only contains matching rows, truncation to length 3 makes + /// the statistics `[Alp, Alq]` too broad to prove it (they could include "Alpha"). + /// The system must conservatively scan the group. + /// + /// Without limit pruning: Scan Partition 2 → Partition 3 → Partition 4 (until limit reached) + /// With limit pruning: If Partition 3 contains enough rows to satisfy the limit, + /// skip Partitions 2 and 4 entirely and go directly to Partition 3. 
+ /// + /// This optimization is particularly effective when: + /// - The limit is small relative to the total dataset size + /// - There are row groups that are fully matched by the filter predicates + /// - The fully matched row groups contain sufficient rows to satisfy the limit + /// + /// For more information, see the [paper](https://arxiv.org/pdf/2504.11540)'s "Pruning for LIMIT Queries" part pub fn prune_by_limit( &mut self, limit: usize, @@ -93,7 +181,8 @@ impl RowGroupAccessPlanFilter { let mut fully_matched_row_group_indexes: Vec = Vec::new(); let mut fully_matched_rows_count: usize = 0; - // Iterate through the currently accessible row groups + // Iterate through the currently accessible row groups and try to + // find a set of matching row groups that can satisfy the limit for &idx in self.access_plan.row_group_indexes().iter() { if self.is_fully_matched[idx] { let row_group_row_count = rg_metadata[idx].num_rows() as usize; @@ -105,13 +194,15 @@ impl RowGroupAccessPlanFilter { } } + // If we can satisfy the limit with fully matching row groups, + // rewrite the plan to do so if fully_matched_rows_count >= limit { let original_num_accessible_row_groups = self.access_plan.row_group_indexes().len(); let new_num_accessible_row_groups = fully_matched_row_group_indexes.len(); let pruned_count = original_num_accessible_row_groups .saturating_sub(new_num_accessible_row_groups); - metrics.limit_pruned_row_groups.add(pruned_count); + metrics.limit_pruned_row_groups.add_pruned(pruned_count); let mut new_access_plan = ParquetAccessPlan::new_none(rg_metadata.len()); for &idx in &fully_matched_row_group_indexes { @@ -197,45 +288,15 @@ impl RowGroupAccessPlanFilter { } } - // Note: this part of code shouldn't be expensive with a limited number of row groups - // If we do find it's expensive, we can consider optimizing it further. - if !fully_contained_candidates_original_idx.is_empty() { - // Use NotExpr to create the inverted predicate - let inverted_expr = - Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr()))); - // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0) - // before building the pruning predicate - let mut simplifier = PhysicalExprSimplifier::new(arrow_schema); - let inverted_expr = simplifier.simplify(inverted_expr).unwrap(); - if let Ok(inverted_predicate) = PruningPredicate::try_new( - inverted_expr, - Arc::clone(predicate.schema()), - ) { - let inverted_pruning_stats = RowGroupPruningStatistics { - parquet_schema, - row_group_metadatas: fully_contained_candidates_original_idx - .iter() - .map(|&i| &groups[i]) - .collect::>(), - arrow_schema, - }; - - if let Ok(inverted_values) = - inverted_predicate.prune(&inverted_pruning_stats) - { - for (i, &original_row_group_idx) in - fully_contained_candidates_original_idx.iter().enumerate() - { - // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false), - // it implies that *all* rows in this group satisfy the original predicate. 
- if !inverted_values[i] { - self.is_fully_matched[original_row_group_idx] = true; - metrics.row_groups_fully_matched_statistics.add(1); - } - } - } - } - } + // Check if any of the matched row groups are fully contained by the predicate + self.identify_fully_matched_row_groups( + &fully_contained_candidates_original_idx, + arrow_schema, + parquet_schema, + groups, + predicate, + metrics, + ); } // stats filter array could not be built, so we can't prune Err(e) => { @@ -245,6 +306,68 @@ impl RowGroupAccessPlanFilter { } } + /// Identifies row groups that are fully matched by the predicate. + /// + /// This optimization checks whether all rows in a row group satisfy the predicate + /// by inverting the predicate and checking if it prunes the row group. If the + /// inverted predicate prunes a row group, it means no rows match the inverted + /// predicate, which implies all rows match the original predicate. + /// + /// Note: This optimization is relatively inexpensive for a limited number of row groups. + fn identify_fully_matched_row_groups( + &mut self, + candidate_row_group_indices: &[usize], + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + groups: &[RowGroupMetaData], + predicate: &PruningPredicate, + metrics: &ParquetFileMetrics, + ) { + if candidate_row_group_indices.is_empty() { + return; + } + + // Use NotExpr to create the inverted predicate + let inverted_expr = Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr()))); + + // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0) + // before building the pruning predicate + let mut simplifier = PhysicalExprSimplifier::new(arrow_schema); + let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else { + return; + }; + + let Ok(inverted_predicate) = + PruningPredicate::try_new(inverted_expr, Arc::clone(predicate.schema())) + else { + return; + }; + + let inverted_pruning_stats = RowGroupPruningStatistics { + parquet_schema, + row_group_metadatas: candidate_row_group_indices + .iter() + .map(|&i| &groups[i]) + .collect::>(), + arrow_schema, + }; + + let Ok(inverted_values) = inverted_predicate.prune(&inverted_pruning_stats) + else { + return; + }; + + for (i, &original_row_group_idx) in candidate_row_group_indices.iter().enumerate() + { + // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false), + // it implies that *all* rows in this group satisfy the original predicate. + if !inverted_values[i] { + self.is_fully_matched[original_row_group_idx] = true; + metrics.row_groups_pruned_statistics.add_fully_matched(1); + } + } + } + /// Prune remaining row groups using available bloom filters and the /// [`PruningPredicate`]. 
/// diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 339d36b57cc35..ae1cab83ba470 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -564,6 +564,7 @@ impl FileSource for ParquetSource { .batch_size .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, + preserve_order: base_config.preserve_order, predicate: self.predicate.clone(), logical_file_schema: Arc::clone(base_config.file_schema()), partition_fields: base_config.table_partition_cols().clone(), @@ -575,7 +576,6 @@ impl FileSource for ParquetSource { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - enable_limit_pruning: base_config.limit_pruning, schema_adapter_factory, coerce_int96, #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 02d9762a4a396..20b4370772617 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -187,6 +187,11 @@ pub struct FileScanConfig { /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, + /// Whether the scan's limit is order sensitive + /// When `true`, files must be read in the exact order specified to produce + /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`, + /// DataFusion may reorder file processing for optimization without affecting correctness. + pub preserve_order: bool, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec, /// File compression type @@ -268,8 +273,9 @@ pub struct FileScanConfigBuilder { /// [`DataSourceExec`]: crate::source::DataSourceExec table_schema: TableSchema, file_source: Arc, - limit: Option, projection_indices: Option>, + limit: Option, + preserve_order: bool, constraints: Option, file_groups: Vec, statistics: Option, @@ -305,6 +311,7 @@ impl FileScanConfigBuilder { new_lines_in_values: None, limit: None, projection_indices: None, + preserve_order: false, constraints: None, batch_size: None, expr_adapter_factory: None, @@ -319,6 +326,15 @@ impl FileScanConfigBuilder { self } + /// Set whether the limit should be order-sensitive. + /// When `true`, files must be read in the exact order specified to produce + /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`, + /// DataFusion may reorder file processing for optimization without affecting correctness. + pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self { + self.preserve_order = order_sensitive; + self + } + /// Set the file source for scanning files. /// /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.) @@ -466,6 +482,7 @@ impl FileScanConfigBuilder { file_source, limit, projection_indices, + preserve_order, constraints, file_groups, statistics, @@ -494,12 +511,16 @@ impl FileScanConfigBuilder { ProjectionExprs::from_indices(&indices, table_schema.table_schema()) }); + // If there is an output ordering, we should preserve it. 
+ let preserve_order = preserve_order || !output_ordering.is_empty(); + FileScanConfig { object_store_url, table_schema, file_source, limit, projection_exprs, + preserve_order, constraints, file_groups, output_ordering, @@ -514,6 +535,7 @@ impl FileScanConfigBuilder { impl From for FileScanConfigBuilder { fn from(config: FileScanConfig) -> Self { + let projection_indices = config.projection_indices(); Self { object_store_url: config.object_store_url, table_schema: config.table_schema, @@ -524,9 +546,8 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, - projection_indices: config - .projection_exprs - .map(|p| p.ordered_column_indices()), + projection_indices: Some(projection_indices), + preserve_order: config.preserve_order, constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, @@ -757,6 +778,18 @@ impl DataSource for FileScanConfig { } } } + + fn with_preserve_order(&self, preserve_order: bool) -> Option> { + if self.preserve_order == preserve_order { + return Some(Arc::new(self.clone())); + } + + let new_config = FileScanConfig { + preserve_order, + ..self.clone() + }; + Some(Arc::new(new_config)) + } } impl FileScanConfig { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 7169997bd0316..9227647bd07c8 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -189,6 +189,11 @@ pub trait DataSource: Send + Sync + Debug { vec![PushedDown::No; filters.len()], )) } + + /// Returns a variant of this `DataSource` that is aware of order-sensitivity. + fn with_preserve_order(&self, _preserve_order: bool) -> Option> { + None + } } /// [`ExecutionPlan`] that reads one or more files @@ -371,6 +376,18 @@ impl ExecutionPlan for DataSourceExec { }), } } + + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.data_source + .with_preserve_order(preserve_order) + .map(|new_data_source| { + Arc::new(self.clone().with_data_source(new_data_source)) + as Arc + }) + } } impl DataSourceExec { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5db71417bc8fd..59a9d703f670a 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -268,15 +268,10 @@ fn optimize_projections( Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - return TableScan::try_new( - table_name, - source, - Some(projection), - filters, - fetch, - ) - .map(LogicalPlan::TableScan) - .map(Transformed::yes); + let new_scan = + TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; + + return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))); } // Other node types are handled below _ => {} diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 28d187bbf8930..a42abad1dcc1d 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -584,11 +584,17 @@ fn analyze_immediate_sort_removal( // Remove the sort: node.children = node.children.swap_remove(0).children; if let Some(fetch) = sort_exec.fetch() { + let required_ordering = sort_exec.properties().output_ordering().cloned(); // If 
the sort has a fetch, we need to add a limit: if properties.output_partitioning().partition_count() == 1 { - Arc::new(GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch))) + let mut global_limit = + GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch)); + global_limit.set_required_ordering(required_ordering); + Arc::new(global_limit) } else { - Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch)) + let mut local_limit = LocalLimitExec::new(Arc::clone(sort_input), fetch); + local_limit.set_required_ordering(required_ordering); + Arc::new(local_limit) } } else { Arc::clone(sort_input) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7469c3af9344c..e0079578a35db 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -50,6 +50,7 @@ pub struct GlobalRequirements { fetch: Option, skip: usize, satisfied: bool, + preserve_order: bool, } impl LimitPushdown { @@ -69,6 +70,7 @@ impl PhysicalOptimizerRule for LimitPushdown { fetch: None, skip: 0, satisfied: false, + preserve_order: false, }; pushdown_limits(plan, global_state) } @@ -111,6 +113,13 @@ impl LimitExec { Self::Local(_) => 0, } } + + fn preserve_order(&self) -> bool { + match self { + Self::Global(global) => global.required_ordering().is_some(), + Self::Local(local) => local.required_ordering().is_some(), + } + } } impl From for Arc { @@ -145,6 +154,7 @@ pub fn pushdown_limit_helper( ); global_state.skip = skip; global_state.fetch = fetch; + global_state.preserve_order = limit_exec.preserve_order(); // Now the global state has the most recent information, we can remove // the `LimitExec` plan. We will decide later if we should add it again @@ -241,17 +251,28 @@ pub fn pushdown_limit_helper( let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch); if global_state.satisfied { if let Some(plan_with_fetch) = maybe_fetchable { - Ok((Transformed::yes(plan_with_fetch), global_state)) + let plan_with_preserve_order = plan_with_fetch + .with_preserve_order(global_state.preserve_order) + .unwrap_or(plan_with_fetch); + Ok((Transformed::yes(plan_with_preserve_order), global_state)) } else { Ok((Transformed::no(pushdown_plan), global_state)) } } else { global_state.satisfied = true; pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { + let plan_with_preserve_order = plan_with_fetch + .with_preserve_order(global_state.preserve_order) + .unwrap_or(plan_with_fetch); + if global_skip > 0 { - add_global_limit(plan_with_fetch, global_skip, Some(global_fetch)) + add_global_limit( + plan_with_preserve_order, + global_skip, + Some(global_fetch), + ) } else { - plan_with_fetch + plan_with_preserve_order } } else { add_limit(pushdown_plan, global_skip, global_fetch) diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 31e5a7369cab3..ebf97cee587c4 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -284,6 +284,19 @@ impl ExecutionPlan for CoalescePartitionsExec { })) } + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } + fn gather_filters_for_pushdown( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs index 340e5662cebcb..35866d1cfe8db 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -688,6 +688,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Option> { None } + + /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity. + /// + /// This is used to signal to data sources that the output ordering must be + /// preserved, even if it might be more efficient to ignore it (e.g. by + /// skipping some row groups in Parquet). + /// + fn with_preserve_order( + &self, + _preserve_order: bool, + ) -> Option> { + None + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2ee7d283d8ad8..c86dd2ff166e2 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -570,6 +570,19 @@ impl ExecutionPlan for FilterExec { updated_node, }) } + + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } } impl EmbeddedProjection for FilterExec { diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 5bdd697b3555b..fecb70be32133 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -35,6 +35,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; +use datafusion_physical_expr::LexOrdering; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -51,6 +52,9 @@ pub struct GlobalLimitExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, cache: PlanProperties, + /// Does the limit have to preserve the order of its input, and if so what is it? 
+ /// Some optimizations may reorder the input if no particular sort is required + required_ordering: Option, } impl GlobalLimitExec { @@ -63,6 +67,7 @@ impl GlobalLimitExec { fetch, metrics: ExecutionPlanMetricsSet::new(), cache, + required_ordering: None, } } @@ -91,6 +96,16 @@ impl GlobalLimitExec { Boundedness::Bounded, ) } + + /// Get the required ordering from limit + pub fn required_ordering(&self) -> &Option { + &self.required_ordering + } + + /// Set the required ordering for limit + pub fn set_required_ordering(&mut self, required_ordering: Option) { + self.required_ordering = required_ordering; + } } impl DisplayAs for GlobalLimitExec { @@ -230,6 +245,9 @@ pub struct LocalLimitExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, cache: PlanProperties, + /// If the child plan is a sort node, after the sort node is removed during + /// physical optimization, we should add the required ordering to the limit node + required_ordering: Option, } impl LocalLimitExec { @@ -241,6 +259,7 @@ impl LocalLimitExec { fetch, metrics: ExecutionPlanMetricsSet::new(), cache, + required_ordering: None, } } @@ -264,6 +283,16 @@ impl LocalLimitExec { Boundedness::Bounded, ) } + + /// Get the required ordering from limit + pub fn required_ordering(&self) -> &Option { + &self.required_ordering + } + + /// Set the required ordering for limit + pub fn set_required_ordering(&mut self, required_ordering: Option) { + self.required_ordering = required_ordering; + } } impl DisplayAs for LocalLimitExec { diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 298d63e5e216a..b66119b86913f 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -372,14 +372,23 @@ impl Drop for ScopedTimerGuard<'_> { pub struct PruningMetrics { pruned: Arc, matched: Arc, + fully_matched: Arc, } impl Display for PruningMetrics { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let matched = self.matched.load(Ordering::Relaxed); let total = self.pruned.load(Ordering::Relaxed) + matched; + let fully_matched = self.fully_matched.load(Ordering::Relaxed); - write!(f, "{total} total → {matched} matched") + if fully_matched != 0 { + write!( + f, + "{total} total → {matched} matched -> {fully_matched} fully matched", + ) + } else { + write!(f, "{total} total → {matched} matched") + } } } @@ -395,6 +404,7 @@ impl PruningMetrics { Self { pruned: Arc::new(AtomicUsize::new(0)), matched: Arc::new(AtomicUsize::new(0)), + fully_matched: Arc::new(AtomicUsize::new(0)), } } @@ -412,6 +422,13 @@ impl PruningMetrics { self.matched.fetch_add(n, Ordering::Relaxed); } + /// Add `n` to the metric's fully matched value + pub fn add_fully_matched(&self, n: usize) { + // relaxed ordering for operations on `value` poses no issues + // we're purely using atomic ops with no associated memory ops + self.fully_matched.fetch_add(n, Ordering::Relaxed); + } + /// Subtract `n` to the metric's matched value. pub fn subtract_matched(&self, n: usize) { // relaxed ordering for operations on `value` poses no issues @@ -428,6 +445,11 @@ impl PruningMetrics { pub fn matched(&self) -> usize { self.matched.load(Ordering::Relaxed) } + + /// Number of items fully matched + pub fn fully_matched(&self) -> usize { + self.fully_matched.load(Ordering::Relaxed) + } } /// Counters tracking ratio metrics (e.g. 
matched vs total) @@ -842,8 +864,11 @@ impl MetricValue { ) => { let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed); let matched = other_pruning_metrics.matched.load(Ordering::Relaxed); + let fully_matched = + other_pruning_metrics.fully_matched.load(Ordering::Relaxed); pruning_metrics.add_pruned(pruned); pruning_metrics.add_matched(matched); + pruning_metrics.add_fully_matched(fully_matched); } ( Self::Ratio { ratio_metrics, .. }, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cfdaa4e9d9fd4..4fe051dbacb4c 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -346,6 +346,19 @@ impl ExecutionPlan for ProjectionExec { ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } + + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } } impl ProjectionStream { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 11f42c8153e08..150f1e60cbbf8 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -245,6 +245,19 @@ impl ExecutionPlan for SortPreservingMergeExec { })) } + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } + fn required_input_distribution(&self) -> Vec { vec![Distribution::UnspecifiedDistribution] } diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index a389ae1ef60e2..8768deee3d87e 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -142,6 +142,17 @@ select substr('Andrew Lamb', 1, 6), '|' Andrew | ``` +## Cookbook: Ignoring volatile output + +Sometimes parts of a result change every run (timestamps, counters, etc.). To keep the rest of the snapshot checked in, replace those fragments with the `<ignore>` marker inside the expected block. During validation the marker acts like a wildcard, so only the surrounding text must match. + +```text +query TT +EXPLAIN ANALYZE SELECT * FROM generate_series(100); +---- +Plan with Metrics LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100, batch_size=8192], metrics=[output_rows=101, elapsed_compute=<ignore>, output_bytes=<ignore>] +``` + # Reference ## Running tests: Validation Mode diff --git a/datafusion/sqllogictest/src/util.rs b/datafusion/sqllogictest/src/util.rs index 695fe463fa676..721c559ab11ef 100644 --- a/datafusion/sqllogictest/src/util.rs +++ b/datafusion/sqllogictest/src/util.rs @@ -82,6 +82,10 @@ pub fn df_value_validator( actual: &[Vec], expected: &[String], ) -> bool { + // Support ignore marker to skip volatile parts of output. + const IGNORE_MARKER: &str = "<ignore>"; + let contains_ignore_marker = expected.iter().any(|line| line.contains(IGNORE_MARKER)); + let normalized_expected = expected.iter().map(normalizer).collect::>(); let normalized_actual = actual .iter() .map(|str| str.trim_end().to_string()) .collect_vec(); + // If ignore marker present, perform fragment-based matching on the full snapshot. 
+ if contains_ignore_marker { + let expected_snapshot = normalized_expected.join("\n"); + let actual_snapshot = normalized_actual.join("\n"); + let fragments: Vec<&str> = expected_snapshot.split(IGNORE_MARKER).collect(); + let mut pos = 0; + for (i, frag) in fragments.iter().enumerate() { + if frag.is_empty() { + continue; + } + if let Some(idx) = actual_snapshot[pos..].find(frag) { + // Edge case: The following example is expected to fail + // Actual - 'foo bar baz' + // Expected - 'bar <ignore>' + if (i == 0) && (idx != 0) { + return false; + } + + pos += idx + frag.len(); + } else { + return false; + } + } + return true; + } + if log_enabled!(Warn) && normalized_actual != normalized_expected { warn!("df validation failed. actual vs expected:"); for i in 0..normalized_actual.len() { @@ -110,3 +140,20 @@ pub fn is_spark_path(relative_path: &Path) -> bool { relative_path.starts_with("spark/") } + +#[cfg(test)] +mod tests { + use super::*; + + // Validation should fail for the below case: + // Actual - 'foo bar baz' + // Expected - 'bar <ignore>' + #[test] + fn ignore_marker_does_not_skip_leading_text() { + // Actual snapshot contains unexpected prefix before the expected fragment. + let actual = vec![vec!["foo bar baz".to_string()]]; + let expected = vec!["bar <ignore>".to_string()]; + + assert!(!df_value_validator(value_normalizer, &actual, &expected)); + } +} diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt new file mode 100644 index 0000000000000..90ee528ecf9b9 --- /dev/null +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + + +statement ok +CREATE TABLE tracking_data AS VALUES +-- ***** Row Group 0 ***** + ('Anow Vole', 7), + ('Brown Bear', 133), + ('Gray Wolf', 82), +-- ***** Row Group 1 ***** + ('Lynx', 71), + ('Red Fox', 40), + ('Alpine Bat', 6), +-- ***** Row Group 2 ***** + ('Nlpine Ibex', 101), + ('Nlpine Goat', 76), + ('Nlpine Sheep', 83), +-- ***** Row Group 3 ***** + ('Europ. 
Mole', 4), + ('Polecat', 16), + ('Alpine Ibex', 97); + +statement ok +COPY (SELECT column1 as species, column2 as s FROM tracking_data) +TO 'test_files/scratch/limit_pruning/data.parquet' +STORED AS PARQUET +OPTIONS ( + 'format.max_row_group_size' '3' +); + +statement ok +drop table tracking_data; + +statement ok +CREATE EXTERNAL TABLE tracking_data +STORED AS PARQUET +LOCATION 'test_files/scratch/limit_pruning/data.parquet'; + + +statement ok +set datafusion.explain.analyze_level = summary; + +# row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched +# limit_pruned_row_groups=2 total → 0 matched +query TT +explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; +---- +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<ignore>, output_bytes=<ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=3 total → 3 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=<ignore>, metadata_load_time=<ignore>] + +# limit_pruned_row_groups=0 total → 0 matched +# because of order by, scan needs to preserve sort, so limit pruning is disabled +query TT +explain analyze select * from tracking_data where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=<ignore>, output_bytes=<ignore>] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<ignore>, output_bytes=<ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=9 total → 9 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<ignore>, metadata_load_time=<ignore>] + +statement ok +drop table tracking_data; diff --git a/datafusion/sqllogictest/test_files/slt_features.slt b/datafusion/sqllogictest/test_files/slt_features.slt new file mode 100644 index 0000000000000..f3d467ea0d93a --- /dev/null +++ b/datafusion/sqllogictest/test_files/slt_features.slt @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# ================================= +# Test sqllogictest runner features +# ================================= + +# -------------------------- +# Test `<ignore>` marker +# -------------------------- +query T +select 'DataFusion' +---- +<ignore> + +query T +select 'DataFusion' +---- +Data<ignore> + +query T +select 'DataFusion' +---- +<ignore>Fusion + +query T +select 'Apache DataFusion'; +---- +<ignore>Data<ignore> + +query T +select 'DataFusion' +---- +DataFusion + +query T +select 'DataFusion' +---- +DataFusion + +query T +select 'DataFusion' +---- +DataFusion + +query I +select * from generate_series(3); +---- +0 +1 +<ignore> +3 + +query I +select * from generate_series(3); +---- +<ignore> +1 +<ignore> +<ignore> \ No newline at end of file diff --git a/docs/source/user-guide/explain-usage.md b/docs/source/user-guide/explain-usage.md index 5a1184539c034..8fe83163813da 100644 --- a/docs/source/user-guide/explain-usage.md +++ b/docs/source/user-guide/explain-usage.md @@ -228,6 +228,7 @@ When predicate pushdown is enabled, `DataSourceExec` with `ParquetSource` gains - `page_index_rows_pruned`: number of rows evaluated by page index filters. The metric reports both how many rows were considered in total and how many matched (were not pruned). - `row_groups_pruned_bloom_filter`: number of row groups evaluated by Bloom Filters, reporting both total checked groups and groups that matched. - `row_groups_pruned_statistics`: number of row groups evaluated by row-group statistics (min/max), reporting both total checked groups and groups that matched. +- `limit_pruned_row_groups`: number of row groups skipped by limit pruning, i.e. row groups that did not need to be read because the query's `LIMIT` could already be satisfied from row groups that fully match the predicate. - `pushdown_rows_matched`: rows that were tested by any of the above filters, and passed all of them. - `pushdown_rows_pruned`: rows that were tested by any of the above filters, and did not pass at least one of them. - `predicate_evaluation_errors`: number of times evaluating the filter expression failed (expected to be zero in normal operation)
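For programmatic access to these counters (rather than parsing `EXPLAIN ANALYZE` text), the values can be read from a plan's collected metrics. The sketch below is illustrative only: `sum_pruning_metric` is a hypothetical helper, not an API added by this change, and it assumes the `MetricValue::PruningMetrics` variant shape and the `pruned()` / `matched()` / `fully_matched()` accessors shown in the `metrics/value.rs` changes above.

```rust
use datafusion_physical_plan::metrics::{MetricValue, MetricsSet};

/// Hypothetical helper (not part of this change): sum the pruned / matched /
/// fully-matched counts of one named pruning metric, e.g.
/// "limit_pruned_row_groups", across every file that reported it.
fn sum_pruning_metric(metrics: &MetricsSet, name: &str) -> Option<(usize, usize, usize)> {
    let mut totals: Option<(usize, usize, usize)> = None;
    for metric in metrics.iter() {
        // Each scanned parquet file registers its own labelled copy of the metric,
        // so skip entries whose name does not match.
        if metric.value().name() != name {
            continue;
        }
        if let MetricValue::PruningMetrics { pruning_metrics, .. } = metric.value() {
            let t = totals.get_or_insert((0, 0, 0));
            t.0 += pruning_metrics.pruned();
            t.1 += pruning_metrics.matched();
            t.2 += pruning_metrics.fully_matched();
        }
    }
    totals
}
```

Run against the first `EXPLAIN ANALYZE` in the `limit_pruning.slt` test above, the `limit_pruned_row_groups` entry would come back as roughly `Some((2, 0, 0))`: two row groups skipped once the fully matching row group covered the `LIMIT`.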