Commit 8d4851f
feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer (#1821)
## Which issue does this PR close?

Partially addresses #1749.

## What changes are included in this PR?

This PR adds partition spec handling to `FileScanTask` and `RecordBatchTransformer` to correctly implement the Iceberg spec's "Column Projection" rules for fields "not present" in data files.

### Problem Statement

Prior to this PR, `iceberg-rust`'s `FileScanTask` had no mechanism to pass partition information to `RecordBatchTransformer`, causing two issues:

1. **Incorrect handling of bucket partitioning**: The reader couldn't distinguish identity transforms (which should use partition metadata constants) from non-identity transforms like bucket/truncate/year/month (which must read from the data file). For example, `bucket(4, id)` stores `id_bucket = 2` (the bucket number) in partition metadata, but the actual `id` values (100, 200, 300) exist only in the data file. iceberg-rust was incorrectly treating bucket-partitioned source columns as constants, breaking runtime filtering and returning incorrect query results.
2. **Field ID conflicts in add_files scenarios**: When importing Hive tables via `add_files`, partition columns could have field IDs conflicting with Parquet data columns. Example: the Parquet file has field_id=1→"name", but Iceberg expects field_id=1→"id" (partition). Per the spec, the correct field is "not present" and requires name mapping fallback.

### Iceberg Specification Requirements

Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), when a field ID is "not present" in a data file, it must be resolved using these rules:

1. Return the value from partition metadata if an **Identity Transform** exists
2. Use `schema.name-mapping.default` metadata to map the field id to columns without field ids
3. Return the default value if it has a defined `initial-default`
4. Return null in all other cases

**Why this matters:**

- **Identity transforms** (e.g., `identity(dept)`) store actual column values in partition metadata that can be used as constants without reading the data file
- **Non-identity transforms** (e.g., `bucket(4, id)`, `day(timestamp)`) store transformed values in partition metadata (e.g., bucket number 2, not the actual `id` values 100, 200, 300), so their source columns must be read from the data file

### Changes Made

1. **Added partition fields to `FileScanTask`** (`scan/task.rs`):
   - `partition: Option<Struct>` - Partition data from the manifest entry
   - `partition_spec: Option<Arc<PartitionSpec>>` - For transform-aware constant detection
   - `name_mapping: Option<Arc<NameMapping>>` - Name mapping from table metadata
2. **Implemented `constants_map()` function** (`arrow/record_batch_transformer.rs`):
   - Replicates Java's `PartitionUtil.constantsMap()` behavior
   - Only includes fields whose transform is `Transform::Identity`
   - Used to determine which fields use partition metadata constants vs. reading from data files
3. **Enhanced `RecordBatchTransformer`** (`arrow/record_batch_transformer.rs`):
   - Added a `build_with_partition_data()` method that accepts the partition spec, partition data, and name mapping
   - Implements all 4 spec rules for column resolution with identity-transform awareness
   - Detects field ID conflicts by verifying that both the field ID AND the name match
   - Falls back to name mapping when field IDs are missing or conflicting (spec rule #2)
4. **Updated `ArrowReader`** (`arrow/reader.rs`):
   - Uses `build_with_partition_data()` when partition information is available
   - Falls back to `build()` when it is not
5. **Updated manifest entry processing** (`scan/context.rs`):
   - Populates the partition fields in `FileScanTask` from manifest entry data

### Tests Added

1. **`bucket_partitioning_reads_source_column_from_file`** - Verifies that bucket-partitioned source columns are read from data files (not treated as constants from partition metadata)
2. **`identity_partition_uses_constant_from_metadata`** - Verifies that identity-transformed fields correctly use partition metadata constants
3. **`test_bucket_partitioning_with_renamed_source_column`** - Verifies that field-ID-based mapping works despite a column rename
4. **`add_files_partition_columns_without_field_ids`** - Verifies name mapping resolution for Hive table imports without field IDs (spec rule #2)
5. **`add_files_with_true_field_id_conflict`** - Verifies correct field ID conflict detection with name mapping fallback (spec rule #2)
6. **`test_all_four_spec_rules`** - Integration test verifying that all 4 spec rules work together

## Are these changes tested?

Yes, there are 6 new unit tests covering all 4 Iceberg spec rules. This change also resolved approximately 50 Iceberg Java tests when running with DataFusion Comet's experimental apache/datafusion-comet#2528 PR.

---------

Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
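
To make the identity-vs-non-identity distinction above concrete, here is a minimal, self-contained sketch of the `constants_map()` idea. The types and the function signature below are simplified stand-ins, not the actual `PartitionSpec`, `PartitionField`, or partition `Struct` APIs in iceberg-rust; the point is only that identity-transformed fields yield constants keyed by their source field id, while bucket/truncate/date transforms are skipped so their source columns are read from the data file.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real PartitionSpec / PartitionField / partition
// Struct types; only the pieces needed to illustrate the identity check appear.
#[derive(Clone, Debug, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
}

struct PartitionField {
    source_id: i32, // field id of the source column in the table schema
    transform: Transform,
}

struct PartitionSpec {
    fields: Vec<PartitionField>,
}

/// Sketch of the constants_map() behaviour (mirroring Java's
/// PartitionUtil.constantsMap): only identity-transformed partition fields
/// contribute a constant, keyed by source field id. Values written by
/// bucket/truncate/year/month/day transforms are skipped, so those source
/// columns must be read from the data file.
fn constants_map(spec: &PartitionSpec, partition: &[Option<i64>]) -> HashMap<i32, i64> {
    spec.fields
        .iter()
        .zip(partition)
        .filter(|(field, _)| field.transform == Transform::Identity)
        .filter_map(|(field, value)| value.as_ref().map(|v| (field.source_id, *v)))
        .collect()
}

fn main() {
    // Partition spec: bucket(4, id) on field 1, identity(dept) on field 2.
    let spec = PartitionSpec {
        fields: vec![
            PartitionField { source_id: 1, transform: Transform::Bucket(4) },
            PartitionField { source_id: 2, transform: Transform::Identity },
        ],
    };
    // Partition metadata stores the bucket number (2), not the id values.
    let partition = vec![Some(2), Some(42)];

    let constants = constants_map(&spec, &partition);
    assert!(!constants.contains_key(&1)); // bucket source: read from the file
    assert_eq!(constants.get(&2), Some(&42)); // identity source: constant
}
```
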
1 parent d1927db commit 8d4851f

8 files changed: +1222 −91 lines

crates/iceberg/src/arrow/caching_delete_file_loader.rs
Lines changed: 3 additions & 0 deletions

@@ -881,6 +881,9 @@ mod tests {
     project_field_ids: vec![2, 3],
     predicate: None,
     deletes: vec![pos_del, eq_del],
+    partition: None,
+    partition_spec: None,
+    name_mapping: None,
 };

 // Load the deletes - should handle both types without error
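
The `partition`, `partition_spec`, and `name_mapping` fields added above (left as `None` in this test fixture) are the inputs to the column-projection resolution described in the commit message. As a hedged sketch, and not the actual `RecordBatchTransformer` code, the four spec rules amount to the following decision order when a projected field id is not present in the data file; all types and map shapes here are hypothetical simplifications.

```rust
use std::collections::HashMap;

/// Hypothetical simplified value type for illustration only.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Int(i64),
    Null,
}

#[derive(Debug, PartialEq)]
enum ColumnSource {
    /// Fill the output column with a constant (rules 1, 3, 4).
    Constant(Value),
    /// Read the column from the data file by name (rule 2).
    FromFileByName(String),
}

/// Sketch of the spec's resolution order for a field id that is
/// "not present" in the data file.
fn resolve_missing_field(
    field_id: i32,
    identity_constants: &HashMap<i32, Value>, // rule 1: from the constants_map() sketch
    name_mapping: &HashMap<i32, String>,      // rule 2: schema.name-mapping.default (simplified)
    initial_default: Option<Value>,           // rule 3: the field's initial-default
) -> ColumnSource {
    // Rule 1: identity-transformed partition fields supply a constant.
    if let Some(constant) = identity_constants.get(&field_id) {
        return ColumnSource::Constant(constant.clone());
    }
    // Rule 2: name mapping locates a file column that carries no field id.
    if let Some(name) = name_mapping.get(&field_id) {
        return ColumnSource::FromFileByName(name.clone());
    }
    // Rule 3: a declared initial-default fills the column.
    if let Some(default) = initial_default {
        return ColumnSource::Constant(default);
    }
    // Rule 4: null in all other cases.
    ColumnSource::Constant(Value::Null)
}

fn main() {
    let identity_constants = HashMap::from([(2, Value::Int(42))]);
    let name_mapping = HashMap::from([(3, "dept_name".to_string())]);

    assert_eq!(
        resolve_missing_field(2, &identity_constants, &name_mapping, None),
        ColumnSource::Constant(Value::Int(42)) // rule 1
    );
    assert_eq!(
        resolve_missing_field(3, &identity_constants, &name_mapping, None),
        ColumnSource::FromFileByName("dept_name".to_string()) // rule 2
    );
    assert_eq!(
        resolve_missing_field(4, &identity_constants, &name_mapping, Some(Value::Int(0))),
        ColumnSource::Constant(Value::Int(0)) // rule 3
    );
    assert_eq!(
        resolve_missing_field(5, &identity_constants, &name_mapping, None),
        ColumnSource::Constant(Value::Null) // rule 4
    );
}
```
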

crates/iceberg/src/arrow/delete_file_loader.rs
Lines changed: 2 additions & 2 deletions

@@ -20,7 +20,7 @@ use std::sync::Arc;
 use futures::{StreamExt, TryStreamExt};

 use crate::arrow::ArrowReader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::{Schema, SchemaRef};

@@ -82,7 +82,7 @@ impl BasicDeleteFileLoader {
     equality_ids: &[i32],
 ) -> Result<ArrowRecordBatchStream> {
     let mut record_batch_transformer =
-        RecordBatchTransformer::build(target_schema.clone(), equality_ids);
+        RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();

     let record_batch_stream = record_batch_stream.map(move |record_batch| {
         record_batch.and_then(|record_batch| {

crates/iceberg/src/arrow/delete_filter.rs
Lines changed: 6 additions & 0 deletions

@@ -341,6 +341,9 @@ pub(crate) mod tests {
     project_field_ids: vec![],
     predicate: None,
     deletes: vec![pos_del_1, pos_del_2.clone()],
+    partition: None,
+    partition_spec: None,
+    name_mapping: None,
 },
 FileScanTask {
     start: 0,
@@ -352,6 +355,9 @@ pub(crate) mod tests {
     project_field_ids: vec![],
     predicate: None,
     deletes: vec![pos_del_3],
+    partition: None,
+    partition_spec: None,
+    name_mapping: None,
 },
 ];
