Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 66 additions & 22 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use arrow_schema::SchemaRef;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{provider_as_source, TableProvider},
Expand Down Expand Up @@ -109,6 +110,26 @@ struct ContextWithParquet {
ctx: SessionContext,
}

/// Aggregated pruning counters gathered from the parquet scan metrics
/// (pruned / matched / fully-matched totals across all operators).
struct PruningMetric {
    // Units (row groups, file ranges, or pages) pruned by the metric.
    total_pruned: usize,
    // Units the metric reported as matched.
    total_matched: usize,
    // Units the metric reported as fully matched.
    total_fully_matched: usize,
}

impl PruningMetric {
    /// Total number of pruned units.
    pub fn total_pruned(&self) -> usize {
        self.total_pruned
    }

    /// Total number of matched units.
    pub fn total_matched(&self) -> usize {
        self.total_matched
    }

    /// Total number of fully-matched units.
    pub fn total_fully_matched(&self) -> usize {
        self.total_fully_matched
    }
}

/// The output of running one of the test cases
struct TestOutput {
/// The input query SQL
Expand All @@ -126,8 +147,8 @@ struct TestOutput {
impl TestOutput {
/// retrieve the value of the named metric, if any
fn metric_value(&self, metric_name: &str) -> Option<usize> {
if let Some((pruned, _matched)) = self.pruning_metric(metric_name) {
return Some(pruned);
if let Some(pm) = self.pruning_metric(metric_name) {
return Some(pm.total_pruned());
}

self.parquet_metrics
Expand All @@ -140,9 +161,10 @@ impl TestOutput {
})
}

fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> {
fn pruning_metric(&self, metric_name: &str) -> Option<PruningMetric> {
let mut total_pruned = 0;
let mut total_matched = 0;
let mut total_fully_matched = 0;
let mut found = false;

for metric in self.parquet_metrics.iter() {
Expand All @@ -154,13 +176,19 @@ impl TestOutput {
{
total_pruned += pruning_metrics.pruned();
total_matched += pruning_metrics.matched();
total_fully_matched += pruning_metrics.fully_matched();

found = true;
}
}
}

if found {
Some((total_pruned, total_matched))
Some(PruningMetric {
total_pruned,
total_matched,
total_fully_matched,
})
} else {
None
}
Expand All @@ -172,39 +200,33 @@ impl TestOutput {
}

/// The number of row_groups pruned / matched by bloom filter
fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> {
fn row_groups_bloom_filter(&self) -> Option<PruningMetric> {
self.pruning_metric("row_groups_pruned_bloom_filter")
}

/// The number of row_groups matched by statistics
fn row_groups_matched_statistics(&self) -> Option<usize> {
self.pruning_metric("row_groups_pruned_statistics")
.map(|(_pruned, matched)| matched)
.map(|pm| pm.total_matched())
}

/*
/// The number of row_groups fully matched by statistics
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_fully_matched_statistics")
}

/// The number of row groups pruned by limit pruning
fn limit_pruned_row_groups(&self) -> Option<usize> {
self.metric_value("limit_pruned_row_groups")
self.pruning_metric("row_groups_pruned_statistics")
.map(|pm| pm.total_fully_matched())
}
*/

/// The number of row_groups pruned by statistics
fn row_groups_pruned_statistics(&self) -> Option<usize> {
self.pruning_metric("row_groups_pruned_statistics")
.map(|(pruned, _matched)| pruned)
.map(|pm| pm.total_pruned())
}

/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count,
/// for testing purpose, here it only aggregate the `pruned` count.
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
self.pruning_metric("files_ranges_pruned_statistics")
.map(|(pruned, _matched)| pruned)
.map(|pm| pm.total_pruned())
}

/// The number of row_groups matched by bloom filter or statistics
Expand All @@ -213,22 +235,27 @@ impl TestOutput {
/// filter: 7 total -> 3 matched, this function returns 3 for the final matched
/// count.
fn row_groups_matched(&self) -> Option<usize> {
self.row_groups_bloom_filter()
.map(|(_pruned, matched)| matched)
self.row_groups_bloom_filter().map(|pm| pm.total_matched())
}

/// The number of row_groups pruned
fn row_groups_pruned(&self) -> Option<usize> {
self.row_groups_bloom_filter()
.map(|(pruned, _matched)| pruned)
.map(|pm| pm.total_pruned())
.zip(self.row_groups_pruned_statistics())
.map(|(a, b)| a + b)
}

/// The number of row pages pruned
fn row_pages_pruned(&self) -> Option<usize> {
self.pruning_metric("page_index_rows_pruned")
.map(|(pruned, _matched)| pruned)
.map(|pm| pm.total_pruned())
}

/// The number of row groups pruned by limit pruning.
fn limit_pruned_row_groups(&self) -> Option<usize> {
    // `?` short-circuits to None when the metric was not found.
    let pm = self.pruning_metric("limit_pruned_row_groups")?;
    Some(pm.total_pruned())
}

fn description(&self) -> String {
Expand All @@ -247,6 +274,23 @@ impl ContextWithParquet {
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
}

/// Set custom schema and batches for the test
pub async fn with_custom_data(
scenario: Scenario,
unit: Unit,
schema: Arc<Schema>,
batches: Vec<RecordBatch>,
) -> Self {
Self::with_config(
scenario,
unit,
SessionConfig::new(),
Some(schema),
Some(batches),
)
.await
}

// Set custom schema and batches for the test
/*
pub async fn with_custom_data(
Expand All @@ -270,7 +314,7 @@ impl ContextWithParquet {
scenario: Scenario,
unit: Unit,
mut config: SessionConfig,
custom_schema: Option<Arc<Schema>>,
custom_schema: Option<SchemaRef>,
custom_batches: Option<Vec<RecordBatch>>,
) -> Self {
// Use a single partition for deterministic results no matter how many CPUs the host has
Expand Down Expand Up @@ -1109,7 +1153,7 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
async fn make_test_file_rg(
scenario: Scenario,
row_per_group: usize,
custom_schema: Option<Arc<Schema>>,
custom_schema: Option<SchemaRef>,
custom_batches: Option<Vec<RecordBatch>>,
) -> NamedTempFile {
let mut output_file = tempfile::Builder::new()
Expand Down
93 changes: 75 additions & 18 deletions datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
//! This file contains an end to end test of parquet pruning. It writes
//! data into a parquet file and then verifies row groups are pruned as
//! expected.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::SessionConfig;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, ScalarValue};
use itertools::Itertools;

use crate::parquet::Unit::RowGroup;
Expand All @@ -30,12 +34,12 @@ struct RowGroupPruningTest {
query: String,
expected_errors: Option<usize>,
expected_row_group_matched_by_statistics: Option<usize>,
// expected_row_group_fully_matched_by_statistics: Option<usize>,
expected_row_group_fully_matched_by_statistics: Option<usize>,
expected_row_group_pruned_by_statistics: Option<usize>,
expected_files_pruned_by_statistics: Option<usize>,
expected_row_group_matched_by_bloom_filter: Option<usize>,
expected_row_group_pruned_by_bloom_filter: Option<usize>,
// expected_limit_pruned_row_groups: Option<usize>,
expected_limit_pruned_row_groups: Option<usize>,
expected_rows: usize,
}
impl RowGroupPruningTest {
Expand All @@ -47,11 +51,11 @@ impl RowGroupPruningTest {
expected_errors: None,
expected_row_group_matched_by_statistics: None,
expected_row_group_pruned_by_statistics: None,
// expected_row_group_fully_matched_by_statistics: None,
expected_row_group_fully_matched_by_statistics: None,
expected_files_pruned_by_statistics: None,
expected_row_group_matched_by_bloom_filter: None,
expected_row_group_pruned_by_bloom_filter: None,
// expected_limit_pruned_row_groups: None,
expected_limit_pruned_row_groups: None,
expected_rows: 0,
}
}
Expand Down Expand Up @@ -81,7 +85,6 @@ impl RowGroupPruningTest {
}

// Set the expected fully matched row groups by statistics
/*
fn with_fully_matched_by_stats(
mut self,
fully_matched_by_stats: Option<usize>,
Expand All @@ -90,12 +93,6 @@ impl RowGroupPruningTest {
self
}

fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
self.expected_limit_pruned_row_groups = pruned_by_limit;
self
}
*/

// Set the expected pruned row groups by statistics
fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
self.expected_row_group_pruned_by_statistics = pruned_by_stats;
Expand All @@ -119,6 +116,11 @@ impl RowGroupPruningTest {
self
}

fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
self.expected_limit_pruned_row_groups = pruned_by_limit;
self
}

/// Set the number of expected rows from the output of this test
fn with_expected_rows(mut self, rows: usize) -> Self {
self.expected_rows = rows;
Expand Down Expand Up @@ -155,12 +157,12 @@ impl RowGroupPruningTest {
);
let bloom_filter_metrics = output.row_groups_bloom_filter();
assert_eq!(
bloom_filter_metrics.map(|(_pruned, matched)| matched),
bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()),
self.expected_row_group_matched_by_bloom_filter,
"mismatched row_groups_matched_bloom_filter",
);
assert_eq!(
bloom_filter_metrics.map(|(pruned, _matched)| pruned),
bloom_filter_metrics.map(|pm| pm.total_pruned()),
self.expected_row_group_pruned_by_bloom_filter,
"mismatched row_groups_pruned_bloom_filter",
);
Expand All @@ -175,6 +177,64 @@ impl RowGroupPruningTest {
);
}

// Execute the test against caller-provided schema/batches, asserting every
// configured pruning expectation against the observed parquet metrics.
async fn test_row_group_prune_with_custom_data(
    self,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
    max_row_per_group: usize,
) {
    let output = ContextWithParquet::with_custom_data(
        self.scenario,
        RowGroup(max_row_per_group),
        schema,
        batches,
    )
    .await
    .query(&self.query)
    .await;

    println!("{}", output.description());
    assert_eq!(
        output.predicate_evaluation_errors(),
        self.expected_errors,
        "mismatched predicate_evaluation error"
    );
    assert_eq!(
        output.row_groups_matched_statistics(),
        self.expected_row_group_matched_by_statistics,
        "mismatched row_groups_matched_statistics",
    );
    assert_eq!(
        output.row_groups_fully_matched_statistics(),
        self.expected_row_group_fully_matched_by_statistics,
        "mismatched row_groups_fully_matched_statistics",
    );
    assert_eq!(
        output.row_groups_pruned_statistics(),
        self.expected_row_group_pruned_by_statistics,
        "mismatched row_groups_pruned_statistics",
    );
    assert_eq!(
        output.files_ranges_pruned_statistics(),
        self.expected_files_pruned_by_statistics,
        "mismatched files_ranges_pruned_statistics",
    );
    assert_eq!(
        output.limit_pruned_row_groups(),
        self.expected_limit_pruned_row_groups,
        "mismatched limit_pruned_row_groups",
    );
    // BUGFIX: the format arguments were swapped — the message previously
    // printed the actual row count as "Expected" and vice versa.
    assert_eq!(
        output.result_rows,
        self.expected_rows,
        "Expected {} rows, got {}: {}",
        self.expected_rows,
        output.result_rows,
        output.description(),
    );
}

// Execute the test with the current configuration
/*
async fn test_row_group_prune_with_custom_data(
Expand Down Expand Up @@ -1723,7 +1783,6 @@ async fn test_bloom_filter_decimal_dict() {
.await;
}

/*
// Helper function to create a batch with a single Int32 column.
fn make_i32_batch(
name: &str,
Expand Down Expand Up @@ -1950,15 +2009,13 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error:
.with_scenario(Scenario::Int)
.with_query(query)
.with_expected_errors(Some(0))
.with_expected_rows(10) // Total: 1 + 3 + 4 + 1 = 9 (less than limit)
.with_expected_rows(10) // Total: 1 + 4 + 4 + 1 = 10
.with_pruned_files(Some(0))
.with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
.with_fully_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1)) // RG4 pruned
.with_limit_pruned_row_groups(Some(0)) // No limit pruning since we need all RGs
.test_row_group_prune_with_custom_data(schema, batches, 4)
.await;

Ok(())
}
*/
Loading