38 changes: 38 additions & 0 deletions benchmarks/README.md
@@ -804,3 +804,41 @@ Getting results...
cancelling thread
done dropping runtime in 83.531417ms
```

## Sorted Data Benchmarks

### Data Sorted ClickBench

Benchmark of queries over pre-sorted data, used to test sort order optimization.
This benchmark uses a subset of the ClickBench dataset (`hits.parquet`, ~14 GB) that has been pre-sorted by the `EventTime` column. The queries measure DataFusion's performance when the data is already sorted, as is common in time-series workloads.

The benchmark includes queries that:
- Scan pre-sorted data with ORDER BY clauses that match the sort order
- Test reverse scans on sorted data
- Measure the performance benefit of eliminating redundant sorts

#### Generating Sorted Data

The sorted dataset is generated automatically from the ClickBench partitioned dataset. You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable; the default memory limit is 12 GB.

To create the sorted dataset with the default memory limit, run:

```bash
./bench.sh data clickbench_sorted
```

To use a different limit, for example 16 GB, run:

```bash
DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
```
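Internally, the script resolves the limit with bash default parameter expansion (`${DATAFUSION_MEMORY_GB:-12}`, as in `bench.sh`). A standalone sketch of that behavior:

```bash
# Default expansion: use the variable if set and non-empty, else fall back to 12
unset DATAFUSION_MEMORY_GB
echo "${DATAFUSION_MEMORY_GB:-12}"   # prints: 12

DATAFUSION_MEMORY_GB=16
echo "${DATAFUSION_MEMORY_GB:-12}"   # prints: 16
```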

This command will:
1. Download the ClickBench partitioned dataset if it is not already present
2. Sort `hits.parquet` by `EventTime` in ascending order
3. Save the sorted file as `hits_sorted.parquet`

#### Running the Benchmark

```bash
./bench.sh run clickbench_sorted
```

This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
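Under the hood, `--sorted-by` causes the benchmark to register `hits` with `CREATE EXTERNAL TABLE ... WITH ORDER` (see `benchmarks/src/clickbench.rs` in this change). A sketch of the DDL string it builds — the data path here is only an example, and the column name is double-quoted as in the Rust code:

```bash
SORT_COL='EventTime'
SORT_ORDER='ASC'
DATA_PATH='benchmarks/data/hits_sorted.parquet'   # example path

# Mirrors the format! call in register_hits
DDL="CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '${DATA_PATH}' WITH ORDER (\"${SORT_COL}\" ${SORT_ORDER})"
echo "$DDL"
```

With `datafusion.optimizer.prefer_existing_sort=true` also set, DataFusion can drop sorts that this declared ordering already satisfies.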
118 changes: 117 additions & 1 deletion benchmarks/bench.sh
@@ -99,6 +99,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parquet
clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

# Sorted Data Benchmarks (ORDER BY Optimization)
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)

# H2O.ai Benchmarks (Group By, Join, Window)
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
@@ -314,6 +317,9 @@ main() {
compile_profile)
data_tpch "1"
;;
clickbench_sorted)
clickbench_sorted
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
@@ -445,7 +451,7 @@ main() {
h2o_medium_window)
run_h2o_window "MEDIUM" "CSV" "window"
;;
h2o_big_window)
h2o_big_window)
run_h2o_window "BIG" "CSV" "window"
;;
h2o_small_parquet)
Expand Down Expand Up @@ -497,6 +503,9 @@ main() {
compile_profile)
run_compile_profile "${PROFILE_ARGS[@]}"
;;
clickbench_sorted)
run_clickbench_sorted
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
@@ -1189,6 +1198,113 @@ compare_benchmarks() {

}

# Creates sorted ClickBench data from hits.parquet (full dataset)
# The data is sorted by EventTime in ascending order
# Uses datafusion-cli to reduce dependencies
clickbench_sorted() {
SORTED_FILE="${DATA_DIR}/hits_sorted.parquet"
ORIGINAL_FILE="${DATA_DIR}/hits.parquet"

# Default memory limit is 12GB, can be overridden with DATAFUSION_MEMORY_GB env var
MEMORY_LIMIT_GB=${DATAFUSION_MEMORY_GB:-12}

echo "Creating sorted ClickBench dataset from hits.parquet..."
echo "Configuration:"
echo " Memory limit: ${MEMORY_LIMIT_GB}G"
echo " Row group size: 64K rows"
echo " Compression: uncompressed"

if [ ! -f "${ORIGINAL_FILE}" ]; then
echo "hits.parquet not found. Running data_clickbench_1 first..."
data_clickbench_1
fi

if [ -f "${SORTED_FILE}" ]; then
echo "Sorted hits.parquet already exists at ${SORTED_FILE}"
return 0
fi

echo "Sorting hits.parquet by EventTime (this may take several minutes)..."

pushd "${DATAFUSION_DIR}" > /dev/null
echo "Building datafusion-cli..."
cargo build --release --bin datafusion-cli
DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli"
popd > /dev/null


START_TIME=$(date +%s)
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
echo "Using datafusion-cli to create sorted parquet file..."
"${DATAFUSION_CLI}" << EOF
-- Memory and performance configuration
SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
SET datafusion.execution.spill_compression = 'uncompressed';
SET datafusion.execution.sort_spill_reservation_bytes = 10485760; -- 10MB
SET datafusion.execution.batch_size = 8192;
SET datafusion.execution.target_partitions = 1;

-- Parquet output configuration
SET datafusion.execution.parquet.max_row_group_size = 65536;
SET datafusion.execution.parquet.compression = 'uncompressed';

-- Execute sort and write
COPY (SELECT * FROM '${ORIGINAL_FILE}' ORDER BY "EventTime")
TO '${SORTED_FILE}'
STORED AS PARQUET;
EOF

local result=$?

END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))
echo "End time: $(date '+%Y-%m-%d %H:%M:%S')"

if [ $result -eq 0 ]; then
echo "✓ Successfully created sorted ClickBench dataset"

INPUT_SIZE=$(stat -f%z "${ORIGINAL_FILE}" 2>/dev/null || stat -c%s "${ORIGINAL_FILE}" 2>/dev/null)
OUTPUT_SIZE=$(stat -f%z "${SORTED_FILE}" 2>/dev/null || stat -c%s "${SORTED_FILE}" 2>/dev/null)
INPUT_MB=$((INPUT_SIZE / 1024 / 1024))
OUTPUT_MB=$((OUTPUT_SIZE / 1024 / 1024))

echo " Input: ${INPUT_MB} MB"
echo " Output: ${OUTPUT_MB} MB"

echo ""
echo "Time Statistics:"
echo " Total duration: ${DURATION} seconds ($(printf '%02d:%02d:%02d' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))))"
echo " Throughput: $((INPUT_MB / DURATION)) MB/s"

return 0
else
echo "✗ Error: Failed to create sorted dataset"
echo "💡 Tip: Try increasing memory with: DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted"
return 1
fi
}
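The reporting in `clickbench_sorted` above combines two small portability tricks: file sizes via BSD `stat -f%z` with a GNU `stat -c%s` fallback, and HH:MM:SS formatting via bash integer arithmetic. A self-contained sketch:

```bash
# File size that works on both macOS (BSD stat) and Linux (GNU stat)
tmp=$(mktemp)
printf 'hello' > "$tmp"
SIZE=$(stat -f%z "$tmp" 2>/dev/null || stat -c%s "$tmp" 2>/dev/null)
echo "size: ${SIZE} bytes"   # prints: size: 5 bytes
rm -f "$tmp"

# HH:MM:SS from seconds; % and / are left-associative, so D%3600/60 is (D%3600)/60
DURATION=3725
printf '%02d:%02d:%02d\n' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))
# prints: 01:02:05
```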

# Runs the sorted data benchmark with prefer_existing_sort configuration
run_clickbench_sorted() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_sorted.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sorted data benchmark with prefer_existing_sort optimization..."

# Ensure sorted data exists
clickbench_sorted

# Run benchmark with prefer_existing_sort configuration
# This allows DataFusion to optimize away redundant sorts while maintaining parallelism
debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
--iterations 5 \
--path "${DATA_DIR}/hits_sorted.parquet" \
--queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
--sorted-by "EventTime" \
-c datafusion.optimizer.prefer_existing_sort=true \
-o "${RESULTS_FILE}" \
${QUERY_ARG}
}

setup_venv() {
python3 -m venv "$VIRTUAL_ENV"
PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
3 changes: 3 additions & 0 deletions benchmarks/queries/clickbench/queries/sorted_data/q0.sql
@@ -0,0 +1,3 @@
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
115 changes: 106 additions & 9 deletions benchmarks/src/clickbench.rs
@@ -78,6 +78,27 @@ pub struct RunOpt {
/// If present, write results json here
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,

/// Column name that the data is sorted by (e.g., "EventTime")
/// If specified, DataFusion will be informed that the data has this sort order
/// using CREATE EXTERNAL TABLE with WITH ORDER clause.
///
/// Recommended to use with: -c datafusion.optimizer.prefer_existing_sort=true
/// This allows DataFusion to optimize away redundant sorts while maintaining
/// multi-core parallelism for other operations.
#[structopt(long = "sorted-by")]
sorted_by: Option<String>,

/// Sort order: ASC or DESC (default: ASC)
#[structopt(long = "sort-order", default_value = "ASC")]
sort_order: String,

/// Configuration options in the format key=value
/// Can be specified multiple times.
///
/// Example: -c datafusion.optimizer.prefer_existing_sort=true
#[structopt(short = "c", long = "config")]
config_options: Vec<String>,
}

/// Get the SQL file path
@@ -125,6 +146,37 @@ impl RunOpt {

// configure parquet options
let mut config = self.common.config()?;

if self.sorted_by.is_some() {
println!("ℹ️ Data is registered with sort order");

let has_prefer_sort = self
.config_options
.iter()
.any(|opt| opt.contains("prefer_existing_sort=true"));

if !has_prefer_sort {
println!("ℹ️ Consider using -c datafusion.optimizer.prefer_existing_sort=true");
println!("ℹ️ to optimize queries while maintaining parallelism");
}
}

// Apply user-provided configuration options
for config_opt in &self.config_options {
let parts: Vec<&str> = config_opt.splitn(2, '=').collect();
if parts.len() != 2 {
return Err(exec_datafusion_err!(
"Invalid config option format: '{}'. Expected 'key=value'",
config_opt
));
}
let key = parts[0];
let value = parts[1];

println!("Setting config: {key} = {value}");
config = config.set_str(key, value);
}

{
let parquet_options = &mut config.options_mut().execution.parquet;
// The hits_partitioned dataset specifies string columns
Expand All @@ -136,10 +188,18 @@ impl RunOpt {
parquet_options.pushdown_filters = true;
parquet_options.reorder_filters = true;
}

if self.sorted_by.is_some() {
// To compare against the dynamic TopK optimization when data is sorted,
// we assume filter pushdown is also enabled in this case.
parquet_options.pushdown_filters = true;
parquet_options.reorder_filters = true;
}
}

let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

self.register_hits(&ctx).await?;

let mut benchmark_run = BenchmarkRun::new();
Expand Down Expand Up @@ -214,17 +274,54 @@ impl RunOpt {
}

/// Registers the `hits.parquet` as a table named `hits`
/// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER
async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
let options = Default::default();
let path = self.path.as_os_str().to_str().unwrap();
ctx.register_parquet("hits", path, options)
.await
.map_err(|e| {
DataFusionError::Context(
format!("Registering 'hits' as {path}"),
Box::new(e),
)
})

// If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
if let Some(ref sort_column) = self.sorted_by {
println!(
"Registering table with sort order: {} {}",
sort_column, self.sort_order
);

// Escape column name with double quotes
let escaped_column = if sort_column.contains('"') {
sort_column.clone()
} else {
format!("\"{sort_column}\"")
};

// Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause
// Schema will be automatically inferred from the Parquet file
let create_table_sql = format!(
"CREATE EXTERNAL TABLE hits \
STORED AS PARQUET \
LOCATION '{}' \
WITH ORDER ({} {})",
path,
escaped_column,
self.sort_order.to_uppercase()
);

println!("Executing: {create_table_sql}");

// Execute the CREATE EXTERNAL TABLE statement
ctx.sql(&create_table_sql).await?.collect().await?;

Ok(())
} else {
// Original registration without sort order
let options = Default::default();
ctx.register_parquet("hits", path, options)
.await
.map_err(|e| {
DataFusionError::Context(
format!("Registering 'hits' as {path}"),
Box::new(e),
)
})
}
}

fn iterations(&self) -> usize {
15 changes: 15 additions & 0 deletions datafusion/common/src/config.rs
@@ -983,6 +983,21 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// Enable sort pushdown optimization.
/// When enabled, attempts to push sort requirements down to data sources
/// that can natively handle them (e.g., by reversing file/row group read order).
///
/// Returns **inexact ordering**: the Sort operator is kept for correctness,
/// but the optimized input enables early termination for TopK queries
/// (ORDER BY ... LIMIT N), providing a significant speedup.
///
/// Memory: No additional overhead (only changes read order).
///
/// Future: Will add option to detect perfectly sorted data and eliminate Sort completely.
///
/// Default: true
pub enable_sort_pushdown: bool, default = true
}
}
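The TopK benefit described in the doc comment comes from the fact that, when input already matches the requested order, "top N" degenerates into reading a prefix of the stream. As a loose shell analogy (not DataFusion code): pre-sorted input needs only `head`, while unsorted input must pass through a full `sort` first:

```bash
# Pre-sorted input: the top-3 smallest values is just a prefix read
printf '1\n2\n3\n4\n5\n' | head -n 3

# Unsorted input: the whole stream must be sorted before the top-3 is known
printf '3\n5\n1\n4\n2\n' | sort -n | head -n 3
```

Both pipelines print 1, 2, 3, but only the first can stop reading after three lines.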

1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/mod.rs
@@ -28,6 +28,7 @@ mod limit_pushdown;
mod limited_distinct_aggregation;
mod partition_statistics;
mod projection_pushdown;
mod pushdown_sort;
mod replace_with_order_preserving_variants;
mod sanity_checker;
mod test_utils;