
Commit 2758961

Concurrent data file fetching and parallel RecordBatch processing (#515)
* feat: concurrent data file fetches, parallel RecordBatch processing
* refactor: centralize infallible `available_parallelism` fn. Use better channel size limit in arrow read
1 parent a3f9aec commit 2758961
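What changed, in one picture: the reader used to build a single sequential output stream with `async-stream`'s `try_stream!` macro; it now fans file-scan tasks out with `try_for_each_concurrent` and funnels every resulting `RecordBatch` (or error) into a bounded `futures` channel whose receiver becomes the output stream. Below is a minimal, self-contained sketch of that pattern using only the `futures` crate; the `u64` "batches", the `String` error type, and `futures::join!` (standing in for the crate's runtime-agnostic `spawn`) are illustrative placeholders, not the crate's actual code.

```rust
use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        let concurrency_limit = 4;
        // Bounded channel: the buffer size doubles as the backpressure limit.
        let (tx, rx) = channel::<Result<u64, String>>(concurrency_limit);
        let mut error_tx = tx.clone();

        let producer = async move {
            let result = futures::stream::iter(0u64..16)
                .map(Ok)
                .try_for_each_concurrent(concurrency_limit, |task| {
                    let mut tx = tx.clone();
                    async move {
                        // Stand-in for "fetch one data file and decode it".
                        let batch = task * 2;
                        tx.send(Ok(batch)).await.map_err(|e| e.to_string())
                    }
                })
                .await;
            // Forward any task error into the same channel so the consumer
            // sees an Err item rather than a silently truncated stream.
            if let Err(e) = result {
                let _ = error_tx.send(Err(e)).await;
            }
        };

        // The Receiver is itself a Stream; the real code returns it boxed.
        let consumer = rx.collect::<Vec<_>>();
        let (_, received) = futures::join!(producer, consumer);
        println!("received {} items", received.len());
    });
}
```

Sizing the channel to the data-file concurrency limit (the "better channel size limit" in the second bullet above) means a slow consumer backpressures the producers instead of letting decoded batches pile up in memory.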

5 files changed (+167, -59 lines)


crates/iceberg/Cargo.toml

Lines changed: 0 additions & 1 deletion
```diff
@@ -51,7 +51,6 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
```

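Dropping `async-stream` is possible because `reader.rs` (next file) no longer uses its `try_stream!` generator. For context, this is roughly the shape of the pattern being removed, a sketch rather than the crate's old code: a generator yields items one `await` at a time, so per-file concurrency could not be added without restructuring (`fake_fetch` is a hypothetical stand-in for fetching and decoding one file).

```rust
use async_stream::try_stream;
use futures::StreamExt;

// Hypothetical stand-in for fetching/decoding one data file.
async fn fake_fetch(task: u32) -> Result<u32, std::io::Error> {
    Ok(task * 2)
}

fn main() {
    futures::executor::block_on(async {
        // One generator, one consumer: each `yield` happens strictly after
        // the previous await completes, so files are processed serially.
        let stream = try_stream! {
            for task in 0..3u32 {
                let batch = fake_fetch(task).await?;
                yield batch;
            }
        };
        futures::pin_mut!(stream);
        while let Some(batch) = stream.next().await {
            println!("{}", batch.unwrap());
        }
    });
}
```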
crates/iceberg/src/arrow/reader.rs

Lines changed: 100 additions & 48 deletions
```diff
@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,25 +43,38 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
+use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    concurrency_limit_data_files: usize,
 }
 
 impl ArrowReaderBuilder {
     /// Create a new ArrowReaderBuilder
     pub(crate) fn new(file_io: FileIO) -> Self {
+        let num_cpus = available_parallelism().get();
+
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            concurrency_limit_data_files: num_cpus,
         }
     }
 
+    /// Sets the max number of in flight data files that are being fetched
+    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
+        self.concurrency_limit_data_files = val;
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +87,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
         }
     }
 }
@@ -84,73 +97,113 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+
+    /// the maximum number of data files that can be fetched at the same time
+    concurrency_limit_data_files: usize,
 }
 
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
-
-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
+        let batch_size = self.batch_size;
+        let concurrency_limit_data_files = self.concurrency_limit_data_files;
+
+        let (tx, rx) = channel(concurrency_limit_data_files);
+        let mut channel_for_error = tx.clone();
+
+        spawn(async move {
+            let result = tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    concurrency_limit_data_files,
+                    |(file_scan_task, file_io, tx)| async move {
+                        match file_scan_task {
+                            Ok(task) => {
+                                let file_path = task.data_file_path().to_string();
+
+                                spawn(async move {
+                                    Self::process_file_scan_task(task, batch_size, file_io, tx)
+                                        .await
+                                })
+                                .await
+                                .map_err(|e| e.with_context("file_path", file_path))
+                            }
+                            Err(err) => Err(err),
                         }
+                    },
+                )
+                .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
+
+        return Ok(rx.boxed());
+    }
+
+    async fn process_file_scan_task(
+        task: FileScanTask,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
 
-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
+        }
 
-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+        let parquet_file = file_io.new_input(task.data_file_path())?;
 
-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
 
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
 
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
 
-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
 
-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+        }
 
-                        let mut batch_stream = batch_stream_builder.build()?;
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }
 
-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
-            }
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
         }
-        .boxed())
+
+        Ok(())
     }
 
     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +269,6 @@ impl ArrowReader {
     }
 
     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
         collector: &CollectFieldIdVisitor,
```

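Net effect on the reader API: `read` now takes `self` by value, spawns the producer side, and returns the channel's receiver immediately as the batch stream. A rough sketch of the crate-internal wiring (hedged: `ArrowReaderBuilder::new` is `pub(crate)`, the `build()` call and the crate-level imports are assumed, and `file_io`/`tasks` would come from an existing scan plan):

```rust
use futures::TryStreamExt;
// Assumed crate-internal imports: FileIO, FileScanTaskStream, Result,
// and ArrowReaderBuilder from the modules shown in this diff.

async fn read_all(file_io: FileIO, tasks: FileScanTaskStream) -> Result<usize> {
    let reader = ArrowReaderBuilder::new(file_io)
        .with_data_file_concurrency_limit(8) // at most 8 files in flight
        .with_batch_size(1024)
        .build();

    // `read` no longer needs a `mut` task stream: it spawns the concurrent
    // producer and hands back the channel's receiver as the stream.
    let mut batches = reader.read(tasks)?;

    let mut rows = 0;
    while let Some(batch) = batches.try_next().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}
```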
crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -81,4 +81,5 @@ pub mod transform;
 mod runtime;
 
 pub mod arrow;
+mod utils;
 pub mod writer;
```

crates/iceberg/src/scan.rs

Lines changed: 24 additions & 10 deletions
```diff
@@ -39,6 +39,7 @@ use crate::spec::{
     SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
+use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind, Result};
 
 /// A stream of [`FileScanTask`].
@@ -55,15 +56,14 @@ pub struct TableScanBuilder<'a> {
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
-    concurrency_limit_manifest_files: usize,
+    concurrency_limit_data_files: usize,
     concurrency_limit_manifest_entries: usize,
+    concurrency_limit_manifest_files: usize,
 }
 
 impl<'a> TableScanBuilder<'a> {
     pub(crate) fn new(table: &'a Table) -> Self {
-        let num_cpus = std::thread::available_parallelism()
-            .expect("failed to get number of CPUs")
-            .get();
+        let num_cpus = available_parallelism().get();
 
         Self {
             table,
@@ -72,8 +72,9 @@ impl<'a> TableScanBuilder<'a> {
             batch_size: None,
             case_sensitive: true,
             filter: None,
-            concurrency_limit_manifest_files: num_cpus,
+            concurrency_limit_data_files: num_cpus,
             concurrency_limit_manifest_entries: num_cpus,
+            concurrency_limit_manifest_files: num_cpus,
         }
     }
 
@@ -124,12 +125,13 @@
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
         self.concurrency_limit_manifest_files = limit;
         self.concurrency_limit_manifest_entries = limit;
+        self.concurrency_limit_data_files = limit;
         self
     }
 
-    /// Sets the manifest file concurrency limit for this scan
-    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
-        self.concurrency_limit_manifest_files = limit;
+    /// Sets the data file concurrency limit for this scan
+    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_data_files = limit;
         self
     }
 
@@ -139,6 +141,12 @@
         self
     }
 
+    /// Sets the manifest file concurrency limit for this scan
+    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -244,10 +252,11 @@
         Ok(TableScan {
             batch_size: self.batch_size,
             column_names: self.column_names,
-            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             file_io: self.table.file_io().clone(),
             plan_context,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
             concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
+            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
         })
     }
 }
@@ -266,6 +275,10 @@ pub struct TableScan {
     /// The maximum number of [`ManifestEntry`]s that will
     /// be processed in parallel
     concurrency_limit_manifest_entries: usize,
+
+    /// The maximum number of data files that will
+    /// be fetched and processed in parallel
+    concurrency_limit_data_files: usize,
 }
 
 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -350,7 +363,8 @@ impl TableScan {
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.concurrency_limit_data_files);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
```

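From the public API side, the new knob composes with the existing builder methods. A hypothetical end-to-end usage, assuming an already-loaded `Table` (the limits chosen here are arbitrary):

```rust
use futures::TryStreamExt;
// Assumed imports: iceberg::table::Table and iceberg::Result.

async fn count_rows(table: &Table) -> Result<usize> {
    let scan = table
        .scan()
        .with_concurrency_limit(8) // sets all three limits at once...
        .with_data_file_concurrency_limit(4) // ...then override data files only
        .build()?;

    // `to_arrow` forwards concurrency_limit_data_files into the ArrowReader.
    let mut batches = scan.to_arrow().await?;
    let mut rows = 0;
    while let Some(batch) = batches.try_next().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}
```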
crates/iceberg/src/utils.rs

Lines changed: 42 additions & 0 deletions
```diff
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::num::NonZero;
+
+// Use a default value of 1 as the safest option.
+// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
+// for more details.
+const DEFAULT_PARALLELISM: usize = 1;
+
+/// Uses [`std::thread::available_parallelism`] in order to
+/// retrieve an estimate of the default amount of parallelism
+/// that should be used. Note that [`std::thread::available_parallelism`]
+/// returns a `Result` as it can fail, so here we use
+/// a default value instead.
+/// Note: we don't use a OnceCell or LazyCell here as there
+/// are circumstances where the level of available
+/// parallelism can change during the lifetime of an executing
+/// process, but this should not be called in a hot loop.
+pub(crate) fn available_parallelism() -> NonZero<usize> {
+    std::thread::available_parallelism().unwrap_or_else(|_err| {
+        // Failed to get the level of parallelism.
+        // TODO: log/trace when this fallback occurs.
+
+        // Using a default value.
+        NonZero::new(DEFAULT_PARALLELISM).unwrap()
+    })
+}
```

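Because the helper is infallible by construction, call sites such as `TableScanBuilder::new` can drop the old `.expect("failed to get number of CPUs")`. A standalone copy for illustration (the crate keeps this `pub(crate)` in `utils.rs`):

```rust
use std::num::NonZero;

// Mirrors crates/iceberg/src/utils.rs: fall back to 1 when the platform
// cannot report its available parallelism (see the Limitations section of
// the std docs linked above).
fn available_parallelism() -> NonZero<usize> {
    std::thread::available_parallelism().unwrap_or_else(|_| NonZero::new(1).unwrap())
}

fn main() {
    let num_cpus = available_parallelism().get(); // always >= 1
    println!("default concurrency limit: {num_cpus}");
}
```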