Commit d173b74

feat: concurrent data file fetches, parallel RecordBatch processing
1 parent 80c1399 commit d173b74

3 files changed: +123 −56 lines

crates/iceberg/Cargo.toml

Lines changed: 0 additions & 1 deletion

@@ -50,7 +50,6 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
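
The dependency drop follows from the reader change below: record batches are no longer produced with async-stream's try_stream! macro, but by a spawned producer pushing into a bounded futures::channel::mpsc channel whose receiver is handed back as the stream. A minimal sketch of that substitution, runnable with only the futures crate; the std::thread producer and the toy u32 item type are stand-ins for the crate's own runtime and RecordBatch, not its actual code:

use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::{SinkExt, StreamExt};

fn main() {
    // Bounded channel: the producer is back-pressured once 10 items are in flight.
    let (mut tx, rx) = channel::<Result<u32, String>>(10);

    // Producer runs on its own thread here; the commit spawns onto the crate's async runtime instead.
    std::thread::spawn(move || {
        block_on(async move {
            for i in 0..5 {
                // Errors travel through the same channel rather than aborting the stream.
                let _ = tx.send(Ok(i)).await;
            }
        })
    });

    // The receiver implements Stream, so callers consume it just like the old try_stream! output.
    let items: Vec<_> = block_on(rx.collect::<Vec<_>>());
    println!("{items:?}");
}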

crates/iceberg/src/arrow/reader.rs

Lines changed: 101 additions & 48 deletions

@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,25 +43,39 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
 use crate::{Error, ErrorKind};
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    concurrency_limit_data_files: usize,
 }
 
 impl ArrowReaderBuilder {
     /// Create a new ArrowReaderBuilder
     pub(crate) fn new(file_io: FileIO) -> Self {
+        let num_cpus = std::thread::available_parallelism()
+            .expect("failed to get number of CPUs")
+            .get();
+
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            concurrency_limit_data_files: num_cpus,
         }
     }
 
+    /// Sets the max number of in flight data files that are being fetched
+    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
+        self.concurrency_limit_data_files = val;
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +88,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
         }
     }
 }
@@ -84,73 +98,113 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+
+    /// the maximum number of data files that can be fetched at the same time
+    concurrency_limit_data_files: usize,
 }
 
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
-
-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
+        let batch_size = self.batch_size;
+        let max_concurrent_fetching_datafiles = self.concurrency_limit_data_files;
+
+        let (tx, rx) = channel(10);
+        let mut channel_for_error = tx.clone();
+
+        spawn(async move {
+            let result = tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    max_concurrent_fetching_datafiles,
+                    |(file_scan_task, file_io, tx)| async move {
+                        match file_scan_task {
+                            Ok(task) => {
+                                let file_path = task.data_file_path().to_string();
+
+                                spawn(async move {
+                                    Self::process_file_scan_task(task, batch_size, file_io, tx)
+                                        .await
+                                })
+                                .await
+                                .map_err(|e| e.with_context("file_path", file_path))
+                            }
+                            Err(err) => Err(err),
                         }
+                    },
+                )
+                .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
+
+        return Ok(rx.boxed());
+    }
+
+    async fn process_file_scan_task(
+        task: FileScanTask,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
 
-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
+        }
 
-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+        let parquet_file = file_io.new_input(task.data_file_path())?;
 
-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
 
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
 
-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
 
-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
        }
 
-                        let mut batch_stream = batch_stream_builder.build()?;
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }
 
-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
-            }
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
         }
-        .boxed())
+
+        Ok(())
     }
 
     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +270,6 @@ impl ArrowReader {
     }
 
     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
         collector: &CollectFieldIdVisitor,
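
The shape of the new read path: every FileScanTask is paired with its own Sender clone, try_for_each_concurrent caps how many data files are fetched and decoded at once, and a failure anywhere is forwarded into the same channel the batches travel on, so the receiver still observes it. A rough, self-contained sketch of that fan-out under assumed toy types (plain integers stand in for FileScanTask and RecordBatch, and a std::thread stands in for the crate's spawn); this is an illustration of the pattern, not the crate's code:

use futures::channel::mpsc::{channel, Sender};
use futures::executor::block_on;
use futures::{stream, SinkExt, StreamExt, TryStreamExt};

type TaskError = String;

// Stand-in for process_file_scan_task: "fetch" one file and stream its batches to tx.
async fn process(task: u32, mut tx: Sender<Result<u32, TaskError>>) -> Result<(), TaskError> {
    for batch in 0..3 {
        tx.send(Ok(task * 10 + batch)).await.map_err(|e| e.to_string())?;
    }
    Ok(())
}

fn main() {
    let (tx, rx) = channel(10);
    let mut error_tx = tx.clone();

    let producer = async move {
        // Each task carries its own Sender clone; at most 4 tasks run concurrently.
        let result = stream::iter(0u32..8)
            .map(|task| Ok::<_, TaskError>((task, tx.clone())))
            .try_for_each_concurrent(4, |(task, tx)| async move { process(task, tx).await })
            .await;

        // The first error is forwarded on the same channel the batches use.
        if let Err(e) = result {
            let _ = error_tx.send(Err(e)).await;
        }
    };
    std::thread::spawn(move || block_on(producer));

    // The receiver plays the role of ArrowRecordBatchStream: a stream of Result<batch, error>.
    let results: Vec<_> = block_on(rx.collect::<Vec<_>>());
    println!("received {} results", results.len());
}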

crates/iceberg/src/scan.rs

Lines changed: 22 additions & 7 deletions

@@ -55,8 +55,9 @@ pub struct TableScanBuilder<'a> {
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
-    concurrency_limit_manifest_files: usize,
+    concurrency_limit_data_files: usize,
     concurrency_limit_manifest_entries: usize,
+    concurrency_limit_manifest_files: usize,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -72,8 +73,9 @@ impl<'a> TableScanBuilder<'a> {
             batch_size: None,
             case_sensitive: true,
             filter: None,
-            concurrency_limit_manifest_files: num_cpus,
+            concurrency_limit_data_files: num_cpus,
             concurrency_limit_manifest_entries: num_cpus,
+            concurrency_limit_manifest_files: num_cpus,
         }
     }
 
@@ -124,12 +126,13 @@ impl<'a> TableScanBuilder<'a> {
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
         self.concurrency_limit_manifest_files = limit;
         self.concurrency_limit_manifest_entries = limit;
+        self.concurrency_limit_data_files = limit;
         self
     }
 
-    /// Sets the manifest file concurrency limit for this scan
-    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
-        self.concurrency_limit_manifest_files = limit;
+    /// Sets the data file concurrency limit for this scan
+    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_data_files = limit;
         self
     }
 
@@ -139,6 +142,12 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the manifest file concurrency limit for this scan
+    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -244,10 +253,11 @@ impl<'a> TableScanBuilder<'a> {
         Ok(TableScan {
            batch_size: self.batch_size,
             column_names: self.column_names,
-            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             file_io: self.table.file_io().clone(),
             plan_context,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
             concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
+            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
         })
     }
 }
@@ -266,6 +276,10 @@ pub struct TableScan {
     /// The maximum number of [`ManifestEntry`]s that will
     /// be processed in parallel
     concurrency_limit_manifest_entries: usize,
+
+    /// The maximum number of [`ManifestEntry`]s that will
+    /// be processed in parallel
+    concurrency_limit_data_files: usize,
 }
 
 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -338,7 +352,8 @@ impl TableScan {
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.concurrency_limit_data_files);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
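
From the caller's side, the new knob can be set per scan or inherited from the blanket with_concurrency_limit. A hedged usage sketch, assuming a Table already loaded from a catalog; the builder methods come straight from this diff, while the limit values and the read_with_limits helper are illustrative only:

use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Reads a table to Arrow with a caller-chosen degree of data-file fetch parallelism.
// `table` is assumed to have been loaded from a catalog elsewhere.
async fn read_with_limits(table: &Table) -> Result<()> {
    let stream = table
        .scan()
        // One knob for manifest files, manifest entries, and data files alike...
        .with_concurrency_limit(8)
        // ...or override just the data-file fan-out added by this commit.
        .with_data_file_concurrency_limit(4)
        .build()?
        .to_arrow()
        .await?;

    // Batches arrive as data files finish, so they are not grouped per task in a fixed order.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}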
