From 42572e85a026c91b641778998857705d1e6c8d46 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 16 Jan 2026 14:51:06 +0800 Subject: [PATCH 1/5] Row group limit pruning for row groups that entirely match predicates (#18868) - Part of https://github.com/apache/datafusion/issues/18860 See https://github.com/apache/datafusion/issues/18860#issuecomment-3563442093 1. How to decide if we can do limit pruning without messing up the sql semantics. 2. Add logic to decide if a row group is fully matched, all rows in the row group are matched the predicated. 3. Use the fully matched row groups to return limit rows. Yes No, no new configs, or API change --- datafusion/core/tests/parquet/mod.rs | 93 ++++-- .../core/tests/parquet/row_group_pruning.rs | 97 +++++-- datafusion/datasource-parquet/src/metrics.rs | 17 +- datafusion/datasource-parquet/src/opener.rs | 268 +++++++++++++++++- .../src/row_group_filter.rs | 209 +++++++++++--- datafusion/datasource-parquet/src/source.rs | 1 + datafusion/datasource/src/file_scan_config.rs | 67 ++++- datafusion/datasource/src/source.rs | 49 ++++ .../optimizer/src/optimize_projections/mod.rs | 13 +- .../src/enforce_sorting/mod.rs | 10 +- .../physical-optimizer/src/limit_pushdown.rs | 27 +- .../physical-plan/src/coalesce_partitions.rs | 13 + .../physical-plan/src/execution_plan.rs | 36 +++ datafusion/physical-plan/src/filter.rs | 26 ++ datafusion/physical-plan/src/limit.rs | 29 ++ datafusion/physical-plan/src/metrics/value.rs | 37 ++- datafusion/physical-plan/src/projection.rs | 79 ++++++ .../src/sorts/sort_preserving_merge.rs | 13 + .../sqllogictest/test_files/limit_pruning.slt | 81 ++++++ docs/source/user-guide/explain-usage.md | 1 + 20 files changed, 1040 insertions(+), 126 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/limit_pruning.slt diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 8caeda901b519..39df40aae4106 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -30,6 +30,7 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; +use arrow_schema::SchemaRef; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{provider_as_source, TableProvider}, @@ -109,6 +110,26 @@ struct ContextWithParquet { ctx: SessionContext, } +struct PruningMetric { + total_pruned: usize, + total_matched: usize, + total_fully_matched: usize, +} + +impl PruningMetric { + pub fn total_pruned(&self) -> usize { + self.total_pruned + } + + pub fn total_matched(&self) -> usize { + self.total_matched + } + + pub fn total_fully_matched(&self) -> usize { + self.total_fully_matched + } +} + /// The output of running one of the test cases struct TestOutput { /// The input query SQL @@ -126,8 +147,8 @@ struct TestOutput { impl TestOutput { /// retrieve the value of the named metric, if any fn metric_value(&self, metric_name: &str) -> Option { - if let Some((pruned, _matched)) = self.pruning_metric(metric_name) { - return Some(pruned); + if let Some(pm) = self.pruning_metric(metric_name) { + return Some(pm.total_pruned()); } self.parquet_metrics @@ -140,9 +161,10 @@ impl TestOutput { }) } - fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> { + fn pruning_metric(&self, metric_name: &str) -> Option { let mut total_pruned = 0; let mut total_matched = 0; + let mut total_fully_matched = 0; let mut found = false; for metric in self.parquet_metrics.iter() { @@ -151,16 +173,21 @@ impl TestOutput { if let MetricValue::PruningMetrics 
{ pruning_metrics, .. } = metric.value() - { - total_pruned += pruning_metrics.pruned(); - total_matched += pruning_metrics.matched(); - found = true; - } + { + total_pruned += pruning_metrics.pruned(); + total_matched += pruning_metrics.matched(); + total_fully_matched += pruning_metrics.fully_matched(); + + found = true; } } if found { - Some((total_pruned, total_matched)) + Some(PruningMetric { + total_pruned, + total_matched, + total_fully_matched, + }) } else { None } @@ -172,14 +199,20 @@ impl TestOutput { } /// The number of row_groups pruned / matched by bloom filter - fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> { + fn row_groups_bloom_filter(&self) -> Option { self.pruning_metric("row_groups_pruned_bloom_filter") } /// The number of row_groups matched by statistics fn row_groups_matched_statistics(&self) -> Option { self.pruning_metric("row_groups_pruned_statistics") - .map(|(_pruned, matched)| matched) + .map(|pm| pm.total_matched()) + } + + /// The number of row_groups fully matched by statistics + fn row_groups_fully_matched_statistics(&self) -> Option { + self.pruning_metric("row_groups_pruned_statistics") + .map(|pm| pm.total_fully_matched()) } /* @@ -197,14 +230,14 @@ impl TestOutput { /// The number of row_groups pruned by statistics fn row_groups_pruned_statistics(&self) -> Option { self.pruning_metric("row_groups_pruned_statistics") - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) } /// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count, /// for testing purpose, here it only aggregate the `pruned` count. fn files_ranges_pruned_statistics(&self) -> Option { self.pruning_metric("files_ranges_pruned_statistics") - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) } /// The number of row_groups matched by bloom filter or statistics @@ -213,14 +246,13 @@ impl TestOutput { /// filter: 7 total -> 3 matched, this function returns 3 for the final matched /// count. 
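The helpers above fold the per-file `PruningMetrics` into a single test-side summary with three counters. A minimal standalone sketch of the bookkeeping those counters imply (the struct below is illustrative, not the PR's `PruningMetric` type): `pruned` and `matched` partition the row groups that were checked, and the fully matched groups are a subset of the matched ones.

```rust
/// Illustrative mirror of the test-side summary struct.
struct PruningSummary {
    pruned: usize,
    matched: usize,
    fully_matched: usize,
}

impl PruningSummary {
    /// Every checked row group is either pruned or matched.
    fn total(&self) -> usize {
        self.pruned + self.matched
    }
}

fn main() {
    // e.g. 4 row groups checked: 1 pruned, 3 matched, 1 of the matched groups fully matched
    let s = PruningSummary { pruned: 1, matched: 3, fully_matched: 1 };
    assert_eq!(s.total(), 4);
    // A fully matched group is by definition also a matched group.
    assert!(s.fully_matched <= s.matched);
}
```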
fn row_groups_matched(&self) -> Option { - self.row_groups_bloom_filter() - .map(|(_pruned, matched)| matched) + self.row_groups_bloom_filter().map(|pm| pm.total_matched()) } /// The number of row_groups pruned fn row_groups_pruned(&self) -> Option { self.row_groups_bloom_filter() - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) .zip(self.row_groups_pruned_statistics()) .map(|(a, b)| a + b) } @@ -228,7 +260,13 @@ impl TestOutput { /// The number of row pages pruned fn row_pages_pruned(&self) -> Option { self.pruning_metric("page_index_rows_pruned") - .map(|(pruned, _matched)| pruned) + .map(|pm| pm.total_pruned()) + } + + /// The number of row groups pruned by limit pruning + fn limit_pruned_row_groups(&self) -> Option { + self.pruning_metric("limit_pruned_row_groups") + .map(|pm| pm.total_pruned()) } fn description(&self) -> String { @@ -247,6 +285,23 @@ impl ContextWithParquet { Self::with_config(scenario, unit, SessionConfig::new(), None, None).await } + /// Set custom schema and batches for the test + pub async fn with_custom_data( + scenario: Scenario, + unit: Unit, + schema: Arc, + batches: Vec, + ) -> Self { + Self::with_config( + scenario, + unit, + SessionConfig::new(), + Some(schema), + Some(batches), + ) + .await + } + // Set custom schema and batches for the test /* pub async fn with_custom_data( @@ -270,7 +325,7 @@ impl ContextWithParquet { scenario: Scenario, unit: Unit, mut config: SessionConfig, - custom_schema: Option>, + custom_schema: Option, custom_batches: Option>, ) -> Self { // Use a single partition for deterministic results no matter how many CPUs the host has @@ -1109,7 +1164,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { async fn make_test_file_rg( scenario: Scenario, row_per_group: usize, - custom_schema: Option>, + custom_schema: Option, custom_batches: Option>, ) -> NamedTempFile { let mut output_file = tempfile::Builder::new() diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index c5e2a4c917b0d..df0c5a7de9848 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -18,8 +18,12 @@ //! This file contains an end to end test of parquet pruning. It writes //! data into a parquet file and then verifies row groups are pruned as //! expected. 
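The limit-pruning tests added further down in this file generate their own Parquet data so that row-group statistics are exact. A self-contained sketch of the kind of single-column Int32 batch they are built from (in the spirit of the `make_i32_batch` helper below; a row group whose column is constant has `min == max`, which is what later lets the inverted predicate prove a full match):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

/// Build a single-column Int32 batch. Writing several of these with
/// `max_row_group_size` set accordingly yields one row group per batch.
fn constant_i32_batch(name: &str, value: i32, rows: usize) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, false)]));
    let array: ArrayRef = Arc::new(Int32Array::from(vec![value; rows]));
    RecordBatch::try_new(schema, vec![array]).unwrap()
}

fn main() {
    // Four rows of the constant 5 -> row-group statistics are min = 5, max = 5,
    // so a predicate like `a = 5` matches every row in the group.
    let batch = constant_i32_batch("a", 5, 4);
    assert_eq!(batch.num_rows(), 4);
}
```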
+use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::SessionConfig; -use datafusion_common::ScalarValue; +use datafusion_common::{DataFusionError, ScalarValue}; use itertools::Itertools; use crate::parquet::Unit::RowGroup; @@ -30,12 +34,12 @@ struct RowGroupPruningTest { query: String, expected_errors: Option, expected_row_group_matched_by_statistics: Option, - // expected_row_group_fully_matched_by_statistics: Option, + expected_row_group_fully_matched_by_statistics: Option, expected_row_group_pruned_by_statistics: Option, expected_files_pruned_by_statistics: Option, expected_row_group_matched_by_bloom_filter: Option, expected_row_group_pruned_by_bloom_filter: Option, - // expected_limit_pruned_row_groups: Option, + expected_limit_pruned_row_groups: Option, expected_rows: usize, } impl RowGroupPruningTest { @@ -47,11 +51,11 @@ impl RowGroupPruningTest { expected_errors: None, expected_row_group_matched_by_statistics: None, expected_row_group_pruned_by_statistics: None, - // expected_row_group_fully_matched_by_statistics: None, + expected_row_group_fully_matched_by_statistics: None, expected_files_pruned_by_statistics: None, expected_row_group_matched_by_bloom_filter: None, expected_row_group_pruned_by_bloom_filter: None, - // expected_limit_pruned_row_groups: None, + expected_limit_pruned_row_groups: None, expected_rows: 0, } } @@ -81,7 +85,6 @@ impl RowGroupPruningTest { } // Set the expected fully matched row groups by statistics - /* fn with_fully_matched_by_stats( mut self, fully_matched_by_stats: Option, @@ -90,12 +93,6 @@ impl RowGroupPruningTest { self } - fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option) -> Self { - self.expected_limit_pruned_row_groups = pruned_by_limit; - self - } - */ - // Set the expected pruned row groups by statistics fn with_pruned_by_stats(mut self, pruned_by_stats: Option) -> Self { self.expected_row_group_pruned_by_statistics = pruned_by_stats; @@ -119,6 +116,11 @@ impl RowGroupPruningTest { self } + fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option) -> Self { + self.expected_limit_pruned_row_groups = pruned_by_limit; + self + } + /// Set the number of expected rows from the output of this test fn with_expected_rows(mut self, rows: usize) -> Self { self.expected_rows = rows; @@ -155,12 +157,12 @@ impl RowGroupPruningTest { ); let bloom_filter_metrics = output.row_groups_bloom_filter(); assert_eq!( - bloom_filter_metrics.map(|(_pruned, matched)| matched), + bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()), self.expected_row_group_matched_by_bloom_filter, "mismatched row_groups_matched_bloom_filter", ); assert_eq!( - bloom_filter_metrics.map(|(pruned, _matched)| pruned), + bloom_filter_metrics.map(|pm| pm.total_pruned()), self.expected_row_group_pruned_by_bloom_filter, "mismatched row_groups_pruned_bloom_filter", ); @@ -175,6 +177,64 @@ impl RowGroupPruningTest { ); } + // Execute the test with the current configuration + async fn test_row_group_prune_with_custom_data( + self, + schema: Arc, + batches: Vec, + max_row_per_group: usize, + ) { + let output = ContextWithParquet::with_custom_data( + self.scenario, + RowGroup(max_row_per_group), + schema, + batches, + ) + .await + .query(&self.query) + .await; + + println!("{}", output.description()); + assert_eq!( + output.predicate_evaluation_errors(), + self.expected_errors, + "mismatched predicate_evaluation error" + ); + assert_eq!( + 
output.row_groups_matched_statistics(), + self.expected_row_group_matched_by_statistics, + "mismatched row_groups_matched_statistics", + ); + assert_eq!( + output.row_groups_fully_matched_statistics(), + self.expected_row_group_fully_matched_by_statistics, + "mismatched row_groups_fully_matched_statistics", + ); + assert_eq!( + output.row_groups_pruned_statistics(), + self.expected_row_group_pruned_by_statistics, + "mismatched row_groups_pruned_statistics", + ); + assert_eq!( + output.files_ranges_pruned_statistics(), + self.expected_files_pruned_by_statistics, + "mismatched files_ranges_pruned_statistics", + ); + assert_eq!( + output.limit_pruned_row_groups(), + self.expected_limit_pruned_row_groups, + "mismatched limit_pruned_row_groups", + ); + assert_eq!( + output.result_rows, + self.expected_rows, + "Expected {} rows, got {}: {}", + output.result_rows, + self.expected_rows, + output.description(), + ); + } + // Execute the test with the current configuration /* async fn test_row_group_prune_with_custom_data( @@ -1723,7 +1783,6 @@ async fn test_bloom_filter_decimal_dict() { .await; } -/* // Helper function to create a batch with a single Int32 column. fn make_i32_batch( name: &str, @@ -1841,8 +1900,8 @@ async fn test_limit_pruning_complex_filter() -> datafusion_common::error::Result } #[tokio::test] -async fn test_limit_pruning_multiple_fully_matched( -) -> datafusion_common::error::Result<()> { +async fn test_limit_pruning_multiple_fully_matched() +-> datafusion_common::error::Result<()> { // Test Case 2: Limit requires multiple fully matched row groups // Row Group 0: a=[5,5,5,5] -> Fully matched, 4 rows // Row Group 1: a=[5,5,5,5] -> Fully matched, 4 rows @@ -1950,7 +2009,7 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error: .with_scenario(Scenario::Int) .with_query(query) .with_expected_errors(Some(0)) - .with_expected_rows(10) // Total: 1 + 3 + 4 + 1 = 9 (less than limit) + .with_expected_rows(10) // Total: 1 + 4 + 4 + 1 = 10 .with_pruned_files(Some(0)) .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched .with_fully_matched_by_stats(Some(2)) @@ -1958,7 +2017,5 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error: .with_limit_pruned_row_groups(Some(0)) // No limit pruning since we need all RGs .test_row_group_prune_with_custom_data(schema, batches, 4) .await; - Ok(()) } -*/ diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index c45d234f3b512..f6e2ba0641358 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -44,9 +44,11 @@ pub struct ParquetFileMetrics { pub files_ranges_pruned_statistics: PruningMetrics, /// Number of times the predicate could not be evaluated pub predicate_evaluation_errors: Count, - /// Number of row groups whose bloom filters were checked, tracked with matched/pruned counts + /// Number of row groups pruned by bloom filters pub row_groups_pruned_bloom_filter: PruningMetrics, - /// Number of row groups whose statistics were checked, tracked with matched/pruned counts + /// Number of row groups pruned due to limit pruning. 
+ pub limit_pruned_row_groups: PruningMetrics, + /// Number of row groups pruned by statistics pub row_groups_pruned_statistics: PruningMetrics, /// Number of row groups whose bloom filters were checked and matched (not pruned) pub row_groups_matched_bloom_filter: Count, @@ -104,15 +106,8 @@ impl ParquetFileMetrics { let limit_pruned_row_groups = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .counter("limit_pruned_row_groups", partition); - - let row_groups_fully_matched_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .counter("row_groups_fully_matched_statistics", partition); - - let row_groups_matched_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .counter("row_groups_matched_statistics", partition); + .with_type(MetricType::SUMMARY) + .pruning_metrics("limit_pruned_row_groups", partition); let row_groups_pruned_statistics = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5701b22ccbcc5..0099643649d07 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -61,13 +61,15 @@ use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { /// Execution partition index - pub partition_index: usize, - /// Column indexes in `table_schema` needed by the query - pub projection: Arc<[usize]>, + pub(crate) partition_index: usize, + /// Projection to apply on top of the table schema (i.e. can reference partition columns). + pub projection: ProjectionExprs, /// Target number of rows in each output RecordBatch pub batch_size: usize, /// Optional limit on the number of rows to read - pub limit: Option, + pub(crate) limit: Option, + /// If should keep the output rows in order + pub preserve_order: bool, /// Optional predicate to apply during the scan pub predicate: Option>, /// Schema of the output table without partition columns. @@ -166,6 +168,9 @@ impl FileOpener for ParquetOpener { let encryption_context = self.get_encryption_context(); let max_predicate_cache_size = self.max_predicate_cache_size; + let reverse_row_groups = self.reverse_row_groups; + let preserve_order = self.preserve_order; + Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context @@ -410,13 +415,14 @@ impl FileOpener for ParquetOpener { .add_matched(n_remaining_row_groups); } - // Prune by limit - if enable_limit_pruning { - if let Some(limit) = limit { - row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); - } + // Prune by limit if limit is set and limit order is not sensitive + if let (Some(limit), false) = (limit, preserve_order) { + row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); } + // -------------------------------------------------------- + // Step: prune pages from the kept row groups + // let mut access_plan = row_groups.build(); // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns @@ -792,7 +798,249 @@ mod test { use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; - use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; + /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. 
+ /// This helps reduce code duplication and makes it clear what differs between test cases. + struct ParquetOpenerBuilder { + store: Option>, + table_schema: Option, + partition_index: usize, + projection_indices: Option>, + projection: Option, + batch_size: usize, + limit: Option, + predicate: Option>, + metadata_size_hint: Option, + metrics: ExecutionPlanMetricsSet, + pushdown_filters: bool, + reorder_filters: bool, + force_filter_selections: bool, + enable_page_index: bool, + enable_bloom_filter: bool, + enable_row_group_stats_pruning: bool, + coerce_int96: Option, + max_predicate_cache_size: Option, + reverse_row_groups: bool, + preserve_order: bool, + } + + impl ParquetOpenerBuilder { + /// Create a new builder with sensible defaults for tests. + fn new() -> Self { + Self { + store: None, + table_schema: None, + partition_index: 0, + projection_indices: None, + projection: None, + batch_size: 1024, + limit: None, + predicate: None, + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + pushdown_filters: false, + reorder_filters: false, + force_filter_selections: false, + enable_page_index: false, + enable_bloom_filter: false, + enable_row_group_stats_pruning: false, + coerce_int96: None, + max_predicate_cache_size: None, + reverse_row_groups: false, + preserve_order: false, + } + } + + /// Set the object store (required for building). + fn with_store(mut self, store: Arc) -> Self { + self.store = Some(store); + self + } + + /// Create a simple table schema from a file schema (for files without partition columns). + fn with_schema(mut self, file_schema: SchemaRef) -> Self { + self.table_schema = Some(TableSchema::from_file_schema(file_schema)); + self + } + + /// Set a custom table schema (for files with partition columns). + fn with_table_schema(mut self, table_schema: TableSchema) -> Self { + self.table_schema = Some(table_schema); + self + } + + /// Set projection by column indices (convenience method for common case). + fn with_projection_indices(mut self, indices: &[usize]) -> Self { + self.projection_indices = Some(indices.to_vec()); + self + } + + /// Set the predicate. + fn with_predicate(mut self, predicate: Arc) -> Self { + self.predicate = Some(predicate); + self + } + + /// Enable pushdown filters. + fn with_pushdown_filters(mut self, enable: bool) -> Self { + self.pushdown_filters = enable; + self + } + + /// Enable filter reordering. + fn with_reorder_filters(mut self, enable: bool) -> Self { + self.reorder_filters = enable; + self + } + + /// Enable row group stats pruning. + fn with_row_group_stats_pruning(mut self, enable: bool) -> Self { + self.enable_row_group_stats_pruning = enable; + self + } + + /// Set reverse row groups flag. + fn with_reverse_row_groups(mut self, enable: bool) -> Self { + self.reverse_row_groups = enable; + self + } + + /// Build the ParquetOpener instance. + /// + /// # Panics + /// + /// Panics if required fields (store, schema/table_schema) are not set. 
+ fn build(self) -> ParquetOpener { + let store = self + .store + .expect("ParquetOpenerBuilder: store must be set via with_store()"); + let table_schema = self.table_schema.expect( + "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()", + ); + let file_schema = Arc::clone(table_schema.file_schema()); + + let projection = if let Some(projection) = self.projection { + projection + } else if let Some(indices) = self.projection_indices { + ProjectionExprs::from_indices(&indices, &file_schema) + } else { + // Default: project all columns + let all_indices: Vec = (0..file_schema.fields().len()).collect(); + ProjectionExprs::from_indices(&all_indices, &file_schema) + }; + + ParquetOpener { + partition_index: self.partition_index, + projection, + batch_size: self.batch_size, + limit: self.limit, + predicate: self.predicate, + table_schema, + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics, + parquet_file_reader_factory: Arc::new( + DefaultParquetFileReaderFactory::new(store), + ), + pushdown_filters: self.pushdown_filters, + reorder_filters: self.reorder_filters, + force_filter_selections: self.force_filter_selections, + enable_page_index: self.enable_page_index, + enable_bloom_filter: self.enable_bloom_filter, + enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, + coerce_int96: self.coerce_int96, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: self.max_predicate_cache_size, + reverse_row_groups: self.reverse_row_groups, + preserve_order: self.preserve_order, + } + } + } + + fn constant_int_stats() -> (Statistics, SchemaRef) { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let statistics = Statistics { + num_rows: Precision::Exact(3), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::from(5i32)), + min_value: Precision::Exact(ScalarValue::from(5i32)), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Absent, + }, + ColumnStatistics::new_unknown(), + ], + }; + (statistics, schema) + } + + #[test] + fn extract_constant_columns_non_null() { + let (statistics, schema) = constant_int_stats(); + let constants = constant_columns_from_stats(Some(&statistics), &schema); + assert_eq!(constants.len(), 1); + assert_eq!(constants.get("a"), Some(&ScalarValue::from(5i32))); + assert!(!constants.contains_key("b")); + } + + #[test] + fn extract_constant_columns_all_null() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let statistics = Statistics { + num_rows: Precision::Exact(2), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + + let constants = constant_columns_from_stats(Some(&statistics), &schema); + assert_eq!( + constants.get("a"), + Some(&ScalarValue::Utf8(None)), + "all-null column should be treated as constant null" + ); + } + + #[test] + fn rewrite_projection_to_literals() { + let (statistics, schema) = 
constant_int_stats(); + let constants = constant_columns_from_stats(Some(&statistics), &schema); + let projection = ProjectionExprs::from_indices(&[0, 1], &schema); + + let rewritten = projection + .try_map_exprs(|expr| replace_columns_with_literals(expr, &constants)) + .unwrap(); + let exprs = rewritten.as_ref(); + assert!(exprs[0].expr.as_any().downcast_ref::().is_some()); + assert!(exprs[1].expr.as_any().downcast_ref::().is_some()); + + // Only column `b` should remain in the projection mask + assert_eq!(rewritten.column_indices(), vec![1]); + } + + #[test] + fn rewrite_physical_expr_literal() { + let mut constants = ConstantColumns::new(); + constants.insert("a".to_string(), ScalarValue::from(7i32)); + let expr: Arc = Arc::new(Column::new("a", 0)); + + let rewritten = replace_columns_with_literals(expr, &constants).unwrap(); + assert!(rewritten.as_any().downcast_ref::().is_some()); + } async fn count_batches_and_rows( mut stream: std::pin::Pin< diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index f0d483ba35b10..698866a212ce0 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -24,8 +24,8 @@ use arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; -use datafusion_physical_expr::expressions::NotExpr; use datafusion_physical_expr::PhysicalExprSimplifier; +use datafusion_physical_expr::expressions::NotExpr; use datafusion_pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_column; @@ -48,7 +48,8 @@ use parquet::{ pub struct RowGroupAccessPlanFilter { /// which row groups should be accessed access_plan: ParquetAccessPlan, - /// which row groups are fully contained within the pruning predicate + /// Row groups where ALL rows are known to match the pruning predicate + /// (the predicate does not filter any rows) is_fully_matched: Vec, } @@ -84,6 +85,93 @@ impl RowGroupAccessPlanFilter { } /// Prunes the access plan based on the limit and fully contained row groups. + /// + /// The pruning works by leveraging the concept of fully matched row groups. Consider a query like: + /// `WHERE species LIKE 'Alpine%' AND s >= 50 LIMIT N` + /// + /// After initial filtering, row groups can be classified into three states: + /// + /// 1. Not Matching / Pruned + /// 2. Partially Matching (Row Group/Page contains some matches) + /// 3. 
Fully Matching (Entire range is within predicate) + /// + /// +-----------------------------------------------------------------------+ + /// | NOT MATCHING | + /// | Row group 1 | + /// | +-----------------------------------+-----------------------------+ | + /// | | SPECIES | S | | + /// | +-----------------------------------+-----------------------------+ | + /// | | Snow Vole | 7 | | + /// | | Brown Bear | 133 ✅ | | + /// | | Gray Wolf | 82 ✅ | | + /// | +-----------------------------------+-----------------------------+ | + /// +-----------------------------------------------------------------------+ + /// + /// +---------------------------------------------------------------------------+ + /// | PARTIALLY MATCHING | + /// | | + /// | Row group 2 Row group 4 | + /// | +------------------+--------------+ +------------------+----------+ | + /// | | SPECIES | S | | SPECIES | S | | + /// | +------------------+--------------+ +------------------+----------+ | + /// | | Lynx | 71 ✅ | | Europ. Mole | 4 | | + /// | | Red Fox | 40 | | Polecat | 16 | | + /// | | Alpine Bat ✅ | 6 | | Alpine Ibex ✅ | 97 ✅ | | + /// | +------------------+--------------+ +------------------+----------+ | + /// +---------------------------------------------------------------------------+ + /// + /// +-----------------------------------------------------------------------+ + /// | FULLY MATCHING | + /// | Row group 3 | + /// | +-----------------------------------+-----------------------------+ | + /// | | SPECIES | S | | + /// | +-----------------------------------+-----------------------------+ | + /// | | Alpine Ibex ✅ | 101 ✅ | | + /// | | Alpine Goat ✅ | 76 ✅ | | + /// | | Alpine Sheep ✅ | 83 ✅ | | + /// | +-----------------------------------+-----------------------------+ | + /// +-----------------------------------------------------------------------+ + /// + /// ### Identification of Fully Matching Row Groups + /// + /// DataFusion identifies row groups where ALL rows satisfy the filter by inverting the + /// predicate and checking if statistics prove the inverted version is false for the group. + /// + /// For example, prefix matches like `species LIKE 'Alpine%'` are pruned using ranges: + /// 1. Candidate Range: `species >= 'Alpine' AND species < 'Alpinf'` + /// 2. Inverted Condition (to prove full match): `species < 'Alpine' OR species >= 'Alpinf'` + /// 3. Statistical Evaluation (check if any row *could* satisfy the inverted condition): + /// `min < 'Alpine' OR max >= 'Alpinf'` + /// + /// If this evaluation is **false**, it proves no row can fail the original filter, + /// so the row group is **FULLY MATCHING**. + /// + /// ### Impact of Statistics Truncation + /// + /// The precision of pruning depends on the metadata quality. Truncated statistics + /// may prevent the system from proving a full match. 
+ /// + /// **Example**: `WHERE species LIKE 'Alpine%'` (Target range: `['Alpine', 'Alpinf')`) + /// + /// | Truncation Length | min / max | Inverted Evaluation | Status | + /// |-------------------|---------------------|---------------------------------------------------------------------|------------------------| + /// | **Length 6** | `Alpine` / `Alpine` | `"Alpine" < "Alpine" (F) OR "Alpine" >= "Alpinf" (F)` -> **false** | **FULLY MATCHING** | + /// | **Length 3** | `Alp` / `Alq` | `"Alp" < "Alpine" (T) OR "Alq" >= "Alpinf" (T)` -> **true** | **PARTIALLY MATCHING** | + /// + /// Even though Row Group 3 only contains matching rows, truncation to length 3 makes + /// the statistics `[Alp, Alq]` too broad to prove it (they could include "Alpha"). + /// The system must conservatively scan the group. + /// + /// Without limit pruning: Scan Partition 2 → Partition 3 → Partition 4 (until limit reached) + /// With limit pruning: If Partition 3 contains enough rows to satisfy the limit, + /// skip Partitions 2 and 4 entirely and go directly to Partition 3. + /// + /// This optimization is particularly effective when: + /// - The limit is small relative to the total dataset size + /// - There are row groups that are fully matched by the filter predicates + /// - The fully matched row groups contain sufficient rows to satisfy the limit + /// + /// For more information, see the [paper](https://arxiv.org/pdf/2504.11540)'s "Pruning for LIMIT Queries" part pub fn prune_by_limit( &mut self, limit: usize, @@ -93,7 +181,8 @@ impl RowGroupAccessPlanFilter { let mut fully_matched_row_group_indexes: Vec = Vec::new(); let mut fully_matched_rows_count: usize = 0; - // Iterate through the currently accessible row groups + // Iterate through the currently accessible row groups and try to + // find a set of matching row groups that can satisfy the limit for &idx in self.access_plan.row_group_indexes().iter() { if self.is_fully_matched[idx] { let row_group_row_count = rg_metadata[idx].num_rows() as usize; @@ -105,13 +194,15 @@ impl RowGroupAccessPlanFilter { } } + // If we can satisfy the limit with fully matching row groups, + // rewrite the plan to do so if fully_matched_rows_count >= limit { let original_num_accessible_row_groups = self.access_plan.row_group_indexes().len(); let new_num_accessible_row_groups = fully_matched_row_group_indexes.len(); let pruned_count = original_num_accessible_row_groups .saturating_sub(new_num_accessible_row_groups); - metrics.limit_pruned_row_groups.add(pruned_count); + metrics.limit_pruned_row_groups.add_pruned(pruned_count); let mut new_access_plan = ParquetAccessPlan::new_none(rg_metadata.len()); for &idx in &fully_matched_row_group_indexes { @@ -197,45 +288,15 @@ impl RowGroupAccessPlanFilter { } } - // Note: this part of code shouldn't be expensive with a limited number of row groups - // If we do find it's expensive, we can consider optimizing it further. 
- if !fully_contained_candidates_original_idx.is_empty() { - // Use NotExpr to create the inverted predicate - let inverted_expr = - Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr()))); - // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0) - // before building the pruning predicate - let mut simplifier = PhysicalExprSimplifier::new(arrow_schema); - let inverted_expr = simplifier.simplify(inverted_expr).unwrap(); - if let Ok(inverted_predicate) = PruningPredicate::try_new( - inverted_expr, - Arc::clone(predicate.schema()), - ) { - let inverted_pruning_stats = RowGroupPruningStatistics { - parquet_schema, - row_group_metadatas: fully_contained_candidates_original_idx - .iter() - .map(|&i| &groups[i]) - .collect::>(), - arrow_schema, - }; - - if let Ok(inverted_values) = - inverted_predicate.prune(&inverted_pruning_stats) - { - for (i, &original_row_group_idx) in - fully_contained_candidates_original_idx.iter().enumerate() - { - // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false), - // it implies that *all* rows in this group satisfy the original predicate. - if !inverted_values[i] { - self.is_fully_matched[original_row_group_idx] = true; - metrics.row_groups_fully_matched_statistics.add(1); - } - } - } - } - } + // Check if any of the matched row groups are fully contained by the predicate + self.identify_fully_matched_row_groups( + &fully_contained_candidates_original_idx, + arrow_schema, + parquet_schema, + groups, + predicate, + metrics, + ); } // stats filter array could not be built, so we can't prune Err(e) => { @@ -245,6 +306,68 @@ impl RowGroupAccessPlanFilter { } } + /// Identifies row groups that are fully matched by the predicate. + /// + /// This optimization checks whether all rows in a row group satisfy the predicate + /// by inverting the predicate and checking if it prunes the row group. If the + /// inverted predicate prunes a row group, it means no rows match the inverted + /// predicate, which implies all rows match the original predicate. + /// + /// Note: This optimization is relatively inexpensive for a limited number of row groups. 
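The helper below carries exactly this proof obligation: a row group is fully matched when its statistics rule out the negated predicate. A standalone sketch for a range predicate `lo <= x < hi`, using the `LIKE 'Alpine%'` example from the doc comment above (plain `&str` comparison stands in for the real statistics evaluation):

```rust
/// Could any row in a group with the given min/max statistics satisfy
/// `value < lo || value >= hi`, i.e. the negation of `lo <= value < hi`?
/// If not, every row in the group must satisfy the original predicate.
fn negation_possible(min: &str, max: &str, lo: &str, hi: &str) -> bool {
    // Some row could fall below the range only if the minimum does;
    // some row could fall at or above `hi` only if the maximum does.
    min < lo || max >= hi
}

fn main() {
    // `species LIKE 'Alpine%'` rewritten as the range ['Alpine', 'Alpinf').
    // With exact statistics the negation is impossible: the group is fully matched.
    assert!(!negation_possible("Alpine Goat", "Alpine Sheep", "Alpine", "Alpinf"));
    // Statistics truncated to three characters are too coarse to prove it.
    assert!(negation_possible("Alp", "Alq", "Alpine", "Alpinf"));
}
```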
+ fn identify_fully_matched_row_groups( + &mut self, + candidate_row_group_indices: &[usize], + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + groups: &[RowGroupMetaData], + predicate: &PruningPredicate, + metrics: &ParquetFileMetrics, + ) { + if candidate_row_group_indices.is_empty() { + return; + } + + // Use NotExpr to create the inverted predicate + let inverted_expr = Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr()))); + + // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0) + // before building the pruning predicate + let simplifier = PhysicalExprSimplifier::new(arrow_schema); + let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else { + return; + }; + + let Ok(inverted_predicate) = + PruningPredicate::try_new(inverted_expr, Arc::clone(predicate.schema())) + else { + return; + }; + + let inverted_pruning_stats = RowGroupPruningStatistics { + parquet_schema, + row_group_metadatas: candidate_row_group_indices + .iter() + .map(|&i| &groups[i]) + .collect::>(), + arrow_schema, + }; + + let Ok(inverted_values) = inverted_predicate.prune(&inverted_pruning_stats) + else { + return; + }; + + for (i, &original_row_group_idx) in candidate_row_group_indices.iter().enumerate() + { + // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false), + // it implies that *all* rows in this group satisfy the original predicate. + if !inverted_values[i] { + self.is_fully_matched[original_row_group_idx] = true; + metrics.row_groups_pruned_statistics.add_fully_matched(1); + } + } + } + /// Prune remaining row groups using available bloom filters and the /// [`PruningPredicate`]. /// diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 339d36b57cc35..6281259c191f0 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -564,6 +564,7 @@ impl FileSource for ParquetSource { .batch_size .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, + preserve_order: base_config.preserve_order, predicate: self.predicate.clone(), logical_file_schema: Arc::clone(base_config.file_schema()), partition_fields: base_config.table_partition_cols().clone(), diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 02d9762a4a396..5365ddda52760 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -187,6 +187,11 @@ pub struct FileScanConfig { /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, + /// Whether the scan's limit is order sensitive + /// When `true`, files must be read in the exact order specified to produce + /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`, + /// DataFusion may reorder file processing for optimization without affecting correctness. + pub preserve_order: bool, /// All equivalent lexicographical orderings that describe the schema. 
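The `preserve_order` flag added above is what keeps limit pruning from changing query results. A minimal sketch of the safety rule, under the assumption (made explicit in the builder logic below) that any declared output ordering also forces order preservation:

```rust
/// Limit pruning may skip row groups only when there is a limit to satisfy
/// and the scan is free to return matching rows in any order.
fn limit_pruning_allowed(
    limit: Option<usize>,
    preserve_order_requested: bool,
    has_output_ordering: bool,
) -> bool {
    limit.is_some() && !preserve_order_requested && !has_output_ordering
}

fn main() {
    // `SELECT ... WHERE ... LIMIT 3` over an unordered scan: pruning is allowed.
    assert!(limit_pruning_allowed(Some(3), false, false));
    // `ORDER BY ... LIMIT 3`: the scan must preserve order, so pruning is disabled.
    assert!(!limit_pruning_allowed(Some(3), true, false));
    // No limit: nothing to prune for.
    assert!(!limit_pruning_allowed(None, false, false));
}
```

This mirrors the `(Some(limit), false)` check in `ParquetOpener` before it calls `prune_by_limit`.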
pub output_ordering: Vec, /// File compression type @@ -269,7 +274,7 @@ pub struct FileScanConfigBuilder { table_schema: TableSchema, file_source: Arc, limit: Option, - projection_indices: Option>, + preserve_order: bool, constraints: Option, file_groups: Vec, statistics: Option, @@ -304,7 +309,7 @@ impl FileScanConfigBuilder { file_compression_type: None, new_lines_in_values: None, limit: None, - projection_indices: None, + preserve_order: false, constraints: None, batch_size: None, expr_adapter_factory: None, @@ -319,6 +324,15 @@ impl FileScanConfigBuilder { self } + /// Set whether the limit should be order-sensitive. + /// When `true`, files must be read in the exact order specified to produce + /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`, + /// DataFusion may reorder file processing for optimization without affecting correctness. + pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self { + self.preserve_order = order_sensitive; + self + } + /// Set the file source for scanning files. /// /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.) @@ -465,7 +479,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, - projection_indices, + preserve_order, constraints, file_groups, statistics, @@ -494,12 +508,15 @@ impl FileScanConfigBuilder { ProjectionExprs::from_indices(&indices, table_schema.table_schema()) }); + // If there is an output ordering, we should preserve it. + let preserve_order = preserve_order || !output_ordering.is_empty(); + FileScanConfig { object_store_url, table_schema, file_source, limit, - projection_exprs, + preserve_order, constraints, file_groups, output_ordering, @@ -524,9 +541,7 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, - projection_indices: config - .projection_exprs - .map(|p| p.ordered_column_indices()), + preserve_order: config.preserve_order, constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, @@ -757,6 +772,44 @@ impl DataSource for FileScanConfig { } } } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + // Delegate to FileSource to check if reverse scanning can satisfy the request. 
+ let pushdown_result = self + .file_source + .try_reverse_output(order, &self.eq_properties())?; + + match pushdown_result { + SortOrderPushdownResult::Exact { inner } => { + Ok(SortOrderPushdownResult::Exact { + inner: self.rebuild_with_source(inner, true)?, + }) + } + SortOrderPushdownResult::Inexact { inner } => { + Ok(SortOrderPushdownResult::Inexact { + inner: self.rebuild_with_source(inner, false)?, + }) + } + SortOrderPushdownResult::Unsupported => { + Ok(SortOrderPushdownResult::Unsupported) + } + } + } + + fn with_preserve_order(&self, preserve_order: bool) -> Option> { + if self.preserve_order == preserve_order { + return Some(Arc::new(self.clone())); + } + + let new_config = FileScanConfig { + preserve_order, + ..self.clone() + }; + Some(Arc::new(new_config)) + } } impl FileScanConfig { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 7169997bd0316..54369ece4fd9e 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -189,6 +189,30 @@ pub trait DataSource: Send + Sync + Debug { vec![PushedDown::No; filters.len()], )) } + + /// Try to create a new DataSource that produces data in the specified sort order. + /// + /// # Arguments + /// * `order` - The desired output ordering + /// + /// # Returns + /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering + /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering + /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering + /// * `Err(e)` - Error occurred + /// + /// Default implementation returns `Unsupported`. + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + ) -> Result>> { + Ok(SortOrderPushdownResult::Unsupported) + } + + /// Returns a variant of this `DataSource` that is aware of order-sensitivity. + fn with_preserve_order(&self, _preserve_order: bool) -> Option> { + None + } } /// [`ExecutionPlan`] that reads one or more files @@ -371,6 +395,31 @@ impl ExecutionPlan for DataSourceExec { }), } } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + // Delegate to the data source and wrap result with DataSourceExec + self.data_source + .try_pushdown_sort(order)? 
+ .try_map(|new_data_source| { + let new_exec = self.clone().with_data_source(new_data_source); + Ok(Arc::new(new_exec) as Arc) + }) + } + + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.data_source + .with_preserve_order(preserve_order) + .map(|new_data_source| { + Arc::new(self.clone().with_data_source(new_data_source)) + as Arc + }) + } } impl DataSourceExec { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5db71417bc8fd..59a9d703f670a 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -268,15 +268,10 @@ fn optimize_projections( Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - return TableScan::try_new( - table_name, - source, - Some(projection), - filters, - fetch, - ) - .map(LogicalPlan::TableScan) - .map(Transformed::yes); + let new_scan = + TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; + + return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))); } // Other node types are handled below _ => {} diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 28d187bbf8930..a42abad1dcc1d 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -584,11 +584,17 @@ fn analyze_immediate_sort_removal( // Remove the sort: node.children = node.children.swap_remove(0).children; if let Some(fetch) = sort_exec.fetch() { + let required_ordering = sort_exec.properties().output_ordering().cloned(); // If the sort has a fetch, we need to add a limit: if properties.output_partitioning().partition_count() == 1 { - Arc::new(GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch))) + let mut global_limit = + GlobalLimitExec::new(Arc::clone(sort_input), 0, Some(fetch)); + global_limit.set_required_ordering(required_ordering); + Arc::new(global_limit) } else { - Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch)) + let mut local_limit = LocalLimitExec::new(Arc::clone(sort_input), fetch); + local_limit.set_required_ordering(required_ordering); + Arc::new(local_limit) } } else { Arc::clone(sort_input) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7469c3af9344c..e0079578a35db 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -50,6 +50,7 @@ pub struct GlobalRequirements { fetch: Option, skip: usize, satisfied: bool, + preserve_order: bool, } impl LimitPushdown { @@ -69,6 +70,7 @@ impl PhysicalOptimizerRule for LimitPushdown { fetch: None, skip: 0, satisfied: false, + preserve_order: false, }; pushdown_limits(plan, global_state) } @@ -111,6 +113,13 @@ impl LimitExec { Self::Local(_) => 0, } } + + fn preserve_order(&self) -> bool { + match self { + Self::Global(global) => global.required_ordering().is_some(), + Self::Local(local) => local.required_ordering().is_some(), + } + } } impl From for Arc { @@ -145,6 +154,7 @@ pub fn pushdown_limit_helper( ); global_state.skip = skip; global_state.fetch = fetch; + global_state.preserve_order = limit_exec.preserve_order(); // Now the global state has the most recent information, we can remove // the `LimitExec` plan. 
We will decide later if we should add it again @@ -241,17 +251,28 @@ pub fn pushdown_limit_helper( let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch); if global_state.satisfied { if let Some(plan_with_fetch) = maybe_fetchable { - Ok((Transformed::yes(plan_with_fetch), global_state)) + let plan_with_preserve_order = plan_with_fetch + .with_preserve_order(global_state.preserve_order) + .unwrap_or(plan_with_fetch); + Ok((Transformed::yes(plan_with_preserve_order), global_state)) } else { Ok((Transformed::no(pushdown_plan), global_state)) } } else { global_state.satisfied = true; pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { + let plan_with_preserve_order = plan_with_fetch + .with_preserve_order(global_state.preserve_order) + .unwrap_or(plan_with_fetch); + if global_skip > 0 { - add_global_limit(plan_with_fetch, global_skip, Some(global_fetch)) + add_global_limit( + plan_with_preserve_order, + global_skip, + Some(global_fetch), + ) } else { - plan_with_fetch + plan_with_preserve_order } } else { add_limit(pushdown_plan, global_skip, global_fetch) diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 31e5a7369cab3..ebf97cee587c4 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -284,6 +284,19 @@ impl ExecutionPlan for CoalescePartitionsExec { })) } + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } + fn gather_filters_for_pushdown( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 340e5662cebcb..e9f968a13f07b 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -688,6 +688,42 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Option> { None } + + /// Try to push down sort ordering requirements to this node. + /// + /// This method is called during sort pushdown optimization to determine if this + /// node can optimize for a requested sort ordering. Implementations should: + /// + /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact + /// ordering (allowing the Sort operator to be removed) + /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the + /// ordering but cannot guarantee perfect sorting (Sort operator is kept) + /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize + /// for the ordering + /// + /// For transparent nodes (that preserve ordering), implement this to delegate to + /// children and wrap the result with a new instance of this node. + /// + /// Default implementation returns `Unsupported`. + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + ) -> Result>> { + Ok(SortOrderPushdownResult::Unsupported) + } + + /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity. + /// + /// This is used to signal to data sources that the output ordering must be + /// preserved, even if it might be more efficient to ignore it (e.g. by + /// skipping some row groups in Parquet). 
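The sketch below shows the delegation pattern that the pass-through operators later in this patch (`CoalescePartitionsExec`, `FilterExec`, `ProjectionExec`, `SortPreservingMergeExec`) use for this method: they have no ordering behaviour of their own, so they forward the request to their child and rewrap the result. The `Node` trait here is a toy stand-in for the relevant slice of `ExecutionPlan`, not DataFusion's API.

```rust
use std::sync::Arc;

/// Toy stand-in for the order-sensitivity part of `ExecutionPlan`.
trait Node {
    fn with_preserve_order(self: Arc<Self>, preserve: bool) -> Option<Arc<dyn Node>>;
}

/// A leaf source. A real scan would record the flag and use it to keep
/// row groups in their original order (disabling limit pruning).
struct Source;

impl Node for Source {
    fn with_preserve_order(self: Arc<Self>, _preserve: bool) -> Option<Arc<dyn Node>> {
        Some(Arc::new(Source))
    }
}

/// A pass-through operator (think Filter or Projection): it simply delegates
/// to its child and rewraps the new child in a copy of itself.
struct PassThrough {
    child: Arc<dyn Node>,
}

impl Node for PassThrough {
    fn with_preserve_order(self: Arc<Self>, preserve: bool) -> Option<Arc<dyn Node>> {
        Arc::clone(&self.child)
            .with_preserve_order(preserve)
            .map(|child| Arc::new(PassThrough { child }) as Arc<dyn Node>)
    }
}

fn main() {
    let plan: Arc<dyn Node> = Arc::new(PassThrough { child: Arc::new(Source) });
    // The request travels through the pass-through node down to the source.
    assert!(plan.with_preserve_order(true).is_some());
}
```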
+ /// + fn with_preserve_order( + &self, + _preserve_order: bool, + ) -> Option> { + None + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2ee7d283d8ad8..71ef668fcd348 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -570,6 +570,32 @@ impl ExecutionPlan for FilterExec { updated_node, }) } + + fn with_fetch(&self, fetch: Option) -> Option> { + Some(Arc::new(Self { + predicate: Arc::clone(&self.predicate), + input: Arc::clone(&self.input), + metrics: self.metrics.clone(), + default_selectivity: self.default_selectivity, + cache: self.cache.clone(), + projection: self.projection.clone(), + batch_size: self.batch_size, + fetch, + })) + } + + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } } impl EmbeddedProjection for FilterExec { diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 5bdd697b3555b..fecb70be32133 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -35,6 +35,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; +use datafusion_physical_expr::LexOrdering; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -51,6 +52,9 @@ pub struct GlobalLimitExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, cache: PlanProperties, + /// Does the limit have to preserve the order of its input, and if so what is it? + /// Some optimizations may reorder the input if no particular sort is required + required_ordering: Option, } impl GlobalLimitExec { @@ -63,6 +67,7 @@ impl GlobalLimitExec { fetch, metrics: ExecutionPlanMetricsSet::new(), cache, + required_ordering: None, } } @@ -91,6 +96,16 @@ impl GlobalLimitExec { Boundedness::Bounded, ) } + + /// Get the required ordering from limit + pub fn required_ordering(&self) -> &Option { + &self.required_ordering + } + + /// Set the required ordering for limit + pub fn set_required_ordering(&mut self, required_ordering: Option) { + self.required_ordering = required_ordering; + } } impl DisplayAs for GlobalLimitExec { @@ -230,6 +245,9 @@ pub struct LocalLimitExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, cache: PlanProperties, + /// If the child plan is a sort node, after the sort node is removed during + /// physical optimization, we should add the required ordering to the limit node + required_ordering: Option, } impl LocalLimitExec { @@ -241,6 +259,7 @@ impl LocalLimitExec { fetch, metrics: ExecutionPlanMetricsSet::new(), cache, + required_ordering: None, } } @@ -264,6 +283,16 @@ impl LocalLimitExec { Boundedness::Bounded, ) } + + /// Get the required ordering from limit + pub fn required_ordering(&self) -> &Option { + &self.required_ordering + } + + /// Set the required ordering for limit + pub fn set_required_ordering(&mut self, required_ordering: Option) { + self.required_ordering = required_ordering; + } } impl DisplayAs for LocalLimitExec { diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 298d63e5e216a..81d0f66db2c2b 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -372,14 +372,31 @@ impl Drop for 
ScopedTimerGuard<'_> { pub struct PruningMetrics { pruned: Arc, matched: Arc, + fully_matched: Arc, } impl Display for PruningMetrics { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let matched = self.matched.load(Ordering::Relaxed); let total = self.pruned.load(Ordering::Relaxed) + matched; - - write!(f, "{total} total → {matched} matched") + let fully_matched = self.fully_matched.load(Ordering::Relaxed); + + if fully_matched != 0 { + write!( + f, + "{} total → {} matched -> {} fully matched", + human_readable_count(total), + human_readable_count(matched), + human_readable_count(fully_matched) + ) + } else { + write!( + f, + "{} total → {} matched", + human_readable_count(total), + human_readable_count(matched) + ) + } } } @@ -395,6 +412,7 @@ impl PruningMetrics { Self { pruned: Arc::new(AtomicUsize::new(0)), matched: Arc::new(AtomicUsize::new(0)), + fully_matched: Arc::new(AtomicUsize::new(0)), } } @@ -412,6 +430,13 @@ impl PruningMetrics { self.matched.fetch_add(n, Ordering::Relaxed); } + /// Add `n` to the metric's fully matched value + pub fn add_fully_matched(&self, n: usize) { + // relaxed ordering for operations on `value` poses no issues + // we're purely using atomic ops with no associated memory ops + self.fully_matched.fetch_add(n, Ordering::Relaxed); + } + /// Subtract `n` to the metric's matched value. pub fn subtract_matched(&self, n: usize) { // relaxed ordering for operations on `value` poses no issues @@ -428,6 +453,11 @@ impl PruningMetrics { pub fn matched(&self) -> usize { self.matched.load(Ordering::Relaxed) } + + /// Number of items fully matched + pub fn fully_matched(&self) -> usize { + self.fully_matched.load(Ordering::Relaxed) + } } /// Counters tracking ratio metrics (e.g. matched vs total) @@ -842,8 +872,11 @@ impl MetricValue { ) => { let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed); let matched = other_pruning_metrics.matched.load(Ordering::Relaxed); + let fully_matched = + other_pruning_metrics.fully_matched.load(Ordering::Relaxed); pruning_metrics.add_pruned(pruned); pruning_metrics.add_matched(matched); + pruning_metrics.add_fully_matched(fully_matched); } ( Self::Ratio { ratio_metrics, .. }, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cfdaa4e9d9fd4..fa1c4629d1795 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -346,6 +346,85 @@ impl ExecutionPlan for ProjectionExec { ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + let child = self.input(); + let mut child_order = Vec::new(); + + // Check and transform sort expressions + for sort_expr in order { + // Recursively transform the expression + let mut can_pushdown = true; + let transformed = Arc::clone(&sort_expr.expr).transform(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + // Check if column index is valid. + // This should always be true but fail gracefully if it's not. + if col.index() >= self.expr().len() { + can_pushdown = false; + return Ok(Transformed::no(expr)); + } + + let proj_expr = &self.expr()[col.index()]; + + // Check if projection expression is a simple column + // We cannot push down order by clauses that depend on + // projected computations as they would have nothing to reference. 
+ if let Some(child_col) = + proj_expr.expr.as_any().downcast_ref::() + { + // Replace with the child column + Ok(Transformed::yes(Arc::new(child_col.clone()) as _)) + } else { + // Projection involves computation, cannot push down + can_pushdown = false; + Ok(Transformed::no(expr)) + } + } else { + Ok(Transformed::no(expr)) + } + })?; + + if !can_pushdown { + return Ok(SortOrderPushdownResult::Unsupported); + } + + child_order.push(PhysicalSortExpr { + expr: transformed.data, + options: sort_expr.options, + }); + } + + // Recursively push down to child node + match child.try_pushdown_sort(&child_order)? { + SortOrderPushdownResult::Exact { inner } => { + let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?; + Ok(SortOrderPushdownResult::Exact { inner: new_exec }) + } + SortOrderPushdownResult::Inexact { inner } => { + let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?; + Ok(SortOrderPushdownResult::Inexact { inner: new_exec }) + } + SortOrderPushdownResult::Unsupported => { + Ok(SortOrderPushdownResult::Unsupported) + } + } + } + + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } } impl ProjectionStream { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 11f42c8153e08..150f1e60cbbf8 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -245,6 +245,19 @@ impl ExecutionPlan for SortPreservingMergeExec { })) } + fn with_preserve_order( + &self, + preserve_order: bool, + ) -> Option> { + self.input + .with_preserve_order(preserve_order) + .and_then(|new_input| { + Arc::new(self.clone()) + .with_new_children(vec![new_input]) + .ok() + }) + } + fn required_input_distribution(&self) -> Vec { vec![Distribution::UnspecifiedDistribution] } diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt new file mode 100644 index 0000000000000..8a94bf8adc75f --- /dev/null +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + + +statement ok +CREATE TABLE tracking_data AS VALUES +-- ***** Row Group 0 ***** + ('Anow Vole', 7), + ('Brown Bear', 133), + ('Gray Wolf', 82), +-- ***** Row Group 1 ***** + ('Lynx', 71), + ('Red Fox', 40), + ('Alpine Bat', 6), +-- ***** Row Group 2 ***** + ('Nlpine Ibex', 101), + ('Nlpine Goat', 76), + ('Nlpine Sheep', 83), +-- ***** Row Group 3 ***** + ('Europ. Mole', 4), + ('Polecat', 16), + ('Alpine Ibex', 97); + +statement ok +COPY (SELECT column1 as species, column2 as s FROM tracking_data) +TO 'test_files/scratch/limit_pruning/data.parquet' +STORED AS PARQUET +OPTIONS ( + 'format.max_row_group_size' '3' +); + +statement ok +drop table tracking_data; + +statement ok +CREATE EXTERNAL TABLE tracking_data +STORED AS PARQUET +LOCATION 'test_files/scratch/limit_pruning/data.parquet'; + + +statement ok +set datafusion.explain.analyze_level = summary; + +# row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched +# limit_pruned_row_groups=2 total → 0 matched +query TT +explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; +---- +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=3 total → 3 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] + +# limit_pruned_row_groups=0 total → 0 matched +# because of order by, scan needs to preserve sort, so limit pruning is disabled +query TT +explain analyze select * from tracking_data where species > 'M' AND s >= 50 order by species limit 3; +---- +Plan with Metrics +01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=9 total → 9 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] + +statement ok +drop table tracking_data; + +statement ok +reset datafusion.explain.analyze_level; diff --git a/docs/source/user-guide/explain-usage.md b/docs/source/user-guide/explain-usage.md index 5a1184539c034..8fe83163813da 100644 --- 
a/docs/source/user-guide/explain-usage.md +++ b/docs/source/user-guide/explain-usage.md @@ -228,6 +228,7 @@ When predicate pushdown is enabled, `DataSourceExec` with `ParquetSource` gains - `page_index_rows_pruned`: number of rows evaluated by page index filters. The metric reports both how many rows were considered in total and how many matched (were not pruned). - `row_groups_pruned_bloom_filter`: number of row groups evaluated by Bloom Filters, reporting both total checked groups and groups that matched. - `row_groups_pruned_statistics`: number of row groups evaluated by row-group statistics (min/max), reporting both total checked groups and groups that matched. +- `limit_pruned_row_groups`: number of row groups pruned by the limit. - `pushdown_rows_matched`: rows that were tested by any of the above filters, and passed all of them. - `pushdown_rows_pruned`: rows that were tested by any of the above filters, and did not pass at least one of them. - `predicate_evaluation_errors`: number of times evaluating the filter expression failed (expected to be zero in normal operation) From fa75209dabb22023edc41556e27e3c36bdce4eaf Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Tue, 27 Jan 2026 11:30:55 +0800 Subject: [PATCH 2/5] update --- datafusion/core/tests/parquet/mod.rs | 23 +- .../core/tests/parquet/row_group_pruning.rs | 4 +- datafusion/datasource-parquet/src/metrics.rs | 8 - datafusion/datasource-parquet/src/opener.rs | 269 +----------------- .../src/row_group_filter.rs | 4 +- datafusion/datasource-parquet/src/source.rs | 1 - datafusion/datasource/src/file_scan_config.rs | 32 +-- datafusion/datasource/src/source.rs | 32 --- .../physical-plan/src/execution_plan.rs | 23 -- datafusion/physical-plan/src/filter.rs | 13 - datafusion/physical-plan/src/metrics/value.rs | 11 +- datafusion/physical-plan/src/projection.rs | 66 ----- 12 files changed, 28 insertions(+), 458 deletions(-) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 39df40aae4106..85e11853956af 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -173,12 +173,13 @@ impl TestOutput { if let MetricValue::PruningMetrics { pruning_metrics, .. 
} = metric.value() - { - total_pruned += pruning_metrics.pruned(); - total_matched += pruning_metrics.matched(); - total_fully_matched += pruning_metrics.fully_matched(); + { + total_pruned += pruning_metrics.pruned(); + total_matched += pruning_metrics.matched(); + total_fully_matched += pruning_metrics.fully_matched(); - found = true; + found = true; + } } } @@ -215,18 +216,6 @@ impl TestOutput { .map(|pm| pm.total_fully_matched()) } - /* - /// The number of row_groups fully matched by statistics - fn row_groups_fully_matched_statistics(&self) -> Option { - self.metric_value("row_groups_fully_matched_statistics") - } - - /// The number of row groups pruned by limit pruning - fn limit_pruned_row_groups(&self) -> Option { - self.metric_value("limit_pruned_row_groups") - } - */ - /// The number of row_groups pruned by statistics fn row_groups_pruned_statistics(&self) -> Option { self.pruning_metric("row_groups_pruned_statistics") diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index df0c5a7de9848..360d212f69e65 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1900,8 +1900,8 @@ async fn test_limit_pruning_complex_filter() -> datafusion_common::error::Result } #[tokio::test] -async fn test_limit_pruning_multiple_fully_matched() --> datafusion_common::error::Result<()> { +async fn test_limit_pruning_multiple_fully_matched( +) -> datafusion_common::error::Result<()> { // Test Case 2: Limit requires multiple fully matched row groups // Row Group 0: a=[5,5,5,5] -> Fully matched, 4 rows // Row Group 1: a=[5,5,5,5] -> Fully matched, 4 rows diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index f6e2ba0641358..db0673651f6e3 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -52,12 +52,6 @@ pub struct ParquetFileMetrics { pub row_groups_pruned_statistics: PruningMetrics, /// Number of row groups whose bloom filters were checked and matched (not pruned) pub row_groups_matched_bloom_filter: Count, - /// Number of row groups pruned due to limit pruning. - pub limit_pruned_row_groups: Count, - /// Number of row groups whose statistics were checked and fully matched - pub row_groups_fully_matched_statistics: Count, - /// Number of row groups whose statistics were checked and matched (not pruned) - pub row_groups_matched_statistics: Count, /// Total number of bytes scanned pub bytes_scanned: Count, /// Total rows filtered out by predicates pushed into parquet scan @@ -174,8 +168,6 @@ impl ParquetFileMetrics { predicate_evaluation_errors, row_groups_matched_bloom_filter, row_groups_pruned_bloom_filter, - row_groups_fully_matched_statistics, - row_groups_matched_statistics, row_groups_pruned_statistics, limit_pruned_row_groups, bytes_scanned, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 0099643649d07..a3cb112058939 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -62,8 +62,8 @@ use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; pub(super) struct ParquetOpener { /// Execution partition index pub(crate) partition_index: usize, - /// Projection to apply on top of the table schema (i.e. can reference partition columns). 
- pub projection: ProjectionExprs, + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, /// Target number of rows in each output RecordBatch pub batch_size: usize, /// Optional limit on the number of rows to read @@ -101,8 +101,6 @@ pub(super) struct ParquetOpener { pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, - /// Should limit pruning be applied - pub enable_limit_pruning: bool, /// Optional parquet FileDecryptionProperties #[cfg(feature = "parquet_encryption")] pub file_decryption_properties: Option>, @@ -155,7 +153,6 @@ impl FileOpener for ParquetOpener { let enable_bloom_filter = self.enable_bloom_filter; let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; let limit = self.limit; - let enable_limit_pruning = self.enable_limit_pruning; let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); @@ -167,8 +164,6 @@ impl FileOpener for ParquetOpener { #[cfg(feature = "parquet_encryption")] let encryption_context = self.get_encryption_context(); let max_predicate_cache_size = self.max_predicate_cache_size; - - let reverse_row_groups = self.reverse_row_groups; let preserve_order = self.preserve_order; Ok(Box::pin(async move { @@ -420,10 +415,8 @@ impl FileOpener for ParquetOpener { row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); } - // -------------------------------------------------------- - // Step: prune pages from the kept row groups - // let mut access_plan = row_groups.build(); + // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns // with that range can be skipped as well @@ -798,249 +791,7 @@ mod test { use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; - /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. - /// This helps reduce code duplication and makes it clear what differs between test cases. - struct ParquetOpenerBuilder { - store: Option>, - table_schema: Option, - partition_index: usize, - projection_indices: Option>, - projection: Option, - batch_size: usize, - limit: Option, - predicate: Option>, - metadata_size_hint: Option, - metrics: ExecutionPlanMetricsSet, - pushdown_filters: bool, - reorder_filters: bool, - force_filter_selections: bool, - enable_page_index: bool, - enable_bloom_filter: bool, - enable_row_group_stats_pruning: bool, - coerce_int96: Option, - max_predicate_cache_size: Option, - reverse_row_groups: bool, - preserve_order: bool, - } - - impl ParquetOpenerBuilder { - /// Create a new builder with sensible defaults for tests. - fn new() -> Self { - Self { - store: None, - table_schema: None, - partition_index: 0, - projection_indices: None, - projection: None, - batch_size: 1024, - limit: None, - predicate: None, - metadata_size_hint: None, - metrics: ExecutionPlanMetricsSet::new(), - pushdown_filters: false, - reorder_filters: false, - force_filter_selections: false, - enable_page_index: false, - enable_bloom_filter: false, - enable_row_group_stats_pruning: false, - coerce_int96: None, - max_predicate_cache_size: None, - reverse_row_groups: false, - preserve_order: false, - } - } - - /// Set the object store (required for building). - fn with_store(mut self, store: Arc) -> Self { - self.store = Some(store); - self - } - - /// Create a simple table schema from a file schema (for files without partition columns). 
- fn with_schema(mut self, file_schema: SchemaRef) -> Self { - self.table_schema = Some(TableSchema::from_file_schema(file_schema)); - self - } - - /// Set a custom table schema (for files with partition columns). - fn with_table_schema(mut self, table_schema: TableSchema) -> Self { - self.table_schema = Some(table_schema); - self - } - - /// Set projection by column indices (convenience method for common case). - fn with_projection_indices(mut self, indices: &[usize]) -> Self { - self.projection_indices = Some(indices.to_vec()); - self - } - - /// Set the predicate. - fn with_predicate(mut self, predicate: Arc) -> Self { - self.predicate = Some(predicate); - self - } - - /// Enable pushdown filters. - fn with_pushdown_filters(mut self, enable: bool) -> Self { - self.pushdown_filters = enable; - self - } - - /// Enable filter reordering. - fn with_reorder_filters(mut self, enable: bool) -> Self { - self.reorder_filters = enable; - self - } - - /// Enable row group stats pruning. - fn with_row_group_stats_pruning(mut self, enable: bool) -> Self { - self.enable_row_group_stats_pruning = enable; - self - } - - /// Set reverse row groups flag. - fn with_reverse_row_groups(mut self, enable: bool) -> Self { - self.reverse_row_groups = enable; - self - } - - /// Build the ParquetOpener instance. - /// - /// # Panics - /// - /// Panics if required fields (store, schema/table_schema) are not set. - fn build(self) -> ParquetOpener { - let store = self - .store - .expect("ParquetOpenerBuilder: store must be set via with_store()"); - let table_schema = self.table_schema.expect( - "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()", - ); - let file_schema = Arc::clone(table_schema.file_schema()); - - let projection = if let Some(projection) = self.projection { - projection - } else if let Some(indices) = self.projection_indices { - ProjectionExprs::from_indices(&indices, &file_schema) - } else { - // Default: project all columns - let all_indices: Vec = (0..file_schema.fields().len()).collect(); - ProjectionExprs::from_indices(&all_indices, &file_schema) - }; - - ParquetOpener { - partition_index: self.partition_index, - projection, - batch_size: self.batch_size, - limit: self.limit, - predicate: self.predicate, - table_schema, - metadata_size_hint: self.metadata_size_hint, - metrics: self.metrics, - parquet_file_reader_factory: Arc::new( - DefaultParquetFileReaderFactory::new(store), - ), - pushdown_filters: self.pushdown_filters, - reorder_filters: self.reorder_filters, - force_filter_selections: self.force_filter_selections, - enable_page_index: self.enable_page_index, - enable_bloom_filter: self.enable_bloom_filter, - enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, - coerce_int96: self.coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties: None, - expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), - #[cfg(feature = "parquet_encryption")] - encryption_factory: None, - max_predicate_cache_size: self.max_predicate_cache_size, - reverse_row_groups: self.reverse_row_groups, - preserve_order: self.preserve_order, - } - } - } - - fn constant_int_stats() -> (Statistics, SchemaRef) { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - ])); - let statistics = Statistics { - num_rows: Precision::Exact(3), - total_byte_size: Precision::Absent, - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Exact(0), - 
max_value: Precision::Exact(ScalarValue::from(5i32)), - min_value: Precision::Exact(ScalarValue::from(5i32)), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Absent, - }, - ColumnStatistics::new_unknown(), - ], - }; - (statistics, schema) - } - - #[test] - fn extract_constant_columns_non_null() { - let (statistics, schema) = constant_int_stats(); - let constants = constant_columns_from_stats(Some(&statistics), &schema); - assert_eq!(constants.len(), 1); - assert_eq!(constants.get("a"), Some(&ScalarValue::from(5i32))); - assert!(!constants.contains_key("b")); - } - - #[test] - fn extract_constant_columns_all_null() { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); - let statistics = Statistics { - num_rows: Precision::Exact(2), - total_byte_size: Precision::Absent, - column_statistics: vec![ColumnStatistics { - null_count: Precision::Exact(2), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Absent, - }], - }; - - let constants = constant_columns_from_stats(Some(&statistics), &schema); - assert_eq!( - constants.get("a"), - Some(&ScalarValue::Utf8(None)), - "all-null column should be treated as constant null" - ); - } - - #[test] - fn rewrite_projection_to_literals() { - let (statistics, schema) = constant_int_stats(); - let constants = constant_columns_from_stats(Some(&statistics), &schema); - let projection = ProjectionExprs::from_indices(&[0, 1], &schema); - - let rewritten = projection - .try_map_exprs(|expr| replace_columns_with_literals(expr, &constants)) - .unwrap(); - let exprs = rewritten.as_ref(); - assert!(exprs[0].expr.as_any().downcast_ref::().is_some()); - assert!(exprs[1].expr.as_any().downcast_ref::().is_some()); - - // Only column `b` should remain in the projection mask - assert_eq!(rewritten.column_indices(), vec![1]); - } - - #[test] - fn rewrite_physical_expr_literal() { - let mut constants = ConstantColumns::new(); - constants.insert("a".to_string(), ScalarValue::from(7i32)); - let expr: Arc = Arc::new(Column::new("a", 0)); - - let rewritten = replace_columns_with_literals(expr, &constants).unwrap(); - assert!(rewritten.as_any().downcast_ref::().is_some()); - } + use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; async fn count_batches_and_rows( mut stream: std::pin::Pin< @@ -1134,6 +885,7 @@ mod test { projection: Arc::new([0, 1]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: schema.clone(), metadata_size_hint: None, @@ -1146,7 +898,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1204,6 +955,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -1220,7 +972,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1294,6 +1045,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: 
file_schema.clone(), metadata_size_hint: None, @@ -1310,7 +1062,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1387,6 +1138,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -1403,7 +1155,6 @@ mod test { reorder_filters: true, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, @@ -1480,6 +1231,7 @@ mod test { projection: Arc::new([0]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: file_schema.clone(), metadata_size_hint: None, @@ -1496,7 +1248,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1635,6 +1386,7 @@ mod test { projection: Arc::new([0, 1]), batch_size: 1024, limit: None, + preserve_order: false, predicate: Some(predicate), logical_file_schema: Arc::clone(&table_schema), metadata_size_hint: None, @@ -1647,7 +1399,6 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, - enable_limit_pruning: false, schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory), enable_row_group_stats_pruning: false, coerce_int96: None, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 698866a212ce0..6004da8ae65da 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -24,8 +24,8 @@ use arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; -use datafusion_physical_expr::PhysicalExprSimplifier; use datafusion_physical_expr::expressions::NotExpr; +use datafusion_physical_expr::PhysicalExprSimplifier; use datafusion_pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_column; @@ -332,7 +332,7 @@ impl RowGroupAccessPlanFilter { // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0) // before building the pruning predicate - let simplifier = PhysicalExprSimplifier::new(arrow_schema); + let mut simplifier = PhysicalExprSimplifier::new(arrow_schema); let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else { return; }; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 6281259c191f0..ae1cab83ba470 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -576,7 +576,6 @@ impl FileSource for ParquetSource { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - enable_limit_pruning: base_config.limit_pruning, schema_adapter_factory, coerce_int96, #[cfg(feature = "parquet_encryption")] diff --git 
a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 5365ddda52760..20b4370772617 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -273,6 +273,7 @@ pub struct FileScanConfigBuilder { /// [`DataSourceExec`]: crate::source::DataSourceExec table_schema: TableSchema, file_source: Arc, + projection_indices: Option>, limit: Option, preserve_order: bool, constraints: Option, @@ -309,6 +310,7 @@ impl FileScanConfigBuilder { file_compression_type: None, new_lines_in_values: None, limit: None, + projection_indices: None, preserve_order: false, constraints: None, batch_size: None, @@ -479,6 +481,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, + projection_indices, preserve_order, constraints, file_groups, @@ -516,6 +519,7 @@ impl FileScanConfigBuilder { table_schema, file_source, limit, + projection_exprs, preserve_order, constraints, file_groups, @@ -531,6 +535,7 @@ impl FileScanConfigBuilder { impl From for FileScanConfigBuilder { fn from(config: FileScanConfig) -> Self { + let projection_indices = config.projection_indices(); Self { object_store_url: config.object_store_url, table_schema: config.table_schema, @@ -541,6 +546,7 @@ impl From for FileScanConfigBuilder { file_compression_type: Some(config.file_compression_type), new_lines_in_values: Some(config.new_lines_in_values), limit: config.limit, + projection_indices: Some(projection_indices), preserve_order: config.preserve_order, constraints: Some(config.constraints), batch_size: config.batch_size, @@ -773,32 +779,6 @@ impl DataSource for FileScanConfig { } } - fn try_pushdown_sort( - &self, - order: &[PhysicalSortExpr], - ) -> Result>> { - // Delegate to FileSource to check if reverse scanning can satisfy the request. - let pushdown_result = self - .file_source - .try_reverse_output(order, &self.eq_properties())?; - - match pushdown_result { - SortOrderPushdownResult::Exact { inner } => { - Ok(SortOrderPushdownResult::Exact { - inner: self.rebuild_with_source(inner, true)?, - }) - } - SortOrderPushdownResult::Inexact { inner } => { - Ok(SortOrderPushdownResult::Inexact { - inner: self.rebuild_with_source(inner, false)?, - }) - } - SortOrderPushdownResult::Unsupported => { - Ok(SortOrderPushdownResult::Unsupported) - } - } - } - fn with_preserve_order(&self, preserve_order: bool) -> Option> { if self.preserve_order == preserve_order { return Some(Arc::new(self.clone())); diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 54369ece4fd9e..9227647bd07c8 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -190,25 +190,6 @@ pub trait DataSource: Send + Sync + Debug { )) } - /// Try to create a new DataSource that produces data in the specified sort order. - /// - /// # Arguments - /// * `order` - The desired output ordering - /// - /// # Returns - /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering - /// * `Ok(SortOrderPushdownResult::Inexact { .. })` - Created a source optimized for the ordering - /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering - /// * `Err(e)` - Error occurred - /// - /// Default implementation returns `Unsupported`. - fn try_pushdown_sort( - &self, - _order: &[PhysicalSortExpr], - ) -> Result>> { - Ok(SortOrderPushdownResult::Unsupported) - } - /// Returns a variant of this `DataSource` that is aware of order-sensitivity. 
fn with_preserve_order(&self, _preserve_order: bool) -> Option> { None @@ -396,19 +377,6 @@ impl ExecutionPlan for DataSourceExec { } } - fn try_pushdown_sort( - &self, - order: &[PhysicalSortExpr], - ) -> Result>> { - // Delegate to the data source and wrap result with DataSourceExec - self.data_source - .try_pushdown_sort(order)? - .try_map(|new_data_source| { - let new_exec = self.clone().with_data_source(new_data_source); - Ok(Arc::new(new_exec) as Arc) - }) - } - fn with_preserve_order( &self, preserve_order: bool, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index e9f968a13f07b..35866d1cfe8db 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -689,29 +689,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { None } - /// Try to push down sort ordering requirements to this node. - /// - /// This method is called during sort pushdown optimization to determine if this - /// node can optimize for a requested sort ordering. Implementations should: - /// - /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact - /// ordering (allowing the Sort operator to be removed) - /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the - /// ordering but cannot guarantee perfect sorting (Sort operator is kept) - /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize - /// for the ordering - /// - /// For transparent nodes (that preserve ordering), implement this to delegate to - /// children and wrap the result with a new instance of this node. - /// - /// Default implementation returns `Unsupported`. - fn try_pushdown_sort( - &self, - _order: &[PhysicalSortExpr], - ) -> Result>> { - Ok(SortOrderPushdownResult::Unsupported) - } - /// Returns a variant of this `ExecutionPlan` that is aware of order-sensitivity. 
/// /// This is used to signal to data sources that the output ordering must be diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 71ef668fcd348..c86dd2ff166e2 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -571,19 +571,6 @@ impl ExecutionPlan for FilterExec { }) } - fn with_fetch(&self, fetch: Option) -> Option> { - Some(Arc::new(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache: self.cache.clone(), - projection: self.projection.clone(), - batch_size: self.batch_size, - fetch, - })) - } - fn with_preserve_order( &self, preserve_order: bool, diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 81d0f66db2c2b..b71013833c725 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -385,17 +385,10 @@ impl Display for PruningMetrics { write!( f, "{} total → {} matched -> {} fully matched", - human_readable_count(total), - human_readable_count(matched), - human_readable_count(fully_matched) + total, matched, fully_matched ) } else { - write!( - f, - "{} total → {} matched", - human_readable_count(total), - human_readable_count(matched) - ) + write!(f, "{} total → {} matched", total, matched) } } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index fa1c4629d1795..4fe051dbacb4c 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -347,72 +347,6 @@ impl ExecutionPlan for ProjectionExec { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } - fn try_pushdown_sort( - &self, - order: &[PhysicalSortExpr], - ) -> Result>> { - let child = self.input(); - let mut child_order = Vec::new(); - - // Check and transform sort expressions - for sort_expr in order { - // Recursively transform the expression - let mut can_pushdown = true; - let transformed = Arc::clone(&sort_expr.expr).transform(|expr| { - if let Some(col) = expr.as_any().downcast_ref::() { - // Check if column index is valid. - // This should always be true but fail gracefully if it's not. - if col.index() >= self.expr().len() { - can_pushdown = false; - return Ok(Transformed::no(expr)); - } - - let proj_expr = &self.expr()[col.index()]; - - // Check if projection expression is a simple column - // We cannot push down order by clauses that depend on - // projected computations as they would have nothing to reference. - if let Some(child_col) = - proj_expr.expr.as_any().downcast_ref::() - { - // Replace with the child column - Ok(Transformed::yes(Arc::new(child_col.clone()) as _)) - } else { - // Projection involves computation, cannot push down - can_pushdown = false; - Ok(Transformed::no(expr)) - } - } else { - Ok(Transformed::no(expr)) - } - })?; - - if !can_pushdown { - return Ok(SortOrderPushdownResult::Unsupported); - } - - child_order.push(PhysicalSortExpr { - expr: transformed.data, - options: sort_expr.options, - }); - } - - // Recursively push down to child node - match child.try_pushdown_sort(&child_order)? 
{ - SortOrderPushdownResult::Exact { inner } => { - let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?; - Ok(SortOrderPushdownResult::Exact { inner: new_exec }) - } - SortOrderPushdownResult::Inexact { inner } => { - let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?; - Ok(SortOrderPushdownResult::Inexact { inner: new_exec }) - } - SortOrderPushdownResult::Unsupported => { - Ok(SortOrderPushdownResult::Unsupported) - } - } - } - fn with_preserve_order( &self, preserve_order: bool, From cf73630f5a197a8646f00e1d996518292348f47f Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 23 Nov 2025 16:08:32 +0800 Subject: [PATCH 3/5] feat(small): Support `` marker in `sqllogictest` for non-deterministic expected parts (#18857) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which issue does this PR close? Part of https://github.com/apache/datafusion/issues/17612 ## Rationale for this change `sqllogictest`s are in general easier to maintain than rust tests, however it's not able to test `EXPLAIN ANALYZE` results, because their results include changing part: (in datafusion-cli) The `elapsed_compute` measurement changes from run to run. ``` > EXPLAIN ANALYZE SELECT * FROM generate_series(100); +-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Plan with Metrics | LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100, batch_size=8192], metrics=[output_rows=101, elapsed_compute=74.042µs, output_bytes=64.0 KB, output_batches=1] | | | | +-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.006 seconds. ``` We can add a special marker to `sqllogictest` to skip those non-deterministic parts. ## What changes are included in this PR? - Changed `sqllogictest` validator to recognize `` marker - doc - slt test ## Are these changes tested? ## Are there any user-facing changes? --------- Co-authored-by: Martin Grigorov --- datafusion/sqllogictest/README.md | 11 +++ datafusion/sqllogictest/src/util.rs | 47 ++++++++++++ .../test_files/explain_analyze.slt | 27 +++++++ .../sqllogictest/test_files/slt_features.slt | 74 +++++++++++++++++++ 4 files changed, 159 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/explain_analyze.slt create mode 100644 datafusion/sqllogictest/test_files/slt_features.slt diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index a389ae1ef60e2..8768deee3d87e 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -142,6 +142,17 @@ select substr('Andrew Lamb', 1, 6), '|' Andrew | ``` +## Cookbook: Ignoring volatile output + +Sometimes parts of a result change every run (timestamps, counters, etc.). To keep the rest of the snapshot checked in, replace those fragments with the `` marker inside the expected block. 
During validation the marker acts like a wildcard, so only the surrounding text must match. + +```text +query TT +EXPLAIN ANALYZE SELECT * FROM generate_series(100); +---- +Plan with Metrics LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100, batch_size=8192], metrics=[output_rows=101, elapsed_compute=, output_bytes=] +``` + # Reference ## Running tests: Validation Mode diff --git a/datafusion/sqllogictest/src/util.rs b/datafusion/sqllogictest/src/util.rs index 695fe463fa676..721c559ab11ef 100644 --- a/datafusion/sqllogictest/src/util.rs +++ b/datafusion/sqllogictest/src/util.rs @@ -82,6 +82,10 @@ pub fn df_value_validator( actual: &[Vec], expected: &[String], ) -> bool { + // Support ignore marker to skip volatile parts of output. + const IGNORE_MARKER: &str = ""; + let contains_ignore_marker = expected.iter().any(|line| line.contains(IGNORE_MARKER)); + let normalized_expected = expected.iter().map(normalizer).collect::>(); let normalized_actual = actual .iter() @@ -89,6 +93,32 @@ pub fn df_value_validator( .map(|str| str.trim_end().to_string()) .collect_vec(); + // If ignore marker present, perform fragment-based matching on the full snapshot. + if contains_ignore_marker { + let expected_snapshot = normalized_expected.join("\n"); + let actual_snapshot = normalized_actual.join("\n"); + let fragments: Vec<&str> = expected_snapshot.split(IGNORE_MARKER).collect(); + let mut pos = 0; + for (i, frag) in fragments.iter().enumerate() { + if frag.is_empty() { + continue; + } + if let Some(idx) = actual_snapshot[pos..].find(frag) { + // Edge case: The following example is expected to fail + // Actual - 'foo bar baz' + // Expected - 'bar ' + if (i == 0) && (idx != 0) { + return false; + } + + pos += idx + frag.len(); + } else { + return false; + } + } + return true; + } + if log_enabled!(Warn) && normalized_actual != normalized_expected { warn!("df validation failed. actual vs expected:"); for i in 0..normalized_actual.len() { @@ -110,3 +140,20 @@ pub fn df_value_validator( pub fn is_spark_path(relative_path: &Path) -> bool { relative_path.starts_with("spark/") } + +#[cfg(test)] +mod tests { + use super::*; + + // Validation should fail for the below case: + // Actual - 'foo bar baz' + // Expected - 'bar ' + #[test] + fn ignore_marker_does_not_skip_leading_text() { + // Actual snapshot contains unexpected prefix before the expected fragment. + let actual = vec![vec!["foo bar baz".to_string()]]; + let expected = vec!["bar ".to_string()]; + + assert!(!df_value_validator(value_normalizer, &actual, &expected)); + } +} diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt new file mode 100644 index 0000000000000..b213cd9565c86 --- /dev/null +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +set datafusion.explain.analyze_level = summary; + +query TT +EXPLAIN ANALYZE SELECT * FROM generate_series(100); +---- +Plan with Metrics LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100, batch_size=8192], metrics=[output_rows=101, elapsed_compute=, output_bytes=] + +statement ok +reset datafusion.explain.analyze_level; diff --git a/datafusion/sqllogictest/test_files/slt_features.slt b/datafusion/sqllogictest/test_files/slt_features.slt new file mode 100644 index 0000000000000..f3d467ea0d93a --- /dev/null +++ b/datafusion/sqllogictest/test_files/slt_features.slt @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# ================================= +# Test sqllogictest runner features +# ================================= + +# -------------------------- +# Test `` marker +# -------------------------- +query T +select 'DataFusion' +---- + + +query T +select 'DataFusion' +---- +Data + +query T +select 'DataFusion' +---- +Fusion + +query T +select 'Apache DataFusion'; +---- +Data + +query T +select 'DataFusion' +---- +DataFusion + +query T +select 'DataFusion' +---- +DataFusion + +query T +select 'DataFusion' +---- +DataFusion + +query I +select * from generate_series(3); +---- +0 +1 + +3 + +query I +select * from generate_series(3); +---- + +1 + + \ No newline at end of file From 8b795dd9db10b307c435170d6beedd8f7c09d1e1 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Tue, 27 Jan 2026 12:21:41 +0800 Subject: [PATCH 4/5] fix tests --- datafusion/physical-plan/src/metrics/value.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index b71013833c725..b66119b86913f 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -384,11 +384,10 @@ impl Display for PruningMetrics { if fully_matched != 0 { write!( f, - "{} total → {} matched -> {} fully matched", - total, matched, fully_matched + "{total} total → {matched} matched -> {fully_matched} fully matched", ) } else { - write!(f, "{} total → {} matched", total, matched) + write!(f, "{total} total → {matched} matched") } } } From 677bebec4a33ead697f131e3ad8071b01d665856 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Tue, 27 Jan 2026 13:43:58 +0800 Subject: [PATCH 5/5] fix test --- .../test_files/explain_analyze.slt | 27 ------------------- .../sqllogictest/test_files/limit_pruning.slt | 7 ++--- 2 files changed, 2 insertions(+), 32 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/explain_analyze.slt diff --git 
a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt deleted file mode 100644 index b213cd9565c86..0000000000000 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -statement ok -set datafusion.explain.analyze_level = summary; - -query TT -EXPLAIN ANALYZE SELECT * FROM generate_series(100); ----- -Plan with Metrics LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100, batch_size=8192], metrics=[output_rows=101, elapsed_compute=, output_bytes=] - -statement ok -reset datafusion.explain.analyze_level; diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt index 8a94bf8adc75f..90ee528ecf9b9 100644 --- a/datafusion/sqllogictest/test_files/limit_pruning.slt +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary; query TT explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; ---- -Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=3 total → 3 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=3 total → 3 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=] # limit_pruned_row_groups=0 total → 0 matched # because of order by, scan needs to preserve sort, so limit pruning is disabled @@ -72,10 
+72,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=9 total → 9 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=9 total → 9 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=] statement ok drop table tracking_data; - -statement ok -reset datafusion.explain.analyze_level;
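
As a closing illustration, a minimal sketch of how the extended `PruningMetrics` counters produce figures like `4 total → 3 matched -> 1 fully matched` asserted in the expected plans above. It is not part of the patch series, and it assumes `PruningMetrics` is reachable as `datafusion_physical_plan::metrics::PruningMetrics` and that `new()` and `add_pruned()` are public like the accessors shown in the first patch.

```rust
// Usage sketch for the fully-matched pruning counter.
// Assumptions (not confirmed by the patch): the import path below and the
// visibility of `new()` / `add_pruned()`.
use datafusion_physical_plan::metrics::PruningMetrics;

fn main() {
    // One scan evaluated four row groups: one pruned by statistics, three
    // kept, and one of the kept groups matched the predicate for every row.
    let row_groups = PruningMetrics::new();
    row_groups.add_pruned(1);
    row_groups.add_matched(3);
    row_groups.add_fully_matched(1);

    assert_eq!(row_groups.matched(), 3);
    assert_eq!(row_groups.fully_matched(), 1);

    // The Display impl reports total (pruned + matched) and matched, and
    // appends the fully matched count only when it is non-zero:
    // "4 total → 3 matched -> 1 fully matched"
    println!("{row_groups}");
}
```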