Skip to content

Commit c0e8bb5

Browse files
authored
Push down InList or hash table references from HashJoinExec depending on the size of the build side (#18393)
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171. A "target state" is tracked in #18393 (*this PR*). There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own: - #18448 - #18449 (depends on #18448) - #18451 As those are merged I will rebase this PR to keep track of the "remaining work", and we can use this PR to explore big picture ideas or benchmarks of the final state.
1 parent 20870c1 commit c0e8bb5

File tree

16 files changed

+1193
-123
lines changed

16 files changed

+1193
-123
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,36 @@ config_namespace! {
10241024
/// will be collected into a single partition
10251025
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
10261026

1027+
/// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1028+
/// Build sides larger than this will use hash table lookups instead.
1029+
/// Set to 0 to always use hash table lookups.
1030+
///
1031+
/// InList pushdown can be more efficient for small build sides because it can result in better
1032+
/// statistics pruning as well as use any bloom filters present on the scan side.
1033+
/// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
1034+
/// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
1035+
///
1036+
/// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
1037+
///
1038+
/// The default is 128kB per partition.
1039+
/// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
1040+
/// but avoids excessive memory usage or overhead for larger joins.
1041+
pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024
1042+
1043+
/// Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
1044+
/// Build sides with more rows than this will use hash table lookups instead.
1045+
/// Set to 0 to always use hash table lookups.
1046+
///
1047+
/// This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent
1048+
/// very large IN lists that might not provide much benefit over hash table lookups.
1049+
///
1050+
/// This uses the deduplicated row count once the build side has been evaluated.
1051+
///
1052+
/// The default is 150 values per partition.
1053+
/// This is inspired by Trino's `max-filter-keys-per-column` setting.
1054+
/// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
1055+
pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150
1056+
10271057
/// The default filter selectivity used by Filter Statistics
10281058
/// when an exact selectivity cannot be determined. Valid values are
10291059
/// between 0 (no selectivity) and 100 (all rows are selected).

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 448 additions & 8 deletions
Large diffs are not rendered by default.

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,14 @@ impl InListExpr {
321321
&self.list
322322
}
323323

324+
/// Returns `true` if the underlying `list` of this expression is empty
pub fn is_empty(&self) -> bool {
325+
self.list.is_empty()
326+
}
327+
328+
/// Returns the number of entries in the underlying `list` of this expression
pub fn len(&self) -> usize {
329+
self.list.len()
330+
}
331+
324332
/// Is this negated e.g. NOT IN LIST
325333
pub fn negated(&self) -> bool {
326334
self.negated

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ datafusion-common = { workspace = true }
5757
datafusion-common-runtime = { workspace = true, default-features = true }
5858
datafusion-execution = { workspace = true }
5959
datafusion-expr = { workspace = true }
60+
datafusion-functions = { workspace = true }
6061
datafusion-functions-aggregate-common = { workspace = true }
6162
datafusion-functions-window-common = { workspace = true }
6263
datafusion-physical-expr = { workspace = true, default-features = true }

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use crate::filter_pushdown::{
2626
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
2727
FilterPushdownPropagation,
2828
};
29+
use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
2930
use crate::joins::hash_join::shared_bounds::{
30-
ColumnBounds, PartitionBounds, SharedBuildAccumulator,
31+
ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
3132
};
3233
use crate::joins::hash_join::stream::{
3334
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
@@ -86,7 +87,7 @@ use futures::TryStreamExt;
8687
use parking_lot::Mutex;
8788

8889
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
89-
const HASH_JOIN_SEED: RandomState =
90+
pub(crate) const HASH_JOIN_SEED: RandomState =
9091
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
9192

9293
/// HashTable and input data for the left (build side) of a join
@@ -112,6 +113,9 @@ pub(super) struct JoinLeftData {
112113
/// If the partition is empty (no rows) this will be None.
113114
/// If the partition has some rows this will be Some with the bounds for each join key column.
114115
pub(super) bounds: Option<PartitionBounds>,
116+
/// Membership testing strategy for filter pushdown
117+
/// Contains either InList values for small build sides or hash table reference for large build sides
118+
pub(super) membership: PushdownStrategy,
115119
}
116120

117121
impl JoinLeftData {
@@ -135,6 +139,11 @@ impl JoinLeftData {
135139
&self.visited_indices_bitmap
136140
}
137141

142+
/// returns a reference to the membership testing strategy used for filter pushdown
/// (a `PushdownStrategy`, not necessarily InList values)
143+
pub(super) fn membership(&self) -> &PushdownStrategy {
144+
&self.membership
145+
}
146+
138147
/// Decrements the counter of running threads, and returns `true`
139148
/// if caller is the last running thread
140149
pub(super) fn report_probe_completed(&self) -> bool {
@@ -929,6 +938,16 @@ impl ExecutionPlan for HashJoinExec {
929938
need_produce_result_in_final(self.join_type),
930939
self.right().output_partitioning().partition_count(),
931940
enable_dynamic_filter_pushdown,
941+
context
942+
.session_config()
943+
.options()
944+
.optimizer
945+
.hash_join_inlist_pushdown_max_size,
946+
context
947+
.session_config()
948+
.options()
949+
.optimizer
950+
.hash_join_inlist_pushdown_max_distinct_values,
932951
))
933952
})?,
934953
PartitionMode::Partitioned => {
@@ -947,6 +966,16 @@ impl ExecutionPlan for HashJoinExec {
947966
need_produce_result_in_final(self.join_type),
948967
1,
949968
enable_dynamic_filter_pushdown,
969+
context
970+
.session_config()
971+
.options()
972+
.optimizer
973+
.hash_join_inlist_pushdown_max_size,
974+
context
975+
.session_config()
976+
.options()
977+
.optimizer
978+
.hash_join_inlist_pushdown_max_distinct_values,
950979
))
951980
}
952981
PartitionMode::Auto => {
@@ -1346,6 +1375,8 @@ async fn collect_left_input(
13461375
with_visited_indices_bitmap: bool,
13471376
probe_threads_count: usize,
13481377
should_compute_dynamic_filters: bool,
1378+
max_inlist_size: usize,
1379+
max_inlist_distinct_values: usize,
13491380
) -> Result<JoinLeftData> {
13501381
let schema = left_stream.schema();
13511382

@@ -1469,6 +1500,29 @@ async fn collect_left_input(
14691500
// Convert Box to Arc for sharing with SharedBuildAccumulator
14701501
let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
14711502

1503+
let membership = if num_rows == 0 {
1504+
PushdownStrategy::Empty
1505+
} else {
1506+
// If the build side is small enough we can use IN list pushdown.
1507+
// If it's too big we fall back to pushing down a reference to the hash table.
1508+
// See `PushdownStrategy` for more details.
1509+
let estimated_size = left_values
1510+
.iter()
1511+
.map(|arr| arr.get_array_memory_size())
1512+
.sum::<usize>();
1513+
if left_values.is_empty()
1514+
|| left_values[0].is_empty()
1515+
|| estimated_size > max_inlist_size
1516+
|| hash_map.len() > max_inlist_distinct_values
1517+
{
1518+
PushdownStrategy::HashTable(Arc::clone(&hash_map))
1519+
} else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
1520+
PushdownStrategy::InList(in_list_values)
1521+
} else {
1522+
PushdownStrategy::HashTable(Arc::clone(&hash_map))
1523+
}
1524+
};
1525+
14721526
let data = JoinLeftData {
14731527
hash_map,
14741528
batch,
@@ -1477,6 +1531,7 @@ async fn collect_left_input(
14771531
probe_threads_counter: AtomicUsize::new(probe_threads_count),
14781532
_reservation: reservation,
14791533
bounds,
1534+
membership,
14801535
};
14811536

14821537
Ok(data)
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Utilities for building InList expressions from hash join build side data
19+
20+
use std::sync::Arc;
21+
22+
use arrow::array::{ArrayRef, StructArray};
23+
use arrow::datatypes::{Field, FieldRef, Fields};
24+
use arrow::downcast_dictionary_array;
25+
use arrow_schema::DataType;
26+
use datafusion_common::Result;
27+
28+
pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
29+
data_types
30+
.iter()
31+
.enumerate()
32+
.map(|(i, dt)| Ok(Field::new(format!("c{i}"), dt.clone(), true)))
33+
.collect()
34+
}
35+
36+
/// Flattens dictionary-encoded arrays to their underlying value arrays.
37+
/// Non-dictionary arrays are returned as-is.
38+
fn flatten_dictionary_array(array: &ArrayRef) -> ArrayRef {
39+
downcast_dictionary_array! {
40+
array => {
41+
// Recursively flatten in case of nested dictionaries
42+
flatten_dictionary_array(array.values())
43+
}
44+
_ => Arc::clone(array)
45+
}
46+
}
47+
48+
/// Builds InList values from join key column arrays.
49+
///
50+
/// If `join_key_arrays` is:
51+
/// 1. A single array, let's say Int32, this will produce a flat
52+
/// InList expression where the lookup is expected to be scalar Int32 values,
53+
/// that is: this will produce `IN LIST (1, 2, 3)` expected to be used as `2 IN LIST (1, 2, 3)`.
54+
/// 2. An Int32 array and a Utf8 array, this will produce a Struct InList expression
55+
/// where the lookup is expected to be Struct values with two fields (Int32, Utf8),
56+
/// that is: this will produce `IN LIST ((1, "a"), (2, "b"))` expected to be used as `(2, "b") IN LIST ((1, "a"), (2, "b"))`.
57+
/// The field names of the struct are auto-generated as "c0", "c1", ... and should match the struct expression used in the join keys.
58+
///
59+
/// Note that this function does not deduplicate values - deduplication will happen later
60+
/// when building an InList expression from this array via `InListExpr::try_new_from_array`.
61+
///
62+
/// Returns `None` if the estimated size exceeds `max_size_bytes` or if the number of rows
63+
/// exceeds `max_distinct_values`.
64+
pub(super) fn build_struct_inlist_values(
65+
join_key_arrays: &[ArrayRef],
66+
) -> Result<Option<ArrayRef>> {
67+
// Flatten any dictionary-encoded arrays
68+
let flattened_arrays: Vec<ArrayRef> = join_key_arrays
69+
.iter()
70+
.map(flatten_dictionary_array)
71+
.collect();
72+
73+
// Build the source array/struct
74+
let source_array: ArrayRef = if flattened_arrays.len() == 1 {
75+
// Single column: use directly
76+
Arc::clone(&flattened_arrays[0])
77+
} else {
78+
// Multi-column: build StructArray once from all columns
79+
let fields = build_struct_fields(
80+
&flattened_arrays
81+
.iter()
82+
.map(|arr| arr.data_type().clone())
83+
.collect::<Vec<_>>(),
84+
)?;
85+
86+
// Build field references with proper Arc wrapping
87+
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
88+
.iter()
89+
.cloned()
90+
.zip(flattened_arrays.iter().cloned())
91+
.collect();
92+
93+
Arc::new(StructArray::from(arrays_with_fields))
94+
};
95+
96+
Ok(Some(source_array))
97+
}
98+
99+
#[cfg(test)]
100+
mod tests {
101+
use super::*;
102+
use arrow::array::{Int32Array, StringArray};
103+
use arrow_schema::DataType;
104+
use std::sync::Arc;
105+
106+
#[test]
107+
fn test_build_single_column_inlist_array() {
108+
let array = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef;
109+
let result = build_struct_inlist_values(std::slice::from_ref(&array))
110+
.unwrap()
111+
.unwrap();
112+
113+
assert!(array.eq(&result));
114+
}
115+
116+
#[test]
117+
fn test_build_multi_column_inlist() {
118+
let array1 = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef;
119+
let array2 =
120+
Arc::new(StringArray::from(vec!["a", "b", "c", "b", "a"])) as ArrayRef;
121+
122+
let result = build_struct_inlist_values(&[array1, array2])
123+
.unwrap()
124+
.unwrap();
125+
126+
assert_eq!(
127+
*result.data_type(),
128+
DataType::Struct(
129+
build_struct_fields(&[DataType::Int32, DataType::Utf8]).unwrap()
130+
)
131+
);
132+
}
133+
}

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
pub use exec::HashJoinExec;
2121

2222
mod exec;
23+
mod inlist_builder;
2324
mod partitioned_hash_eval;
2425
mod shared_bounds;
2526
mod stream;

0 commit comments

Comments
 (0)